feat(*): OpenTelemetry traces

Fixes #6149
This commit is contained in:
Loïc Mathieu
2024-10-01 17:37:00 +02:00
parent 33308c49c8
commit e87b97a2e1
28 changed files with 903 additions and 297 deletions

View File

@@ -123,6 +123,7 @@ allprojects {
micronaut "io.micronaut.micrometer:micronaut-micrometer-registry-prometheus"
micronaut "io.micronaut:micronaut-http-client"
micronaut "io.micronaut.reactor:micronaut-reactor-http-client"
micronaut "io.micronaut.tracing:micronaut-tracing-opentelemetry-http"
// logs
implementation "org.slf4j:slf4j-api"
@@ -133,6 +134,9 @@ allprojects {
implementation group: 'org.slf4j', name: 'jcl-over-slf4j'
implementation group: 'org.fusesource.jansi', name: 'jansi'
// OTEL
implementation "io.opentelemetry:opentelemetry-exporter-otlp"
// jackson
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-core'
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind'

View File

@@ -160,6 +160,9 @@ kestra:
metrics:
prefix: kestra
traces:
root: DISABLED
server:
basic-auth:
enabled: false
@@ -194,3 +197,12 @@ kestra:
prefixes:
- system.
- internal.
otel:
exclusions:
- /ping
- /metrics
- /health
- /env
- /prometheus
propagators: tracecontext, baggage

View File

@@ -1,6 +1,7 @@
package io.kestra.core.http;
import com.fasterxml.jackson.core.JsonProcessingException;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.JacksonMapper;
import lombok.*;
import lombok.experimental.SuperBuilder;
@@ -93,7 +94,7 @@ public class HttpRequest {
.build();
}
public HttpUriRequest to() throws IOException {
public HttpUriRequest to(RunContext runContext) throws IOException {
HttpUriRequestBase builder = new HttpUriRequestBase(this.method, this.uri);
// headers
@@ -104,6 +105,10 @@ public class HttpRequest {
);
}
if (runContext.getTraceParent() != null) {
builder.addHeader("traceparent", runContext.getTraceParent());
}
// body
if (this.body != null) {
builder.setEntity(this.body.to());

View File

@@ -248,7 +248,7 @@ public class HttpClient implements Closeable {
HttpClientResponseHandler<HttpResponse<T>> responseHandler
) throws HttpClientException {
try {
return this.client.execute(request.to(), httpClientContext, responseHandler);
return this.client.execute(request.to(runContext), httpClientContext, responseHandler);
} catch (SocketException e) {
throw new HttpClientRequestException(e.getMessage(), request, e);
} catch (IOException e) {

View File

@@ -29,6 +29,7 @@ import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
import lombok.*;
import lombok.experimental.FieldDefaults;
import lombok.experimental.NonFinal;
import lombok.extern.slf4j.Slf4j;
import java.time.Instant;
@@ -106,6 +107,10 @@ public class Execution implements DeletedInterface, TenantInterface {
@Nullable
Instant scheduleDate;
@NonFinal
@Setter
String traceParent;
/**
* Factory method for constructing a new {@link Execution} object for the given {@link Flow}.
*
@@ -199,7 +204,8 @@ public class Execution implements DeletedInterface, TenantInterface {
this.trigger,
this.deleted,
this.metadata,
this.scheduleDate
this.scheduleDate,
this.traceParent
);
}
@@ -222,7 +228,8 @@ public class Execution implements DeletedInterface, TenantInterface {
this.trigger,
this.deleted,
this.metadata,
this.scheduleDate
this.scheduleDate,
this.traceParent
);
}
@@ -258,7 +265,8 @@ public class Execution implements DeletedInterface, TenantInterface {
this.trigger,
this.deleted,
this.metadata,
this.scheduleDate
this.scheduleDate,
this.traceParent
);
}
@@ -281,7 +289,8 @@ public class Execution implements DeletedInterface, TenantInterface {
this.trigger,
this.deleted,
this.metadata,
this.scheduleDate
this.scheduleDate,
this.traceParent
);
}

View File

@@ -2,6 +2,8 @@ package io.kestra.core.runners;
import io.kestra.core.models.WorkerJobLifecycle;
import io.kestra.core.models.flows.State;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import lombok.Getter;
import lombok.Synchronized;
import org.slf4j.Logger;
@@ -25,6 +27,9 @@ public abstract class AbstractWorkerCallable implements Callable<State.Type> {
@Getter
String type;
@Getter
String uid;
@Getter
Throwable exception;
@@ -34,10 +39,11 @@ public abstract class AbstractWorkerCallable implements Callable<State.Type> {
private Thread currentThread;
AbstractWorkerCallable(RunContext runContext, String type, ClassLoader classLoader) {
AbstractWorkerCallable(RunContext runContext, String type, String uid, ClassLoader classLoader) {
this.logger = runContext.logger();
this.runContext = runContext;
this.type = type;
this.uid = uid;
this.classLoader = classLoader;
}
@@ -100,6 +106,7 @@ public abstract class AbstractWorkerCallable implements Callable<State.Type> {
protected State.Type exceptionHandler(Throwable e) {
this.exception = e;
Span.current().recordException(e).setStatus(StatusCode.ERROR);
if (this.killed) {
return KILLED;

View File

@@ -13,7 +13,7 @@ abstract class AbstractWorkerTriggerCallable extends AbstractWorkerCallable {
WorkerTrigger workerTrigger;
AbstractWorkerTriggerCallable(RunContext runContext, String type, WorkerTrigger workerTrigger) {
super(runContext, type, workerTrigger.getTrigger().getClass().getClassLoader());
super(runContext, type, workerTrigger.uid(), workerTrigger.getTrigger().getClass().getClassLoader());
this.workerTrigger = workerTrigger;
}

View File

@@ -60,6 +60,7 @@ public class DefaultRunContext extends RunContext {
private Storage storage;
private Map<String, Object> pluginConfiguration;
private List<String> secretInputs;
private String traceParent;
// those are only used to validate dynamic properties inside the RunContextProperty
private Task task;
@@ -103,6 +104,20 @@ public class DefaultRunContext extends RunContext {
return secretInputs;
}
/**
* {@inheritDoc}
*/
@Override
@JsonInclude
public String getTraceParent() {
return traceParent;
}
@Override
public void setTraceParent(String traceParent) {
this.traceParent = traceParent;
}
@JsonIgnore
public ApplicationContext getApplicationContext() {
return applicationContext;

View File

@@ -14,7 +14,11 @@ import io.kestra.core.models.tasks.Task;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.services.ExecutionService;
import io.kestra.core.storages.Storage;
import io.kestra.core.trace.TracerFactory;
import io.kestra.core.utils.MapUtils;
import io.kestra.core.trace.propagation.ExecutionTextMapSetter;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.context.Context;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.stream.Streams;
@@ -22,6 +26,7 @@ import java.time.Instant;
import java.time.ZonedDateTime;
import java.util.*;
import static io.kestra.core.trace.Tracer.throwCallable;
import static io.kestra.core.utils.Rethrow.throwConsumer;
@Slf4j
@@ -68,113 +73,128 @@ public final class ExecutableUtils {
boolean inheritLabels,
Property<ZonedDateTime> scheduleDate
) throws IllegalVariableEvaluationException {
// If we are in a flow that is restarted, we search for existing run of the task to restart them
if (currentExecution.getLabels() != null && currentExecution.getLabels().contains(new Label(Label.RESTARTED, "true"))
&& currentTask.getRestartBehavior() == ExecutableTask.RestartBehavior.RETRY_FAILED) {
ExecutionRepositoryInterface executionRepository = ((DefaultRunContext) runContext).getApplicationContext().getBean(ExecutionRepositoryInterface.class);
// extract a trace context for propagation
var openTelemetry = ((DefaultRunContext) runContext).getApplicationContext().getBean(OpenTelemetry.class);
var propagator = openTelemetry.getPropagators().getTextMapPropagator();
var tracerFactory = ((DefaultRunContext) runContext).getApplicationContext().getBean(TracerFactory.class);
var tracer = tracerFactory.getTracer(currentTask.getClass(), "EXECUTOR");
Optional<Execution> existingSubflowExecution = Optional.empty();
if (currentTaskRun.getOutputs() != null && currentTaskRun.getOutputs().containsKey("executionId")) {
// we know which execution to restart; this should be the case for Subflow tasks
existingSubflowExecution = executionRepository.findById(currentExecution.getTenantId(), (String) currentTaskRun.getOutputs().get("executionId"));
}
return tracer.inNewContext(
currentExecution,
currentTask.getType(),
throwCallable(() -> {
// If we are in a flow that is restarted, we search for existing run of the task to restart them
if (currentExecution.getLabels() != null && currentExecution.getLabels().contains(new Label(Label.RESTARTED, "true"))
&& currentTask.getRestartBehavior() == ExecutableTask.RestartBehavior.RETRY_FAILED) {
ExecutionRepositoryInterface executionRepository = ((DefaultRunContext) runContext).getApplicationContext().getBean(ExecutionRepositoryInterface.class);
if (existingSubflowExecution.isEmpty()) {
// otherwise, we try to find the correct one; this should be the case for ForEachItem tasks
List<Execution> childExecutions = executionRepository.findAllByTriggerExecutionId(currentExecution.getTenantId(), currentExecution.getId())
.filter(e -> e.getNamespace().equals(currentTask.subflowId().namespace()) && e.getFlowId().equals(currentTask.subflowId().flowId()) && e.getTrigger().getId().equals(currentTask.getId()))
.filter(e -> Objects.equals(e.getTrigger().getVariables().get("taskRunId"), currentTaskRun.getId()) && Objects.equals(e.getTrigger().getVariables().get("taskRunValue"), currentTaskRun.getValue()) && Objects.equals(e.getTrigger().getVariables().get("taskRunIteration"), currentTaskRun.getIteration()))
.collectList()
.block();
Optional<Execution> existingSubflowExecution = Optional.empty();
if (currentTaskRun.getOutputs() != null && currentTaskRun.getOutputs().containsKey("executionId")) {
// we know which execution to restart; this should be the case for Subflow tasks
existingSubflowExecution = executionRepository.findById(currentExecution.getTenantId(), (String) currentTaskRun.getOutputs().get("executionId"));
}
if (childExecutions != null && childExecutions.size() == 1) {
// if there are more than one, we ignore the results and create a new one
existingSubflowExecution = Optional.of(childExecutions.getFirst());
if (existingSubflowExecution.isEmpty()) {
// otherwise, we try to find the correct one; this should be the case for ForEachItem tasks
List<Execution> childExecutions = executionRepository.findAllByTriggerExecutionId(currentExecution.getTenantId(), currentExecution.getId())
.filter(e -> e.getNamespace().equals(currentTask.subflowId().namespace()) && e.getFlowId().equals(currentTask.subflowId().flowId()) && e.getTrigger().getId().equals(currentTask.getId()))
.filter(e -> Objects.equals(e.getTrigger().getVariables().get("taskRunId"), currentTaskRun.getId()) && Objects.equals(e.getTrigger().getVariables().get("taskRunValue"), currentTaskRun.getValue()) && Objects.equals(e.getTrigger().getVariables().get("taskRunIteration"), currentTaskRun.getIteration()))
.collectList()
.block();
if (childExecutions != null && childExecutions.size() == 1) {
// if there are more than one, we ignore the results and create a new one
existingSubflowExecution = Optional.of(childExecutions.getFirst());
}
}
if (existingSubflowExecution.isPresent()) {
Execution subflowExecution = existingSubflowExecution.get();
if (!subflowExecution.getState().isFailed()) {
// don't restart it as it's terminated successfully
return Optional.empty();
}
ExecutionService executionService = ((DefaultRunContext) runContext).getApplicationContext().getBean(ExecutionService.class);
try {
Execution restarted = executionService.restart(subflowExecution, null);
// inject the traceparent into the new execution
propagator.inject(Context.current(), restarted, ExecutionTextMapSetter.INSTANCE);
return Optional.of(SubflowExecution.builder()
.parentTask(currentTask)
.parentTaskRun(currentTaskRun.withState(State.Type.RUNNING))
.execution(restarted)
.build());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
if (existingSubflowExecution.isPresent()) {
Execution subflowExecution = existingSubflowExecution.get();
if (!subflowExecution.getState().isFailed()) {
// don't restart it as it's terminated successfully
return Optional.empty();
}
ExecutionService executionService = ((DefaultRunContext) runContext).getApplicationContext().getBean(ExecutionService.class);
try {
Execution restarted = executionService.restart(subflowExecution, null);
return Optional.of(SubflowExecution.builder()
.parentTask(currentTask)
.parentTaskRun(currentTaskRun.withState(State.Type.RUNNING))
.execution(restarted)
.build());
} catch (Exception e) {
throw new RuntimeException(e);
}
String subflowNamespace = runContext.render(currentTask.subflowId().namespace());
String subflowId = runContext.render(currentTask.subflowId().flowId());
Optional<Integer> subflowRevision = currentTask.subflowId().revision();
Flow flow = flowExecutorInterface.findByIdFromTask(
currentExecution.getTenantId(),
subflowNamespace,
subflowId,
subflowRevision,
currentExecution.getTenantId(),
currentFlow.getNamespace(),
currentFlow.getId()
)
.orElseThrow(() -> new IllegalStateException("Unable to find flow '" + subflowNamespace + "'.'" + subflowId + "' with revision '" + subflowRevision.orElse(0) + "'"));
if (flow.isDisabled()) {
throw new IllegalStateException("Cannot execute a flow which is disabled");
}
}
String subflowNamespace = runContext.render(currentTask.subflowId().namespace());
String subflowId = runContext.render(currentTask.subflowId().flowId());
Optional<Integer> subflowRevision = currentTask.subflowId().revision();
if (flow instanceof FlowWithException fwe) {
throw new IllegalStateException("Cannot execute an invalid flow: " + fwe.getException());
}
io.kestra.core.models.flows.Flow flow = flowExecutorInterface.findByIdFromTask(
currentExecution.getTenantId(),
subflowNamespace,
subflowId,
subflowRevision,
currentExecution.getTenantId(),
currentFlow.getNamespace(),
currentFlow.getId()
)
.orElseThrow(() -> new IllegalStateException("Unable to find flow '" + subflowNamespace + "'.'" + subflowId + "' with revision '" + subflowRevision.orElse(0) + "'"));
List<Label> newLabels = inheritLabels ? new ArrayList<>(currentExecution.getLabels()) : new ArrayList<>(systemLabels(currentExecution));
if (labels != null) {
labels.forEach(throwConsumer(label -> newLabels.add(new Label(runContext.render(label.key()), runContext.render(label.value())))));
}
if (flow.isDisabled()) {
throw new IllegalStateException("Cannot execute a flow which is disabled");
}
var variables = ImmutableMap.<String, Object>builder().putAll(Map.of(
"executionId", currentExecution.getId(),
"namespace", currentFlow.getNamespace(),
"flowId", currentFlow.getId(),
"flowRevision", currentFlow.getRevision(),
"taskRunId", currentTaskRun.getId()
));
if (currentTaskRun.getValue() != null) {
variables.put("taskRunValue", currentTaskRun.getValue());
}
if (currentTaskRun.getIteration() != null) {
variables.put("taskRunIteration", currentTaskRun.getIteration());
}
if (flow instanceof FlowWithException fwe) {
throw new IllegalStateException("Cannot execute an invalid flow: " + fwe.getException());
}
List<Label> newLabels = inheritLabels ? new ArrayList<>(currentExecution.getLabels()) : new ArrayList<>(systemLabels(currentExecution));
if (labels != null) {
labels.forEach(throwConsumer(label -> newLabels.add(new Label(runContext.render(label.key()), runContext.render(label.value())))));
}
var variables = ImmutableMap.<String, Object>builder().putAll(Map.of(
"executionId", currentExecution.getId(),
"namespace", currentFlow.getNamespace(),
"flowId", currentFlow.getId(),
"flowRevision", currentFlow.getRevision(),
"taskRunId", currentTaskRun.getId()
));
if (currentTaskRun.getValue() != null) {
variables.put("taskRunValue", currentTaskRun.getValue());
}
if (currentTaskRun.getIteration() != null) {
variables.put("taskRunIteration", currentTaskRun.getIteration());
}
FlowInputOutput flowInputOutput = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowInputOutput.class);
Instant scheduleOnDate = runContext.render(scheduleDate).as(ZonedDateTime.class).map(date -> date.toInstant()).orElse(null);
Execution execution = Execution
.newExecution(
flow,
(f, e) -> flowInputOutput.readExecutionInputs(f, e, inputs),
newLabels,
Optional.empty())
.withTrigger(ExecutionTrigger.builder()
.id(currentTask.getId())
.type(currentTask.getType())
.variables(variables.build())
.build()
)
.withScheduleDate(scheduleOnDate);
return Optional.of(SubflowExecution.builder()
.parentTask(currentTask)
.parentTaskRun(currentTaskRun.withState(State.Type.RUNNING))
.execution(execution)
.build());
FlowInputOutput flowInputOutput = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowInputOutput.class);
Instant scheduleOnDate = runContext.render(scheduleDate).as(ZonedDateTime.class).map(date -> date.toInstant()).orElse(null);
Execution execution = Execution
.newExecution(
flow,
(f, e) -> flowInputOutput.readExecutionInputs(f, e, inputs),
newLabels,
Optional.empty())
.withTrigger(ExecutionTrigger.builder()
.id(currentTask.getId())
.type(currentTask.getType())
.variables(variables.build())
.build()
)
.withScheduleDate(scheduleOnDate);
// inject the traceparent into the new execution
propagator.inject(Context.current(), execution, ExecutionTextMapSetter.INSTANCE);
return Optional.of(SubflowExecution.builder()
.parentTask(currentTask)
.parentTaskRun(currentTaskRun.withState(State.Type.RUNNING))
.execution(execution)
.build());
}));
}
private static List<Label> systemLabels(Execution execution) {

View File

@@ -14,6 +14,7 @@ import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.services.*;
import io.kestra.core.trace.propagation.RunContextTextMapSetter;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.utils.TruthUtils;
import io.kestra.plugin.core.flow.Pause;
@@ -21,6 +22,8 @@ import io.kestra.plugin.core.flow.Subflow;
import io.kestra.plugin.core.flow.WaitFor;
import io.kestra.plugin.core.flow.WorkingDirectory;
import io.micronaut.context.ApplicationContext;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.context.Context;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
@@ -70,6 +73,9 @@ public class ExecutorService {
@Inject
private SLAService slaService;
@Inject
private OpenTelemetry openTelemetry;
@Inject
@Named(QueueFactoryInterface.KILL_NAMED)
protected QueueInterface<ExecutionKilled> killQueue;
@@ -733,6 +739,8 @@ public class ExecutorService {
return executor;
}
var propagator = openTelemetry.getPropagators().getTextMapPropagator();
// submit TaskRun when receiving created, must be done after the state execution store
Map<Boolean, List<WorkerTask>> workerTasks = executor.getExecution()
.getTaskRunList()
@@ -741,6 +749,7 @@ public class ExecutorService {
.map(throwFunction(taskRun -> {
Task task = executor.getFlow().findTaskByTaskId(taskRun.getTaskId());
RunContext runContext = runContextFactory.of(executor.getFlow(), task, executor.getExecution(), taskRun);
propagator.inject(Context.current(), runContext, RunContextTextMapSetter.INSTANCE); // inject the traceparent into the run context
WorkerTask workerTask = WorkerTask.builder()
.runContext(runContext)
.taskRun(taskRun)

View File

@@ -43,6 +43,14 @@ public abstract class RunContext {
@JsonInclude
public abstract List<String> getSecretInputs();
/**
* OpenTelemetry trace parent
*/
@JsonInclude
public abstract String getTraceParent();
public abstract void setTraceParent(String traceParent);
public abstract String render(String inline) throws IllegalVariableEvaluationException;
public abstract Object renderTyped(String inline) throws IllegalVariableEvaluationException;

View File

@@ -10,6 +10,7 @@ import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.Label;
import io.kestra.core.models.executions.*;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.triggers.*;
@@ -19,12 +20,16 @@ import io.kestra.core.server.*;
import io.kestra.core.services.LabelService;
import io.kestra.core.services.LogService;
import io.kestra.core.services.WorkerGroupService;
import io.kestra.core.trace.Tracer;
import io.kestra.core.trace.TracerFactory;
import io.kestra.core.trace.TraceUtils;
import io.kestra.core.utils.*;
import io.kestra.plugin.core.flow.WorkingDirectory;
import io.micronaut.context.annotation.Parameter;
import io.micronaut.context.event.ApplicationEventPublisher;
import io.micronaut.core.annotation.Introspected;
import io.micronaut.core.annotation.Nullable;
import io.opentelemetry.api.common.Attributes;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Inject;
@@ -142,6 +147,10 @@ public class Worker implements Service, Runnable, AutoCloseable {
private final AtomicInteger pendingJobCount = new AtomicInteger(0);
private final AtomicInteger runningJobCount = new AtomicInteger(0);
@Inject
private TracerFactory tracerFactory;
private Tracer tracer;
/**
* Creates a new {@link Worker} instance.
*
@@ -167,12 +176,14 @@ public class Worker implements Service, Runnable, AutoCloseable {
}
@PostConstruct
void initMetrics() {
void initMetricsAndTracer() {
String[] tags = this.workerGroup == null ? new String[0] : new String[]{MetricRegistry.TAG_WORKER_GROUP, this.workerGroup};
// create metrics to store thread count, pending jobs and running jobs, so we can have autoscaling easily
this.metricRegistry.gauge(MetricRegistry.METRIC_WORKER_JOB_THREAD_COUNT, numThreads, tags);
this.metricRegistry.gauge(MetricRegistry.METRIC_WORKER_JOB_PENDING_COUNT, pendingJobCount, tags);
this.metricRegistry.gauge(MetricRegistry.METRIC_WORKER_JOB_RUNNING_COUNT, runningJobCount, tags);
this.tracer = tracerFactory.getTracer(Worker.class, "WORKER");
}
@Override
@@ -819,7 +830,15 @@ public class Worker implements Service, Runnable, AutoCloseable {
}
try {
return workerSecurityService.callInSecurityContext(workerJobCallable);
return tracer.inCurrentContext(
workerJobCallable.runContext,
workerJobCallable.getType(),
Attributes.of(TraceUtils.ATTR_UID, workerJobCallable.getUid()),
() -> workerSecurityService.callInSecurityContext(workerJobCallable)
);
} catch(Exception e) {
// should only occur if it fails in the tracing code which should be unexpected
return State.Type.FAILED;
} finally {
synchronized (this) {
workerCallableReferences.remove(workerJobCallable);

View File

@@ -25,7 +25,7 @@ public class WorkerTaskCallable extends AbstractWorkerCallable {
Output taskOutput;
WorkerTaskCallable(WorkerTask workerTask, RunnableTask<?> task, RunContext runContext, MetricRegistry metricRegistry) {
super(runContext, task.getClass().getName(), task.getClass().getClassLoader());
super(runContext, task.getClass().getName(), workerTask.uid(), task.getClass().getClassLoader());
this.workerTask = workerTask;
this.task = task;
this.metricRegistry = metricRegistry;

View File

@@ -0,0 +1,116 @@
package io.kestra.core.trace;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.runners.RunContext;
import io.kestra.core.trace.propagation.ExecutionTextMapGetter;
import io.kestra.core.trace.propagation.RunContextTextMapGetter;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
class DefaultTracer implements Tracer {
private final OpenTelemetry openTelemetry;
private final io.opentelemetry.api.trace.Tracer tracer;
private final String spanNamePrefix;
private final TraceLevel level; // FIXME useless for now as we didn't handle FINE level
private final Attributes baseAttributes;
DefaultTracer(OpenTelemetry openTelemetry, io.opentelemetry.api.trace.Tracer tracer, String spanNamePrefix, TraceLevel level, Attributes baseAttributes) {
this.openTelemetry = openTelemetry;
this.tracer = tracer;
this.spanNamePrefix = spanNamePrefix;
this.level = level;
this.baseAttributes = baseAttributes;
}
@Override
public <V> V inCurrentContext(RunContext runContext, String spanName, Callable<V> callable) {
return inCurrentContext(runContext, spanName, null, callable);
}
@Override
public <V> V inCurrentContext(RunContext runContext, String spanName, Attributes additionalAttributes, Callable<V> callable) {
// extract the traceparent from the run context to allow trace propagation
var propagator = openTelemetry.getPropagators().getTextMapPropagator();
var extractedContext = propagator.extract(Context.current(), runContext, RunContextTextMapGetter.INSTANCE);
var attributesBuilder = Attributes.builder()
.putAll(baseAttributes)
.putAll(TraceUtils.attributesFrom(runContext));
if (additionalAttributes != null) {
attributesBuilder.putAll(additionalAttributes);
}
return inCurrentContext(extractedContext, spanName, attributesBuilder.build(), callable);
}
@Override
public <V> V inCurrentContext(Execution execution, String spanName, Callable<V> callable) {
return inCurrentContext(execution, spanName, null, callable);
}
@Override
public <V> V inCurrentContext(Execution execution, String spanName, Attributes additionalAttributes, Callable<V> callable) {
// extract the traceparent from the execution to allow trace propagation
var propagator = openTelemetry.getPropagators().getTextMapPropagator();
var extractedContext = propagator.extract(Context.current(), execution, ExecutionTextMapGetter.INSTANCE);
var attributesBuilder = Attributes.builder()
.putAll(baseAttributes)
.putAll(TraceUtils.attributesFrom(execution));
if (additionalAttributes != null) {
attributesBuilder.putAll(additionalAttributes);
}
return inCurrentContext(extractedContext, spanName, attributesBuilder.build(), callable);
}
@Override
public <V> V inNewContext(Execution execution, String spanName, Callable<V> callable) {
return inNewContext(execution, spanName, null, callable);
}
@Override
public <V> V inNewContext(Execution execution, String spanName, Attributes additionalAttributes, Callable<V> callable) {
var attributesBuilder = Attributes.builder()
.putAll(baseAttributes)
.putAll(TraceUtils.attributesFrom(execution));
if (additionalAttributes != null) {
attributesBuilder.putAll(additionalAttributes);
}
return inNewContext(spanName, attributesBuilder.build(), callable);
}
private <V> V inCurrentContext(Context context, String spanName, Attributes attributes, Callable<V> callable) {
try (Scope ignored = context.makeCurrent()) {
var span = tracer.spanBuilder(spanNamePrefix + " - " + spanName)
.setAllAttributes(attributes)
.startSpan();
try {
return callable.call();
} catch(Exception e) {
span.setStatus(StatusCode.ERROR, e.getMessage());
throw e;
} finally {
span.end();
}
}
}
private <V> V inNewContext(String spanName, Attributes attributes, Callable<V> callable) {
var span = tracer.spanBuilder(spanNamePrefix + " - " + spanName)
.setAllAttributes(attributes)
.startSpan();
try (Scope ignored = span.makeCurrent()) {
return callable.call();
} catch(Exception e) {
span.setStatus(StatusCode.ERROR, e.getMessage());
throw e;
} finally {
span.end();
}
}
}

View File

@@ -0,0 +1,37 @@
package io.kestra.core.trace;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.runners.RunContext;
import io.opentelemetry.api.common.Attributes;
class NoopTracer implements Tracer {
@Override
public <V> V inCurrentContext(RunContext runContext, String spanName, Callable<V> callable) {
return callable.call();
}
@Override
public <V> V inCurrentContext(RunContext runContext, String spanName, Attributes additionalAttributes, Callable<V> callable) {
return callable.call();
}
@Override
public <V> V inCurrentContext(Execution execution, String spanName, Callable<V> callable) {
return callable.call();
}
@Override
public <V> V inCurrentContext(Execution execution, String spanName, Attributes additionalAttributes, Callable<V> callable) {
return callable.call();
}
@Override
public <V> V inNewContext(Execution execution, String spanName, Callable<V> callable) {
return callable.call();
}
@Override
public <V> V inNewContext(Execution execution, String spanName, Attributes additionalAttributes, Callable<V> callable) {
return callable.call();
}
}

View File

@@ -0,0 +1,5 @@
package io.kestra.core.trace;
public enum TraceLevel {
DISABLED, DEFAULT, FINE
}

View File

@@ -0,0 +1,56 @@
package io.kestra.core.trace;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.runners.RunContext;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import java.util.Map;
public final class TraceUtils {
public static final AttributeKey<String> ATTR_UID = AttributeKey.stringKey("kestra.uid");
private static final AttributeKey<String> ATTR_TENANT_ID = AttributeKey.stringKey("kestra.tenantId");
private static final AttributeKey<String> ATTR_NAMESPACE = AttributeKey.stringKey("kestra.namespace");
private static final AttributeKey<String> ATTR_FLOW_ID = AttributeKey.stringKey("kestra.flowId");
private static final AttributeKey<String> ATTR_EXECUTION_ID = AttributeKey.stringKey("kestra.executionId");
public static final AttributeKey<String> ATTR_SOURCE = AttributeKey.stringKey("kestra.source");
private TraceUtils() {}
public static Attributes attributesFrom(Execution execution) {
var builder = Attributes.builder()
.put(ATTR_NAMESPACE, execution.getNamespace())
.put(ATTR_FLOW_ID, execution.getFlowId())
.put(ATTR_EXECUTION_ID, execution.getId());
if (execution.getTenantId() != null) {
builder.put(ATTR_TENANT_ID, execution.getTenantId());
}
return builder.build();
}
@SuppressWarnings("unchecked")
public static Attributes attributesFrom(RunContext runContext) {
var flowInfo = runContext.flowInfo();
var execution = (Map<String, Object>) runContext.getVariables().get("execution");
var executionId = (String) execution.get("id");
var builder = Attributes.builder()
.put(ATTR_NAMESPACE, flowInfo.namespace())
.put(ATTR_FLOW_ID, flowInfo.id())
.put(ATTR_EXECUTION_ID, executionId);
if (flowInfo.tenantId() != null) {
builder.put(ATTR_TENANT_ID, flowInfo.tenantId());
}
return builder.build();
}
public static Attributes attributesFrom(Class<?> clazz) {
return Attributes.builder().put(ATTR_SOURCE, clazz.getName()).build();
}
}

View File

@@ -0,0 +1,90 @@
package io.kestra.core.trace;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.runners.RunContext;
import io.kestra.core.utils.Rethrow;
import io.opentelemetry.api.common.Attributes;
/**
* A <code>Tracer</code> allow to instrument a block of code with OpenTelemetry traces.
*/
public interface Tracer {
/**
* Instrument a block of code with a trace context extracted from the run context.
*
* @see #inCurrentContext(RunContext, String, Attributes, Callable)
*/
<V> V inCurrentContext(RunContext runContext, String spanName, Callable<V> callable);
/**
* Instrument a block of code with a trace context extracted from the run context.
* Default span attributes will be added derived from the run context.
*
* @param runContext the run context
* @param spanName the name of the span
* @param additionalAttributes additional span attributes
* @param callable the bock of code
*/
<V> V inCurrentContext(RunContext runContext, String spanName, Attributes additionalAttributes, Callable<V> callable);
/**
* Instrument a block of code with a trace context extracted from the execution.
*
* @see #inCurrentContext(Execution, String, Attributes, Callable)
*/
<V> V inCurrentContext(Execution execution, String spanName, Callable<V> callable);
/**
* Instrument a block of code with a trace context extracted from the execution.
* Default span attributes will be added derived from the execution.
*
* @param execution the execution
* @param spanName the name of the span
* @param additionalAttributes additional span attributes
* @param callable the bock of code
*/
<V> V inCurrentContext(Execution execution, String spanName, Attributes additionalAttributes, Callable<V> callable);
/**
* Instrument a block of code with a new trace context from a parent context extracted from the execution.
*
* @see #inNewContext(Execution, String, Attributes, Callable)
*/
<V> V inNewContext(Execution execution, String spanName, Callable<V> callable);
/**
* Instrument a block of code with a new trace context from a parent context extracted from the execution.
* Default span attributes will be added derived from the execution.
*
* @param execution the execution
* @param spanName the name of the span
* @param additionalAttributes additional span attributes
* @param callable the bock of code
*/
<V> V inNewContext(Execution execution, String spanName, Attributes additionalAttributes, Callable<V> callable);
@FunctionalInterface
interface Callable<V> {
V call();
}
@FunctionalInterface
interface CallableChecked<R, E extends Exception> {
R call() throws E;
}
static <R, E extends Exception> Callable<R> throwCallable(Rethrow.CallableChecked<R, E> runnable) throws E {
return () -> {
try {
return runnable.call();
} catch (Exception exception) {
return throwException(exception);
}
};
}
@SuppressWarnings("unchecked")
private static <E extends Exception, R> R throwException(Exception exception) throws E {
throw (E) exception;
}
}

View File

@@ -0,0 +1,46 @@
package io.kestra.core.trace;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.Attributes;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
/**
* Creates <code>Trace</code> instances.
*
* @see Tracer
*/
@Singleton
public class TracerFactory {
@Inject
private OpenTelemetry openTelemetry;
@Inject
private io.opentelemetry.api.trace.Tracer tracer;
@Inject
private TracesConfiguration tracesConfiguration;
/**
* Get a tracer for a class with a given prefix for the span names.
*/
public Tracer getTracer(Class<?> clazz, String spanNamePrefix) {
TraceLevel level = levelFromConfiguration(clazz.getName());
Attributes attributes = TraceUtils.attributesFrom(clazz);
return level == TraceLevel.DISABLED ? new NoopTracer() : new DefaultTracer(openTelemetry, tracer, spanNamePrefix, level, attributes);
}
private TraceLevel levelFromConfiguration(String name) {
if (name == null) {
return tracesConfiguration.root();
} else if(tracesConfiguration.categories().containsKey(name)) {
return tracesConfiguration.categories().get(name);
} else {
if (name.contains(".")) {
return levelFromConfiguration(name.substring(0, name.lastIndexOf('.')));
} else {
return tracesConfiguration.root();
}
}
}
}

View File

@@ -0,0 +1,19 @@
package io.kestra.core.trace;
import io.micronaut.context.annotation.ConfigurationProperties;
import io.micronaut.core.bind.annotation.Bindable;
import io.micronaut.core.convert.format.MapFormat;
import jakarta.validation.constraints.NotNull;
import java.util.Map;
@ConfigurationProperties("kestra.traces")
public record TracesConfiguration (
@NotNull @Bindable(defaultValue = "DISABLED")
TraceLevel root,
@NotNull @Bindable(defaultValue = "{}")
@MapFormat(transformation = MapFormat.MapTransformation.FLAT)
Map<String, TraceLevel> categories
) {
}

View File

@@ -0,0 +1,29 @@
package io.kestra.core.trace.propagation;
import io.kestra.core.models.executions.Execution;
import io.opentelemetry.context.propagation.TextMapGetter;
import javax.annotation.Nullable;
import java.util.List;
public class ExecutionTextMapGetter implements TextMapGetter<Execution> {
public static final ExecutionTextMapGetter INSTANCE = new ExecutionTextMapGetter();
@Override
public Iterable<String> keys(Execution carrier) {
return List.of("traceparent");
}
@Nullable
@Override
public String get(@Nullable Execution carrier, String key) {
if (carrier == null) {
return null;
}
return switch(key) {
case "traceparent" -> carrier.getTraceParent();
default -> null;
};
}
}

View File

@@ -0,0 +1,21 @@
package io.kestra.core.trace.propagation;
import io.kestra.core.models.executions.Execution;
import io.opentelemetry.context.propagation.TextMapSetter;
import javax.annotation.Nullable;
public class ExecutionTextMapSetter implements TextMapSetter<Execution> {
public static final ExecutionTextMapSetter INSTANCE = new ExecutionTextMapSetter();
@Override
public void set(@Nullable Execution carrier, String key, String value) {
if (carrier != null) {
switch (key) {
case "traceparent" -> carrier.setTraceParent(value);
default -> {
}
}
}
}
}

View File

@@ -0,0 +1,29 @@
package io.kestra.core.trace.propagation;
import io.kestra.core.runners.RunContext;
import io.opentelemetry.context.propagation.TextMapGetter;
import javax.annotation.Nullable;
import java.util.List;
public class RunContextTextMapGetter implements TextMapGetter<RunContext> {
public static final RunContextTextMapGetter INSTANCE = new RunContextTextMapGetter();
@Override
public Iterable<String> keys(RunContext carrier) {
return List.of("traceparent");
}
@Nullable
@Override
public String get(@Nullable RunContext carrier, String key) {
if (carrier == null) {
return null;
}
return switch(key) {
case "traceparent" -> carrier.getTraceParent();
default -> null;
};
}
}

View File

@@ -0,0 +1,21 @@
package io.kestra.core.trace.propagation;
import io.kestra.core.runners.RunContext;
import io.opentelemetry.context.propagation.TextMapSetter;
import javax.annotation.Nullable;
public class RunContextTextMapSetter implements TextMapSetter<RunContext> {
public static final RunContextTextMapSetter INSTANCE = new RunContextTextMapSetter();
@Override
public void set(@Nullable RunContext carrier, String key, String value) {
if (carrier != null) {
switch (key) {
case "traceparent" -> carrier.setTraceParent(value);
default -> {
}
}
}
}
}

View File

@@ -1,6 +1,5 @@
package io.kestra.core.utils;
import java.util.concurrent.Callable;
import java.util.function.*;
public final class Rethrow {
@@ -114,16 +113,6 @@ public final class Rethrow {
};
}
public static <R, E extends Exception> Callable<R> throwCallable(CallableChecked<R, E> runnable) throws E {
return () -> {
try {
return runnable.call();
} catch (Exception exception) {
return throwException(exception);
}
};
}
@SuppressWarnings("unchecked")
private static <E extends Exception, R> R throwException(Exception exception) throws E {
throw (E) exception;

View File

@@ -21,4 +21,15 @@ services:
POSTGRES_PASSWORD: k3str4
ports:
- 5432:5432
restart: on-failure
restart: on-failure
# jaeger-all-in-one:
# image: jaegertracing/all-in-one:latest
# ports:
# - "16686:16686" # Jaeger UI
# - "14268:14268" # Receive legacy OpenTracing traces, optional
# - "4317:4317" # OTLP gRPC receiver
# - "4318:4318" # OTLP HTTP receiver
# - "14250:14250" # Receive from external otel-collector, optional
# environment:
# - COLLECTOR_OTLP_ENABLED=true

View File

@@ -27,6 +27,8 @@ import io.kestra.core.server.Service;
import io.kestra.core.server.ServiceStateChangeEvent;
import io.kestra.core.services.*;
import io.kestra.core.topologies.FlowTopologyService;
import io.kestra.core.trace.Tracer;
import io.kestra.core.trace.TracerFactory;
import io.kestra.core.utils.*;
import io.kestra.jdbc.JdbcMapper;
import io.kestra.jdbc.repository.AbstractJdbcExecutionRepository;
@@ -36,6 +38,8 @@ import io.kestra.plugin.core.flow.ForEachItem;
import io.kestra.plugin.core.flow.Template;
import io.micronaut.context.event.ApplicationEventPublisher;
import io.micronaut.transaction.exceptions.CannotCreateTransactionException;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import jakarta.annotation.Nullable;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Inject;
@@ -60,7 +64,6 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import static io.kestra.core.utils.Rethrow.throwConsumer;
import static io.kestra.core.utils.Rethrow.throwFunction;
@SuppressWarnings("deprecation")
@Singleton
@@ -172,6 +175,8 @@ public class JdbcExecutor implements ExecutorInterface, Service {
@Inject
private SLAService slaService;
private final Tracer tracer;
private final FlowRepositoryInterface flowRepository;
private final JdbcServiceLivenessCoordinator serviceLivenessCoordinator;
@@ -203,12 +208,14 @@ public class JdbcExecutor implements ExecutorInterface, Service {
@Nullable final JdbcServiceLivenessCoordinator serviceLivenessCoordinator,
final FlowRepositoryInterface flowRepository,
final AbstractJdbcFlowTopologyRepository flowTopologyRepository,
final ApplicationEventPublisher<ServiceStateChangeEvent> eventPublisher
) {
final ApplicationEventPublisher<ServiceStateChangeEvent> eventPublisher,
final TracerFactory tracerFactory
) {
this.serviceLivenessCoordinator = serviceLivenessCoordinator;
this.flowRepository = flowRepository;
this.flowTopologyRepository = flowTopologyRepository;
this.eventPublisher = eventPublisher;
this.tracer = tracerFactory.getTracer(JdbcExecutor.class, "EXECUTOR");
}
@SneakyThrows
@@ -425,187 +432,194 @@ public class JdbcExecutor implements ExecutorInterface, Service {
return;
}
Executor result = executionRepository.lock(message.getId(), throwFunction(pair -> {
try {
Execution execution = pair.getLeft();
ExecutorState executorState = pair.getRight();
Executor result = executionRepository.lock(message.getId(), pair -> {
Execution execution = pair.getLeft();
ExecutorState executorState = pair.getRight();
final Flow flow = transform(this.flowRepository.findByExecutionWithSource(execution), execution);
Executor executor = new Executor(execution, null).withFlow(flow);
return tracer.inCurrentContext(
execution,
Flow.uidWithoutRevision(execution),
() -> {
try {
// schedule it for later if needed
if (execution.getState().getCurrent() == State.Type.CREATED && execution.getScheduleDate() != null && execution.getScheduleDate().isAfter(Instant.now())) {
ExecutionDelay executionDelay = ExecutionDelay.builder()
.executionId(executor.getExecution().getId())
.date(execution.getScheduleDate())
.state(State.Type.RUNNING)
.delayType(ExecutionDelay.DelayType.RESUME_FLOW)
.build();
executionDelayStorage.save(executionDelay);
return Pair.of(
executor,
executorState
);
}
final Flow flow = transform(this.flowRepository.findByExecutionWithSource(execution), execution);
Executor executor = new Executor(execution, null).withFlow(flow);
// create an SLA monitor if needed
if (execution.getState().getCurrent() == State.Type.CREATED && !ListUtils.isEmpty(flow.getSla())) {
List<SLAMonitor> monitors = flow.getSla().stream()
.filter(ExecutionMonitoringSLA.class::isInstance)
.map( ExecutionMonitoringSLA.class::cast)
.map(sla -> SLAMonitor.builder()
.executionId(execution.getId())
.slaId(((SLA) sla).getId())
.deadline(execution.getState().getStartDate().plus(sla.getDuration()))
.build()
)
.toList();
monitors.forEach(monitor -> slaMonitorStorage.save(monitor));
}
// queue execution if needed (limit concurrency)
if (execution.getState().getCurrent() == State.Type.CREATED && flow.getConcurrency() != null) {
ExecutionCount count = executionRepository.executionCounts(
flow.getTenantId(),
List.of(new io.kestra.core.models.executions.statistics.Flow(flow.getNamespace(), flow.getId())),
List.of(State.Type.RUNNING, State.Type.PAUSED),
null,
null,
null
).getFirst();
executor = executorService.checkConcurrencyLimit(executor, flow, execution, count.getCount());
// the execution has been queued, we save the queued execution and stops here
if (executor.getExecutionRunning() != null && executor.getExecutionRunning().getConcurrencyState() == ExecutionRunning.ConcurrencyState.QUEUED) {
executionQueuedStorage.save(ExecutionQueued.fromExecutionRunning(executor.getExecutionRunning()));
return Pair.of(
executor,
executorState
);
}
// the execution has been moved to FAILED or CANCELLED, we stop here
if (executor.getExecution().getState().isTerminated()) {
return Pair.of(
executor,
executorState
);
}
}
// handle execution changed SLA
executor = executorService.handleExecutionChangedSLA(executor);
// process the execution
if (log.isDebugEnabled()) {
executorService.log(log, true, executor);
}
executor = executorService.process(executor);
if (!executor.getNexts().isEmpty() && deduplicateNexts(execution, executorState, executor.getNexts())) {
executor.withExecution(
executorService.onNexts(executor.getFlow(), executor.getExecution(), executor.getNexts()),
"onNexts"
);
}
// worker task
if (!executor.getWorkerTasks().isEmpty()) {
executor
.getWorkerTasks()
.stream()
.filter(workerTask -> this.deduplicateWorkerTask(execution, executorState, workerTask.getTaskRun()))
.forEach(throwConsumer(workerTask -> {
try {
if (!TruthUtils.isTruthy(workerTask.getRunContext().render(workerTask.getTask().getRunIf()))) {
workerTaskResultQueue.emit(new WorkerTaskResult(workerTask.getTaskRun().withState(State.Type.SKIPPED)));
}
else {
if (workerTask.getTask().isSendToWorkerTask()) {
workerTaskQueue.emit(workerGroupService.resolveGroupFromJob(workerTask).map(group -> group.getKey()).orElse(null), workerTask);
}
if (workerTask.getTask().isFlowable()) {
workerTaskResultQueue.emit(new WorkerTaskResult(workerTask.getTaskRun().withState(State.Type.RUNNING)));
}
}
} catch (IllegalVariableEvaluationException e) {
workerTaskResultQueue.emit(new WorkerTaskResult(workerTask.getTaskRun().withState(State.Type.FAILED)));
workerTask.getRunContext().logger().error("Unable to evaluate the runIf condition for task {}", workerTask.getTask().getId(), e);
}
}));
}
// worker tasks results
if (!executor.getWorkerTaskResults().isEmpty()) {
executor.getWorkerTaskResults()
.forEach(throwConsumer(workerTaskResult -> workerTaskResultQueue.emit(workerTaskResult)));
}
// subflow execution results
if (!executor.getSubflowExecutionResults().isEmpty()) {
executor.getSubflowExecutionResults()
.forEach(throwConsumer(subflowExecutionResult -> subflowExecutionResultQueue.emit(subflowExecutionResult)));
}
// schedulerDelay
if (!executor.getExecutionDelays().isEmpty()) {
executor.getExecutionDelays()
.forEach(executionDelay -> executionDelayStorage.save(executionDelay));
}
// subflow execution watchers
if (!executor.getSubflowExecutions().isEmpty()) {
subflowExecutionStorage.save(executor.getSubflowExecutions());
List<SubflowExecution<?>> subflowExecutionDedup = executor
.getSubflowExecutions()
.stream()
.filter(subflowExecution -> this.deduplicateSubflowExecution(execution, executorState, subflowExecution.getParentTaskRun()))
.toList();
subflowExecutionDedup
.forEach(throwConsumer(subflowExecution -> {
Execution subExecution = subflowExecution.getExecution();
String log = String.format("Created new execution [[link execution=\"%s\" flowId=\"%s\" namespace=\"%s\"]]", subExecution.getId(), subExecution.getFlowId(), subExecution.getNamespace());
JdbcExecutor.log.info(log);
logQueue.emit(LogEntry.of(subflowExecution.getParentTaskRun()).toBuilder()
.level(Level.INFO)
.message(log)
.timestamp(subflowExecution.getParentTaskRun().getState().getStartDate())
.thread(Thread.currentThread().getName())
.build()
// schedule it for later if needed
if (execution.getState().getCurrent() == State.Type.CREATED && execution.getScheduleDate() != null && execution.getScheduleDate().isAfter(Instant.now())) {
ExecutionDelay executionDelay = ExecutionDelay.builder()
.executionId(executor.getExecution().getId())
.date(execution.getScheduleDate())
.state(State.Type.RUNNING)
.delayType(ExecutionDelay.DelayType.RESUME_FLOW)
.build();
executionDelayStorage.save(executionDelay);
return Pair.of(
executor,
executorState
);
}
executionQueue.emit(subflowExecution.getExecution());
// create an SLA monitor if needed
if (execution.getState().getCurrent() == State.Type.CREATED && !ListUtils.isEmpty(flow.getSla())) {
List<SLAMonitor> monitors = flow.getSla().stream()
.filter(ExecutionMonitoringSLA.class::isInstance)
.map(ExecutionMonitoringSLA.class::cast)
.map(sla -> SLAMonitor.builder()
.executionId(execution.getId())
.slaId(((SLA) sla).getId())
.deadline(execution.getState().getStartDate().plus(sla.getDuration()))
.build()
)
.toList();
monitors.forEach(monitor -> slaMonitorStorage.save(monitor));
}
// send a running worker task result to track running vs created status
if (subflowExecution.getParentTask().waitForExecution()) {
sendSubflowExecutionResult(execution, subflowExecution, subflowExecution.getParentTaskRun());
// queue execution if needed (limit concurrency)
if (execution.getState().getCurrent() == State.Type.CREATED && flow.getConcurrency() != null) {
ExecutionCount count = executionRepository.executionCounts(
flow.getTenantId(),
List.of(new io.kestra.core.models.executions.statistics.Flow(flow.getNamespace(), flow.getId())),
List.of(State.Type.RUNNING, State.Type.PAUSED),
null,
null,
null
).getFirst();
executor = executorService.checkConcurrencyLimit(executor, flow, execution, count.getCount());
// the execution has been queued, we save the queued execution and stops here
if (executor.getExecutionRunning() != null && executor.getExecutionRunning().getConcurrencyState() == ExecutionRunning.ConcurrencyState.QUEUED) {
executionQueuedStorage.save(ExecutionQueued.fromExecutionRunning(executor.getExecutionRunning()));
return Pair.of(
executor,
executorState
);
}
}));
}
return Pair.of(
executor,
executorState
);
} catch (QueueException e) {
try {
this.executionQueue.emit(
message.failedExecutionFromExecutor(e).getExecution().withState(State.Type.FAILED)
);
} catch (QueueException ex) {
log.error("Unable to emit the execution {}", message.getId(), ex);
}
// the execution has been moved to FAILED or CANCELLED, we stop here
if (executor.getExecution().getState().isTerminated()) {
return Pair.of(
executor,
executorState
);
}
}
return null;
}
}));
// handle execution changed SLA
executor = executorService.handleExecutionChangedSLA(executor);
// process the execution
if (log.isDebugEnabled()) {
executorService.log(log, true, executor);
}
executor = executorService.process(executor);
if (!executor.getNexts().isEmpty() && deduplicateNexts(execution, executorState, executor.getNexts())) {
executor.withExecution(
executorService.onNexts(executor.getFlow(), executor.getExecution(), executor.getNexts()),
"onNexts"
);
}
// worker task
if (!executor.getWorkerTasks().isEmpty()) {
executor
.getWorkerTasks()
.stream()
.filter(workerTask -> this.deduplicateWorkerTask(execution, executorState, workerTask.getTaskRun()))
.forEach(throwConsumer(workerTask -> {
try {
if (!TruthUtils.isTruthy(workerTask.getRunContext().render(workerTask.getTask().getRunIf()))) {
workerTaskResultQueue.emit(new WorkerTaskResult(workerTask.getTaskRun().withState(State.Type.SKIPPED)));
} else {
if (workerTask.getTask().isSendToWorkerTask()) {
workerTaskQueue.emit(workerGroupService.resolveGroupFromJob(workerTask).map(group -> group.getKey()).orElse(null), workerTask);
}
if (workerTask.getTask().isFlowable()) {
workerTaskResultQueue.emit(new WorkerTaskResult(workerTask.getTaskRun().withState(State.Type.RUNNING)));
}
}
} catch (IllegalVariableEvaluationException e) {
workerTaskResultQueue.emit(new WorkerTaskResult(workerTask.getTaskRun().withState(State.Type.FAILED)));
workerTask.getRunContext().logger().error("Unable to evaluate the runIf condition for task {}", workerTask.getTask().getId(), e);
}
}));
}
// worker tasks results
if (!executor.getWorkerTaskResults().isEmpty()) {
executor.getWorkerTaskResults()
.forEach(throwConsumer(workerTaskResult -> workerTaskResultQueue.emit(workerTaskResult)));
}
// subflow execution results
if (!executor.getSubflowExecutionResults().isEmpty()) {
executor.getSubflowExecutionResults()
.forEach(throwConsumer(subflowExecutionResult -> subflowExecutionResultQueue.emit(subflowExecutionResult)));
}
// schedulerDelay
if (!executor.getExecutionDelays().isEmpty()) {
executor.getExecutionDelays()
.forEach(executionDelay -> executionDelayStorage.save(executionDelay));
}
// subflow execution watchers
if (!executor.getSubflowExecutions().isEmpty()) {
subflowExecutionStorage.save(executor.getSubflowExecutions());
List<SubflowExecution<?>> subflowExecutionDedup = executor
.getSubflowExecutions()
.stream()
.filter(subflowExecution -> this.deduplicateSubflowExecution(execution, executorState, subflowExecution.getParentTaskRun()))
.toList();
subflowExecutionDedup
.forEach(throwConsumer(subflowExecution -> {
Execution subExecution = subflowExecution.getExecution();
String log = String.format("Created new execution [[link execution=\"%s\" flowId=\"%s\" namespace=\"%s\"]]", subExecution.getId(), subExecution.getFlowId(), subExecution.getNamespace());
JdbcExecutor.log.info(log);
logQueue.emit(LogEntry.of(subflowExecution.getParentTaskRun()).toBuilder()
.level(Level.INFO)
.message(log)
.timestamp(subflowExecution.getParentTaskRun().getState().getStartDate())
.thread(Thread.currentThread().getName())
.build()
);
executionQueue.emit(subflowExecution.getExecution());
// send a running worker task result to track running vs created status
if (subflowExecution.getParentTask().waitForExecution()) {
sendSubflowExecutionResult(execution, subflowExecution, subflowExecution.getParentTaskRun());
}
}));
}
return Pair.of(
executor,
executorState
);
} catch (QueueException e) {
try {
this.executionQueue.emit(
message.failedExecutionFromExecutor(e).getExecution().withState(State.Type.FAILED)
);
} catch (QueueException ex) {
log.error("Unable to emit the execution {}", message.getId(), ex);
}
Span.current().recordException(e).setStatus(StatusCode.ERROR);
return null;
}
}
);
});
if (result != null) {
this.toExecution(result);
@@ -1232,4 +1246,4 @@ public class JdbcExecutor implements ExecutorInterface, Service {
public ServiceState getState() {
return state.get();
}
}
}

View File

@@ -32,6 +32,7 @@ import io.kestra.core.services.*;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.tenant.TenantService;
import io.kestra.core.trace.propagation.ExecutionTextMapSetter;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.ListUtils;
import io.kestra.plugin.core.trigger.Webhook;
@@ -69,6 +70,8 @@ import io.micronaut.http.sse.Event;
import io.micronaut.scheduling.TaskExecutors;
import io.micronaut.scheduling.annotation.ExecuteOn;
import io.micronaut.validation.Validated;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.context.Context;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.media.Content;
@@ -175,6 +178,9 @@ public class ExecutionController {
@Value("${kestra.url}")
private Optional<String> kestraUrl;
@Inject
private OpenTelemetry openTelemetry;
@ExecuteOn(TaskExecutors.IO)
@Get(uri = "/search")
@Operation(tags = {"Executions"}, summary = "Search for executions")
@@ -523,6 +529,10 @@ public class ExecutionController {
}
try {
// inject the traceparent into the execution
var propagator = openTelemetry.getPropagators().getTextMapPropagator();
propagator.inject(Context.current(), result, ExecutionTextMapSetter.INSTANCE);
executionQueue.emit(result);
eventPublisher.publishEvent(new CrudEvent<>(result, CrudEventType.CREATE));
return HttpResponse.ok(result);
@@ -592,6 +602,10 @@ public class ExecutionController {
.handle((executionInputs, sink) -> {
Execution executionWithInputs = current.withInputs(executionInputs);
try {
// inject the traceparent into the execution
var propagator = openTelemetry.getPropagators().getTextMapPropagator();
propagator.inject(Context.current(), executionWithInputs, ExecutionTextMapSetter.INSTANCE);
executionQueue.emit(executionWithInputs);
eventPublisher.publishEvent(new CrudEvent<>(executionWithInputs, CrudEventType.CREATE));
@@ -637,8 +651,8 @@ public class ExecutionController {
private final URI url;
// This is not nice, but we cannot use @AllArgsConstructor as it would open a bunch of necessary changes on the Execution class.
ExecutionResponse(String tenantId, String id, String namespace, String flowId, Integer flowRevision, List<TaskRun> taskRunList, Map<String, Object> inputs, Map<String, Object> outputs, List<Label> labels, Map<String, Object> variables, State state, String parentId, String originalId, ExecutionTrigger trigger, boolean deleted, ExecutionMetadata metadata, Instant scheduleDate, URI url) {
super(tenantId, id, namespace, flowId, flowRevision, taskRunList, inputs, outputs, labels, variables, state, parentId, originalId, trigger, deleted, metadata, scheduleDate);
ExecutionResponse(String tenantId, String id, String namespace, String flowId, Integer flowRevision, List<TaskRun> taskRunList, Map<String, Object> inputs, Map<String, Object> outputs, List<Label> labels, Map<String, Object> variables, State state, String parentId, String originalId, ExecutionTrigger trigger, boolean deleted, ExecutionMetadata metadata, Instant scheduleDate, String traceParent, URI url) {
super(tenantId, id, namespace, flowId, flowRevision, taskRunList, inputs, outputs, labels, variables, state, parentId, originalId, trigger, deleted, metadata, scheduleDate, traceParent);
this.url = url;
}
@@ -662,6 +676,7 @@ public class ExecutionController {
execution.isDeleted(),
execution.getMetadata(),
execution.getScheduleDate(),
execution.getTraceParent(),
url
);
}