feat(execution): add an execution kind

This allow to differentiate between normal executions and test executions
This commit is contained in:
Loïc Mathieu
2025-05-15 16:55:54 +02:00
parent c9a277d4d5
commit 30b39b2d30
24 changed files with 199 additions and 66 deletions

View File

@@ -25,8 +25,8 @@ import io.kestra.core.services.LabelService;
import io.kestra.core.test.flow.TaskFixture;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.MapUtils;
import io.micronaut.core.annotation.Nullable;
import io.swagger.v3.oas.annotations.Hidden;
import jakarta.annotation.Nullable;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
import lombok.*;
@@ -117,6 +117,9 @@ public class Execution implements DeletedInterface, TenantInterface {
@Nullable
List<TaskFixture> fixtures;
@Nullable
ExecutionKind kind;
/**
* Factory method for constructing a new {@link Execution} object for the given {@link Flow}.
*
@@ -216,7 +219,8 @@ public class Execution implements DeletedInterface, TenantInterface {
this.metadata,
this.scheduleDate,
this.traceParent,
this.fixtures
this.fixtures,
this.kind
);
}
@@ -241,7 +245,8 @@ public class Execution implements DeletedInterface, TenantInterface {
this.metadata,
this.scheduleDate,
this.traceParent,
this.fixtures
this.fixtures,
this.kind
);
}
@@ -279,7 +284,8 @@ public class Execution implements DeletedInterface, TenantInterface {
this.metadata,
this.scheduleDate,
this.traceParent,
this.fixtures
this.fixtures,
this.kind
);
}
@@ -304,7 +310,8 @@ public class Execution implements DeletedInterface, TenantInterface {
this.metadata,
this.scheduleDate,
this.traceParent,
this.fixtures
this.fixtures,
this.kind
);
}
@@ -754,7 +761,7 @@ public class Execution implements DeletedInterface, TenantInterface {
* @param e the exception raise
* @return new taskRun with added attempt
*/
private static FailedTaskRunWithLog newAttemptsTaskRunForFailedExecution(TaskRun taskRun,
private FailedTaskRunWithLog newAttemptsTaskRunForFailedExecution(TaskRun taskRun,
Exception e) {
return new FailedTaskRunWithLog(
taskRun
@@ -765,7 +772,7 @@ public class Execution implements DeletedInterface, TenantInterface {
.withState(State.Type.FAILED))
)
.withState(State.Type.FAILED),
RunContextLogger.logEntries(loggingEventFromException(e), LogEntry.of(taskRun))
RunContextLogger.logEntries(loggingEventFromException(e), LogEntry.of(taskRun, kind))
);
}
@@ -777,7 +784,7 @@ public class Execution implements DeletedInterface, TenantInterface {
* @param e the exception raise
* @return new taskRun with updated attempt with logs
*/
private static FailedTaskRunWithLog lastAttemptsTaskRunForFailedExecution(TaskRun taskRun,
private FailedTaskRunWithLog lastAttemptsTaskRunForFailedExecution(TaskRun taskRun,
TaskRunAttempt lastAttempt, Exception e) {
return new FailedTaskRunWithLog(
taskRun
@@ -791,7 +798,7 @@ public class Execution implements DeletedInterface, TenantInterface {
.toList()
)
.withState(State.Type.FAILED),
RunContextLogger.logEntries(loggingEventFromException(e), LogEntry.of(taskRun))
RunContextLogger.logEntries(loggingEventFromException(e), LogEntry.of(taskRun, kind))
);
}

View File

@@ -0,0 +1,10 @@
package io.kestra.core.models.executions;
/**
* Describe the kind of execution:
* - TEST: created by a test
* - NORMAL: anything else, for backward compatibility NORMAL is not persisted but null is used instead
*/
public enum ExecutionKind {
NORMAL, TEST
}

View File

@@ -6,8 +6,8 @@ import io.kestra.core.models.TenantInterface;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.triggers.TriggerContext;
import io.micronaut.core.annotation.Nullable;
import io.swagger.v3.oas.annotations.Hidden;
import jakarta.annotation.Nullable;
import lombok.Builder;
import lombok.Value;
import org.slf4j.event.Level;
@@ -60,6 +60,9 @@ public class LogEntry implements DeletedInterface, TenantInterface {
@Builder.Default
boolean deleted = false;
@Nullable
ExecutionKind executionKind;
public static List<Level> findLevelsByMin(Level minLevel) {
if (minLevel == null) {
return Arrays.asList(Level.values());
@@ -76,10 +79,11 @@ public class LogEntry implements DeletedInterface, TenantInterface {
.namespace(execution.getNamespace())
.flowId(execution.getFlowId())
.executionId(execution.getId())
.executionKind(execution.getKind())
.build();
}
public static LogEntry of(TaskRun taskRun) {
public static LogEntry of(TaskRun taskRun, ExecutionKind executionKind) {
return LogEntry.builder()
.tenantId(taskRun.getTenantId())
.namespace(taskRun.getNamespace())
@@ -88,24 +92,27 @@ public class LogEntry implements DeletedInterface, TenantInterface {
.executionId(taskRun.getExecutionId())
.taskRunId(taskRun.getId())
.attemptNumber(taskRun.attemptNumber())
.executionKind(executionKind)
.build();
}
public static LogEntry of(Flow flow, AbstractTrigger abstractTrigger) {
public static LogEntry of(Flow flow, AbstractTrigger abstractTrigger, ExecutionKind executionKind) {
return LogEntry.builder()
.tenantId(flow.getTenantId())
.namespace(flow.getNamespace())
.flowId(flow.getId())
.triggerId(abstractTrigger.getId())
.executionId(abstractTrigger.getId())
.build();
}
public static LogEntry of(TriggerContext triggerContext, AbstractTrigger abstractTrigger) {
public static LogEntry of(TriggerContext triggerContext, AbstractTrigger abstractTrigger, ExecutionKind executionKind) {
return LogEntry.builder()
.tenantId(triggerContext.getTenantId())
.namespace(triggerContext.getNamespace())
.flowId(triggerContext.getFlowId())
.triggerId(abstractTrigger.getId())
.executionId(abstractTrigger.getId())
.build();
}
@@ -122,7 +129,8 @@ public class LogEntry implements DeletedInterface, TenantInterface {
new AbstractMap.SimpleEntry<>("taskId", this.taskId),
new AbstractMap.SimpleEntry<>("executionId", this.executionId),
new AbstractMap.SimpleEntry<>("taskRunId", this.taskRunId),
new AbstractMap.SimpleEntry<>("triggerId", this.triggerId)
new AbstractMap.SimpleEntry<>("triggerId", this.triggerId),
new AbstractMap.SimpleEntry<>("executionKind", Optional.ofNullable(this.executionKind).map(executionKind -> executionKind.name()).orElse(null) )
)
.filter(e -> e.getValue() != null)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

View File

@@ -5,8 +5,8 @@ import io.kestra.core.models.DeletedInterface;
import io.kestra.core.models.TenantInterface;
import io.kestra.core.models.executions.metrics.Counter;
import io.kestra.core.models.executions.metrics.Timer;
import io.micronaut.core.annotation.Nullable;
import io.swagger.v3.oas.annotations.Hidden;
import jakarta.annotation.Nullable;
import lombok.Builder;
import lombok.Value;
@@ -57,7 +57,10 @@ public class MetricEntry implements DeletedInterface, TenantInterface {
@Builder.Default
boolean deleted = false;
public static MetricEntry of(TaskRun taskRun, AbstractMetricEntry<?> metricEntry) {
@Nullable
ExecutionKind executionKind;
public static MetricEntry of(TaskRun taskRun, AbstractMetricEntry<?> metricEntry, ExecutionKind executionKind) {
return MetricEntry.builder()
.tenantId(taskRun.getTenantId())
.namespace(taskRun.getNamespace())
@@ -70,6 +73,7 @@ public class MetricEntry implements DeletedInterface, TenantInterface {
.tags(metricEntry.getTags())
.value(computeValue(metricEntry))
.timestamp(metricEntry.getTimestamp())
.executionKind(executionKind)
.build();
}

View File

@@ -52,7 +52,7 @@ public interface ExecutionRepositoryInterface extends SaveRepositoryInterface<Ex
*
* @param tenantId The tenant ID.
* @param namespace The namespace of execution.
* @param flowId The flow ID of execution.
* @param flowId The flow ID of execution.
* @param states The execution's states.
* @return an optional {@link Execution}.
*/
@@ -63,6 +63,7 @@ public interface ExecutionRepositoryInterface extends SaveRepositoryInterface<Ex
@Nullable String tenantId,
@Nullable List<QueryFilter> filters
);
default Flux<Execution> find(
@Nullable String query,
@Nullable String tenantId,

View File

@@ -577,6 +577,7 @@ public class ExecutorService {
ResolvedTask.of(pause.getOnPause())
))
.task(pause.getOnPause())
.executionKind(executor.getExecution().getKind())
.build());
}
@@ -806,6 +807,7 @@ public class ExecutorService {
.runContext(runContext)
.taskRun(taskRun)
.task(task)
.executionKind(executor.getExecution().getKind())
.build();
// Get worker group
Optional<WorkerGroup> workerGroup = workerGroupService.resolveGroupFromJob(workerTask);

View File

@@ -106,7 +106,7 @@ public class RunContextFactory {
}
public RunContext of(FlowInterface flow, Task task, Execution execution, TaskRun taskRun, boolean decryptVariables) {
RunContextLogger runContextLogger = runContextLoggerFactory.create(taskRun, task);
RunContextLogger runContextLogger = runContextLoggerFactory.create(taskRun, task, execution.getKind());
return newBuilder()
// Logger
@@ -129,7 +129,7 @@ public class RunContextFactory {
}
public RunContext of(Flow flow, AbstractTrigger trigger) {
RunContextLogger runContextLogger = runContextLoggerFactory.create(flow, trigger);
RunContextLogger runContextLogger = runContextLoggerFactory.create(flow, trigger, null);
return newBuilder()
// Logger
.withLogger(runContextLogger)

View File

@@ -128,7 +128,7 @@ public class RunContextInitializer {
// rehydrate outputs
enrichedVariables.put("outputs", rehydrateOutputs((Map<String, Object>) enrichedVariables.get("outputs")));
final RunContextLogger runContextLogger = contextLoggerFactory.create(taskRun, task);
final RunContextLogger runContextLogger = contextLoggerFactory.create(workerTask);
enrichedVariables.put(RunVariables.SECRET_CONSUMER_VARIABLE_NAME, (Consumer<String>) runContextLogger::usedSecret);
enrichedVariables = variablesModifier.apply(enrichedVariables);
@@ -213,7 +213,7 @@ public class RunContextInitializer {
runContext.init(applicationContext);
final String triggerExecutionId = IdUtils.create();
final RunContextLogger runContextLogger = contextLoggerFactory.create(triggerContext, trigger);
final RunContextLogger runContextLogger = contextLoggerFactory.create(triggerContext, trigger, null);
final Map<String, Object> variables = new HashMap<>(runContext.getVariables());
variables.put(RunVariables.SECRET_CONSUMER_VARIABLE_NAME, (Consumer<String>) runContextLogger::usedSecret);

View File

@@ -88,6 +88,7 @@ public class RunContextLogger implements Supplier<org.slf4j.Logger> {
.flowId(logEntry.getFlowId())
.taskId(logEntry.getTaskId())
.executionId(logEntry.getExecutionId())
.executionKind(logEntry.getExecutionKind())
.taskRunId(logEntry.getTaskRunId())
.attemptNumber(logEntry.getAttemptNumber())
.triggerId(logEntry.getTriggerId())

View File

@@ -1,6 +1,7 @@
package io.kestra.core.runners;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionKind;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.Flow;
@@ -23,10 +24,14 @@ public class RunContextLoggerFactory {
@Named(QueueFactoryInterface.WORKERTASKLOG_NAMED)
private QueueInterface<LogEntry> logQueue;
public RunContextLogger create(TaskRun taskRun, Task task) {
public RunContextLogger create(WorkerTask workerTask) {
return create(workerTask.getTaskRun(), workerTask.getTask(), workerTask.getExecutionKind());
}
public RunContextLogger create(TaskRun taskRun, Task task, ExecutionKind executionKind) {
return new RunContextLogger(
logQueue,
LogEntry.of(taskRun),
LogEntry.of(taskRun, executionKind),
task.getLogLevel(),
task.isLogToFile()
);
@@ -41,19 +46,19 @@ public class RunContextLoggerFactory {
);
}
public RunContextLogger create(TriggerContext triggerContext, AbstractTrigger trigger) {
public RunContextLogger create(TriggerContext triggerContext, AbstractTrigger trigger, ExecutionKind executionKind) {
return new RunContextLogger(
logQueue,
LogEntry.of(triggerContext, trigger),
LogEntry.of(triggerContext, trigger, executionKind),
trigger.getLogLevel(),
trigger.isLogToFile()
);
}
public RunContextLogger create(Flow flow, AbstractTrigger trigger) {
public RunContextLogger create(Flow flow, AbstractTrigger trigger, ExecutionKind executionKind) {
return new RunContextLogger(
logQueue,
LogEntry.of(flow, trigger),
LogEntry.of(flow, trigger, executionKind),
trigger.getLogLevel(),
trigger.isLogToFile()
);

View File

@@ -393,7 +393,7 @@ public class Worker implements Service, Runnable, AutoCloseable {
workerTaskResult = this.run(currentWorkerTask, false);
}
} catch (IllegalVariableEvaluationException e) {
RunContextLogger contextLogger = runContextLoggerFactory.create(currentWorkerTask.getTaskRun(), currentWorkerTask.getTask());
RunContextLogger contextLogger = runContextLoggerFactory.create(currentWorkerTask);
contextLogger.logger().error("Failed evaluating runIf: {}", e.getMessage(), e);
} catch (QueueException e) {
log.error("Unable to emit the worker task result for task {} taskrun {}", currentWorkerTask.getTask().getId(), currentWorkerTask.getTaskRun().getId(), e);
@@ -696,7 +696,7 @@ public class Worker implements Service, Runnable, AutoCloseable {
failed = failed.withOutputs(Variables.empty());
}
WorkerTaskResult workerTaskResult = new WorkerTaskResult(failed);
RunContextLogger contextLogger = runContextLoggerFactory.create(workerTask.getTaskRun(), workerTask.getTask());
RunContextLogger contextLogger = runContextLoggerFactory.create(workerTask);
contextLogger.logger().error("Unable to emit the worker task result to the queue: {}", e.getMessage(), e);
try {
this.workerTaskResultQueue.emit(workerTaskResult);
@@ -817,7 +817,7 @@ public class Worker implements Service, Runnable, AutoCloseable {
// metrics
runContext.metrics().forEach(metric -> {
try {
this.metricEntryQueue.emit(MetricEntry.of(workerTask.getTaskRun(), metric));
this.metricEntryQueue.emit(MetricEntry.of(workerTask.getTaskRun(), metric, workerTask.getExecutionKind()));
} catch (QueueException e) {
// fail silently
}

View File

@@ -1,9 +1,11 @@
package io.kestra.core.runners;
import com.fasterxml.jackson.annotation.JsonInclude;
import io.kestra.core.models.executions.ExecutionKind;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.Task;
import jakarta.annotation.Nullable;
import lombok.Builder;
import lombok.Data;
import lombok.With;
@@ -29,6 +31,9 @@ public class WorkerTask extends WorkerJob {
@NotNull
private RunContext runContext;
@Nullable
private ExecutionKind executionKind;
/**
* {@inheritDoc}
*/

View File

@@ -8,6 +8,7 @@ import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.dashboards.AggregationType;
import io.kestra.core.models.dashboards.ColumnDescriptor;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionKind;
import io.kestra.core.models.executions.ExecutionTrigger;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.executions.statistics.DailyExecutionStatistics;
@@ -139,6 +140,15 @@ public abstract class AbstractExecutionRepositoryTest {
i < 15 ? null : "second"
).trigger(executionTrigger).build());
}
// add a test execution, this should be ignored in search & statistics
executionRepository.save(builder(
State.Type.SUCCESS,
null
)
.trigger(executionTrigger)
.kind(ExecutionKind.TEST)
.build());
}
@Test
@@ -277,7 +287,7 @@ public abstract class AbstractExecutionRepositoryTest {
inject();
ArrayListTotal<TaskRun> taskRuns = executionRepository.findTaskRun(Pageable.from(1, 10), null, null);
assertThat(taskRuns.getTotal()).isEqualTo(71L);
assertThat(taskRuns.getTotal()).isEqualTo(74L);
assertThat(taskRuns.size()).isEqualTo(10);
var filters = List.of(QueryFilter.builder()
@@ -304,6 +314,18 @@ public abstract class AbstractExecutionRepositoryTest {
});
}
@Test
protected void shouldFindByIdTestExecution() {
executionRepository.save(ExecutionFixture.EXECUTION_TEST);
Optional<Execution> full = executionRepository.findById(null, ExecutionFixture.EXECUTION_TEST.getId());
assertThat(full.isPresent()).isTrue();
full.ifPresent(current -> {
assertThat(full.get().getId()).isEqualTo(ExecutionFixture.EXECUTION_TEST.getId());
});
}
@Test
protected void purge() {
executionRepository.save(ExecutionFixture.EXECUTION_1);
@@ -790,6 +812,6 @@ public abstract class AbstractExecutionRepositoryTest {
inject();
List<Execution> executions = executionRepository.findAllAsync(null).collectList().block();
assertThat(executions).hasSize(28);
assertThat(executions).hasSize(29); // used by the backup so it contains TEST executions
}
}

View File

@@ -3,6 +3,7 @@ package io.kestra.core.repositories;
import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.QueryFilter.Field;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionKind;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.statistics.LogStatistics;
import io.kestra.core.utils.IdUtils;
@@ -48,6 +49,7 @@ public abstract class AbstractLogRepositoryTest {
LogEntry save = logRepository.save(builder.build());
logRepository.save(builder.executionKind(ExecutionKind.TEST).build()); // should only be loaded by execution id
find = logRepository.find(Pageable.UNPAGED, null, null);
assertThat(find.size()).isEqualTo(1);
@@ -74,31 +76,31 @@ public abstract class AbstractLogRepositoryTest {
assertThat(find.getFirst().getExecutionId()).isEqualTo(save.getExecutionId());
List<LogEntry> list = logRepository.findByExecutionId(null, save.getExecutionId(), null);
assertThat(list.size()).isEqualTo(1);
assertThat(list.size()).isEqualTo(2);
assertThat(list.getFirst().getExecutionId()).isEqualTo(save.getExecutionId());
list = logRepository.findByExecutionId(null, "io.kestra.unittest", "flowId", save.getExecutionId(), null);
assertThat(list.size()).isEqualTo(1);
assertThat(list.size()).isEqualTo(2);
assertThat(list.getFirst().getExecutionId()).isEqualTo(save.getExecutionId());
list = logRepository.findByExecutionIdAndTaskId(null, save.getExecutionId(), save.getTaskId(), null);
assertThat(list.size()).isEqualTo(1);
assertThat(list.size()).isEqualTo(2);
assertThat(list.getFirst().getExecutionId()).isEqualTo(save.getExecutionId());
list = logRepository.findByExecutionIdAndTaskId(null, "io.kestra.unittest", "flowId", save.getExecutionId(), save.getTaskId(), null);
assertThat(list.size()).isEqualTo(1);
assertThat(list.size()).isEqualTo(2);
assertThat(list.getFirst().getExecutionId()).isEqualTo(save.getExecutionId());
list = logRepository.findByExecutionIdAndTaskRunId(null, save.getExecutionId(), save.getTaskRunId(), null);
assertThat(list.size()).isEqualTo(1);
assertThat(list.size()).isEqualTo(2);
assertThat(list.getFirst().getExecutionId()).isEqualTo(save.getExecutionId());
list = logRepository.findByExecutionIdAndTaskRunIdAndAttempt(null, save.getExecutionId(), save.getTaskRunId(), null, 0);
assertThat(list.size()).isEqualTo(1);
assertThat(list.size()).isEqualTo(2);
assertThat(list.getFirst().getExecutionId()).isEqualTo(save.getExecutionId());
Integer countDeleted = logRepository.purge(Execution.builder().id(save.getExecutionId()).build());
assertThat(countDeleted).isEqualTo(1);
assertThat(countDeleted).isEqualTo(2);
list = logRepository.findByExecutionIdAndTaskId(null, save.getExecutionId(), save.getTaskId(), null);
assertThat(list.size()).isZero();
@@ -150,6 +152,15 @@ public abstract class AbstractLogRepositoryTest {
assertThat(find.size()).isZero();
}
@Test
void shouldFindByExecutionIdTestLogs() {
var builder = logEntry(Level.INFO).executionId("123").executionKind(ExecutionKind.TEST).build();
logRepository.save(builder);
List<LogEntry> logs = logRepository.findByExecutionId(null, builder.getExecutionId(), null);
assertThat(logs).hasSize(1);
}
@Test
void delete() {
LogEntry log1 = logEntry(Level.INFO).build();
@@ -195,6 +206,7 @@ public abstract class AbstractLogRepositoryTest {
.build()
);
}
logRepository.save(logEntry(Level.INFO).executionKind(ExecutionKind.TEST).build()); // should be ignored by stats
// mysql need some time ...
Thread.sleep(500);
@@ -215,6 +227,7 @@ public abstract class AbstractLogRepositoryTest {
logRepository.save(logEntry(Level.INFO).build());
logRepository.save(logEntry(Level.ERROR).build());
logRepository.save(logEntry(Level.WARN).build());
logRepository.save(logEntry(Level.INFO).executionKind(ExecutionKind.TEST).build()); // should not be visible here
ZonedDateTime startDate = ZonedDateTime.now().minusSeconds(1);
@@ -238,11 +251,12 @@ public abstract class AbstractLogRepositoryTest {
@Test
void findAllAsync() {
logRepository.save(logEntry(Level.INFO).build());
logRepository.save(logEntry(Level.INFO).executionKind(ExecutionKind.TEST).build()); // should be present as it's used for backup
logRepository.save(logEntry(Level.ERROR).build());
logRepository.save(logEntry(Level.WARN).build());
Flux<LogEntry> find = logRepository.findAllAsync(null);
List<LogEntry> logEntries = find.collectList().block();
assertThat(logEntries).hasSize(3);
assertThat(logEntries).hasSize(4);
}
}

View File

@@ -1,6 +1,7 @@
package io.kestra.core.repositories;
import com.devskiller.friendly_id.FriendlyId;
import io.kestra.core.models.executions.ExecutionKind;
import io.kestra.core.models.executions.MetricEntry;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.executions.metrics.Counter;
@@ -26,20 +27,22 @@ public abstract class AbstractMetricRepositoryTest {
void all() {
String executionId = FriendlyId.createFriendlyId();
TaskRun taskRun1 = taskRun(executionId, "task");
MetricEntry counter = MetricEntry.of(taskRun1, counter("counter"));
MetricEntry counter = MetricEntry.of(taskRun1, counter("counter"), null);
MetricEntry testCounter = MetricEntry.of(taskRun1, counter("test"), ExecutionKind.TEST);
TaskRun taskRun2 = taskRun(executionId, "task");
MetricEntry timer = MetricEntry.of(taskRun2, timer());
MetricEntry timer = MetricEntry.of(taskRun2, timer(), null);
metricRepository.save(counter);
metricRepository.save(testCounter); // should only be retrieved by execution id
metricRepository.save(timer);
List<MetricEntry> results = metricRepository.findByExecutionId(null, executionId, Pageable.from(1, 10));
assertThat(results.size()).isEqualTo(2);
assertThat(results.size()).isEqualTo(3);
results = metricRepository.findByExecutionIdAndTaskId(null, executionId, taskRun1.getTaskId(), Pageable.from(1, 10));
assertThat(results.size()).isEqualTo(2);
assertThat(results.size()).isEqualTo(3);
results = metricRepository.findByExecutionIdAndTaskRunId(null, executionId, taskRun1.getId(), Pageable.from(1, 10));
assertThat(results.size()).isEqualTo(1);
assertThat(results.size()).isEqualTo(2);
MetricAggregations aggregationResults = metricRepository.aggregateByFlowId(
null,
@@ -75,13 +78,16 @@ public abstract class AbstractMetricRepositoryTest {
void names() {
String executionId = FriendlyId.createFriendlyId();
TaskRun taskRun1 = taskRun(executionId, "task");
MetricEntry counter = MetricEntry.of(taskRun1, counter("counter"));
MetricEntry counter = MetricEntry.of(taskRun1, counter("counter"), null);
TaskRun taskRun2 = taskRun(executionId, "task2");
MetricEntry counter2 = MetricEntry.of(taskRun2, counter("counter2"));
MetricEntry counter2 = MetricEntry.of(taskRun2, counter("counter2"), null);
MetricEntry test = MetricEntry.of(taskRun2, counter("test"), ExecutionKind.TEST);
metricRepository.save(counter);
metricRepository.save(counter2);
metricRepository.save(test); // should only be retrieved by execution id
List<String> flowMetricsNames = metricRepository.flowMetrics(null, "namespace", "flow");
@@ -97,14 +103,16 @@ public abstract class AbstractMetricRepositoryTest {
void findAllAsync() {
String executionId = FriendlyId.createFriendlyId();
TaskRun taskRun1 = taskRun(executionId, "task");
MetricEntry counter = MetricEntry.of(taskRun1, counter("counter"));
MetricEntry counter = MetricEntry.of(taskRun1, counter("counter"), null);
TaskRun taskRun2 = taskRun(executionId, "task");
MetricEntry timer = MetricEntry.of(taskRun2, timer());
MetricEntry timer = MetricEntry.of(taskRun2, timer(), null);
MetricEntry test = MetricEntry.of(taskRun2, counter("test"), ExecutionKind.TEST);
metricRepository.save(counter);
metricRepository.save(timer);
metricRepository.save(test); // should be retrieved as findAllAsync is used for backup
List<MetricEntry> results = metricRepository.findAllAsync(null).collectList().block();
assertThat(results).hasSize(2);
assertThat(results).hasSize(3);
}
private Counter counter(String metricName) {

View File

@@ -1,10 +1,7 @@
package io.kestra.core.repositories;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.executions.TaskRunAttempt;
import io.kestra.core.models.executions.Variables;
import io.kestra.core.models.executions.*;
import io.kestra.core.models.flows.State;
import io.kestra.core.utils.IdUtils;
@@ -59,4 +56,29 @@ class ExecutionFixture {
.build()
))
.build();
public static final Execution EXECUTION_TEST = Execution.builder()
.id(IdUtils.create())
.namespace("io.kestra.unittest")
.flowId("full")
.flowRevision(1)
.state(new State())
.inputs(ImmutableMap.of("test", 1))
.kind(ExecutionKind.TEST)
.taskRunList(Collections.singletonList(
TaskRun.builder()
.id(IdUtils.create())
.namespace("io.kestra.unittest")
.flowId("full")
.state(new State())
.attempts(Collections.singletonList(
TaskRunAttempt.builder()
.build()
))
.outputs(Variables.inMemory(ImmutableMap.of(
"out", 1
)))
.build()
))
.build();
}

View File

@@ -0,0 +1,3 @@
alter table executions add "kind" VARCHAR(32) GENERATED ALWAYS AS (JQ_STRING("value", '.kind'));
alter table logs add "execution_kind" VARCHAR(32) GENERATED ALWAYS AS (JQ_STRING("value", '.executionKind'));
alter table metrics add "execution_kind" VARCHAR(32) GENERATED ALWAYS AS (JQ_STRING("value", '.executionKind'));

View File

@@ -0,0 +1,3 @@
alter table executions add `kind` VARCHAR(32) GENERATED ALWAYS AS (value ->> '$.kind') STORED;
alter table logs add `execution_kind` VARCHAR(32) GENERATED ALWAYS AS (value ->> '$.executionKind') STORED;
alter table metrics add `execution_kind` VARCHAR(32) GENERATED ALWAYS AS (value ->> '$.executionKind') STORED;

View File

@@ -0,0 +1,3 @@
alter table executions add "kind" VARCHAR(32) GENERATED ALWAYS AS (value ->> 'kind') STORED;
alter table logs add "execution_kind" VARCHAR(32) GENERATED ALWAYS AS (value ->> 'executionKind') STORED;
alter table metrics add "execution_kind" VARCHAR(32) GENERATED ALWAYS AS (value ->> 'executionKind') STORED;

View File

@@ -55,6 +55,7 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
private static final Field<String> STATE_CURRENT_FIELD = field("state_current", String.class);
private static final Field<String> NAMESPACE_FIELD = field("namespace", String.class);
private static final Field<Object> START_DATE_FIELD = field("start_date");
private static final Condition NORMAL_KIND_CONDITION = field("kind").isNull();
protected final io.kestra.jdbc.AbstractJdbcRepository<Execution> jdbcRepository;
private final ApplicationEventPublisher<CrudEvent<Execution>> eventPublisher;
@@ -286,7 +287,8 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
)
.hint(context.configuration().dialect().supports(SQLDialect.MYSQL) ? "SQL_CALC_FOUND_ROWS" : null)
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId, false));
.where(this.defaultFilter(tenantId, false))
.and(NORMAL_KIND_CONDITION);
if (filters != null)
for (QueryFilter filter : filters) {
@@ -594,6 +596,7 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
.select(selectFields)
.from(this.jdbcRepository.getTable())
.where(defaultFilter)
.and(NORMAL_KIND_CONDITION)
.and(START_DATE_FIELD.greaterOrEqual(startDate.toOffsetDateTime()))
.and(START_DATE_FIELD.lessOrEqual(endDate.toOffsetDateTime()));
@@ -694,6 +697,7 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
.select(NAMESPACE_FIELD, STATE_CURRENT_FIELD, DSL.count().cast(Long.class))
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId))
.and(NORMAL_KIND_CONDITION)
.and(START_DATE_FIELD.greaterOrEqual(finalStartDate.toOffsetDateTime()))
.and(START_DATE_FIELD.lessOrEqual(finalEndDate.toOffsetDateTime()));
@@ -905,7 +909,8 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
DSL.count().as("count")
))
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId));
.where(this.defaultFilter(tenantId))
.and(NORMAL_KIND_CONDITION);
select = select.and(START_DATE_FIELD.greaterOrEqual(finalStartDate.toOffsetDateTime()));
select = select.and(START_DATE_FIELD.lessOrEqual(finalEndDate.toOffsetDateTime()));
@@ -1009,6 +1014,7 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
)
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId))
.and(NORMAL_KIND_CONDITION)
.and(field("end_date").isNotNull())
.and(DSL.or(
flows

View File

@@ -34,7 +34,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
public abstract class AbstractJdbcLogRepository extends AbstractJdbcRepository implements LogRepositoryInterface {
private static final Condition NORMAL_KIND_CONDITION = field("execution_kind").isNull();
protected io.kestra.jdbc.AbstractJdbcRepository<LogEntry> jdbcRepository;
public AbstractJdbcLogRepository(io.kestra.jdbc.AbstractJdbcRepository<LogEntry> jdbcRepository,
@@ -96,6 +96,7 @@ public abstract class AbstractJdbcLogRepository extends AbstractJdbcRepository i
.hint(context.configuration().dialect().supports(SQLDialect.MYSQL) ? "SQL_CALC_FOUND_ROWS" : null)
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId))
.and(NORMAL_KIND_CONDITION)
.and(this.findCondition(query));
select = this.filter(select, filters, "timestamp");
@@ -173,7 +174,8 @@ public abstract class AbstractJdbcLogRepository extends AbstractJdbcRepository i
.select(field("value"))
.hint(context.configuration().dialect().supports(SQLDialect.MYSQL) ? "SQL_CALC_FOUND_ROWS" : null)
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId));
.where(this.defaultFilter(tenantId))
.and(NORMAL_KIND_CONDITION);
addNamespace(select, namespace);
addMinLevel(select, minLevel);
select = select.and(field("timestamp").greaterThan(startDate.toOffsetDateTime()));
@@ -243,7 +245,8 @@ public abstract class AbstractJdbcLogRepository extends AbstractJdbcRepository i
SelectConditionStep<Record> select = context
.select(selectFields)
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId));
.where(this.defaultFilter(tenantId))
.and(NORMAL_KIND_CONDITION);
this.filter(select, query, namespace, flowId, null, minLevel, startDate, endDate);

View File

@@ -32,6 +32,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
public abstract class AbstractJdbcMetricRepository extends AbstractJdbcRepository implements MetricRepositoryInterface {
private static final Condition NORMAL_KIND_CONDITION = field("execution_kind").isNull();
protected io.kestra.jdbc.AbstractJdbcRepository<MetricEntry> jdbcRepository;
public AbstractJdbcMetricRepository(io.kestra.jdbc.AbstractJdbcRepository<MetricEntry> jdbcRepository,
@@ -126,7 +127,8 @@ public abstract class AbstractJdbcMetricRepository extends AbstractJdbcRepositor
return this.queryDistinct(
tenantId,
field("flow_id").eq(flowId)
.and(field("namespace").eq(namespace)),
.and(field("namespace").eq(namespace))
.and(NORMAL_KIND_CONDITION),
"metric_name"
);
}
@@ -142,7 +144,8 @@ public abstract class AbstractJdbcMetricRepository extends AbstractJdbcRepositor
tenantId,
field("flow_id").eq(flowId)
.and(field("namespace").eq(namespace))
.and(field("task_id").eq(taskId)),
.and(field("task_id").eq(taskId))
.and(NORMAL_KIND_CONDITION),
"metric_name"
);
}
@@ -156,7 +159,8 @@ public abstract class AbstractJdbcMetricRepository extends AbstractJdbcRepositor
return this.queryDistinct(
tenantId,
field("flow_id").eq(flowId)
.and(field("namespace").eq(namespace)),
.and(field("namespace").eq(namespace))
.and(NORMAL_KIND_CONDITION),
"task_id"
);
}
@@ -174,7 +178,8 @@ public abstract class AbstractJdbcMetricRepository extends AbstractJdbcRepositor
) {
Condition conditions = field("flow_id").eq(flowId)
.and(field("namespace").eq(namespace))
.and(field("metric_name").eq(metric));
.and(field("metric_name").eq(metric))
.and(NORMAL_KIND_CONDITION);
if (taskId != null) {
conditions = conditions.and(field("task_id").eq(taskId));
}

View File

@@ -630,7 +630,7 @@ public class JdbcExecutor implements ExecutorInterface, Service {
JdbcExecutor.log.info(log);
logQueue.emit(LogEntry.of(subflowExecution.getParentTaskRun()).toBuilder()
logQueue.emit(LogEntry.of(subflowExecution.getParentTaskRun(), subflowExecution.getExecution().getKind()).toBuilder()
.level(Level.INFO)
.message(log)
.timestamp(subflowExecution.getParentTaskRun().getState().getStartDate())

View File

@@ -689,8 +689,8 @@ public class ExecutionController {
private final URI url;
// This is not nice, but we cannot use @AllArgsConstructor as it would open a bunch of necessary changes on the Execution class.
ExecutionResponse(String tenantId, String id, String namespace, String flowId, Integer flowRevision, List<TaskRun> taskRunList, Map<String, Object> inputs, Map<String, Object> outputs, List<Label> labels, Map<String, Object> variables, State state, String parentId, String originalId, ExecutionTrigger trigger, boolean deleted, ExecutionMetadata metadata, Instant scheduleDate, String traceParent, List<TaskFixture> fixtures, URI url) {
super(tenantId, id, namespace, flowId, flowRevision, taskRunList, inputs, outputs, labels, variables, state, parentId, originalId, trigger, deleted, metadata, scheduleDate, traceParent, fixtures);
ExecutionResponse(String tenantId, String id, String namespace, String flowId, Integer flowRevision, List<TaskRun> taskRunList, Map<String, Object> inputs, Map<String, Object> outputs, List<Label> labels, Map<String, Object> variables, State state, String parentId, String originalId, ExecutionTrigger trigger, boolean deleted, ExecutionMetadata metadata, Instant scheduleDate, String traceParent, List<TaskFixture> fixtures, ExecutionKind kind, URI url) {
super(tenantId, id, namespace, flowId, flowRevision, taskRunList, inputs, outputs, labels, variables, state, parentId, originalId, trigger, deleted, metadata, scheduleDate, traceParent, fixtures, kind);
this.url = url;
}
@@ -716,6 +716,7 @@ public class ExecutionController {
execution.getScheduleDate(),
execution.getTraceParent(),
execution.getFixtures(),
execution.getKind(),
url
);
}