feat(system): add distinct server-events for reporting

Refactor the services used to generate periodic reports on server usage.

Related-to: kestra-io/kestra-ee#3014
This commit is contained in:
Florian Hussonnois
2025-08-13 19:23:00 +02:00
committed by Florian Hussonnois
parent ab9951466d
commit 3929bf6172
34 changed files with 1211 additions and 500 deletions

View File

@@ -212,7 +212,7 @@ kestra:
retention: 30d
anonymous-usage-report:
enabled: true
uri: https://api.kestra.io/v1/reports/usages
uri: https://api.kestra.io/v1/server-events/
initial-delay: 5m
fixed-delay: 1h

View File

@@ -62,6 +62,7 @@ public record ServiceUsage(
List<DailyServiceStatistics> statistics = Arrays
.stream(ServiceType.values())
.filter(it -> !it.equals(ServiceType.INVALID))
.map(type -> of(from, to, repository, type, interval))
.toList();
return new ServiceUsage(statistics);

View File

@@ -1,74 +0,0 @@
package io.kestra.core.models.collectors;
import io.kestra.core.models.ServerType;
import io.micronaut.core.annotation.Introspected;
import jakarta.annotation.Nullable;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.experimental.SuperBuilder;
import lombok.extern.jackson.Jacksonized;
import java.time.Instant;
import java.time.ZoneId;
import java.util.List;
import java.util.Map;
import java.util.Set;
@SuperBuilder(toBuilder = true)
@Getter
@Jacksonized
@Introspected
@AllArgsConstructor
public class Usage {
@NotNull
private final String uuid;
@NotNull
private final String startUuid;
@NotNull
private final String instanceUuid;
@NotNull
private final ServerType serverType;
@NotNull
private final String version;
@NotNull
private final ZoneId zoneId;
@Nullable
private final String uri;
@Nullable
private final Set<String> environments;
@NotNull
private final Instant startTime;
@Valid
private final HostUsage host;
@Valid
private final ConfigurationUsage configurations;
@Valid
private final List<PluginUsage> plugins;
@Valid
private final FlowUsage flows;
@Valid
private final ExecutionUsage executions;
@Valid
@Nullable
private ServiceUsage services;
@Valid
@Nullable
private List<PluginMetric> pluginMetrics;
}

View File

@@ -0,0 +1,29 @@
package io.kestra.core.reporter;
public abstract class AbstractReportable<T extends Reportable.Event> implements Reportable<T> {
private final Type type;
private final ReportingSchedule schedule;
private final boolean isTenantSupported;
public AbstractReportable(Type type, ReportingSchedule schedule, boolean isTenantSupported) {
this.type = type;
this.schedule = schedule;
this.isTenantSupported = isTenantSupported;
}
@Override
public boolean isTenantSupported() {
return isTenantSupported;
}
@Override
public Type type() {
return type;
}
@Override
public ReportingSchedule schedule() {
return schedule;
}
}

View File

@@ -0,0 +1,94 @@
package io.kestra.core.reporter;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
/**
* Interface for reporting server event for a specific type.
*
* @param <T>
*/
public interface Reportable<T extends Reportable.Event> {
/**
* Gets the type of the event to report.
*/
Type type();
/**
* Gets the reporting schedule.
*/
ReportingSchedule schedule();
/**
* Generates a report for the given timestamp.
*
* @param now the time when the report is triggered.
* @return an Optional containing the report data if available.
*/
T report(Instant now, TimeInterval interval);
default T report(Instant now) {
ZonedDateTime to = now.atZone(ZoneId.systemDefault());
ZonedDateTime from = to.minus(Duration.ofDays(1));
return report(now, new TimeInterval(from, to));
}
/**
* Checks whether this reportable is enabled for scheduled reporting.
*/
boolean isEnabled();
/**
* Generates a report for the given timestamp and tenant.
*
* @param now the time when the report is triggered.
* @param tenant the tenant for which the report is triggered.
* @return the event to report.
*/
default T report(Instant now, TimeInterval interval, String tenant) {
throw new UnsupportedOperationException();
}
default T report(Instant now, String tenant) {
ZonedDateTime to = now.atZone(ZoneId.systemDefault());
ZonedDateTime from = to.minus(Duration.ofDays(1));
return report(now, new TimeInterval(from, to), tenant);
}
/**
* Checks whether this {@link Reportable} can accept a tenant.
*
* @return {@code true} a {@link #report(Instant, TimeInterval, String)} can called, Otherwise {@code false}.
*/
default boolean isTenantSupported() {
return false;
}
record TimeInterval(ZonedDateTime from, ZonedDateTime to){
public static TimeInterval of(ZonedDateTime from, ZonedDateTime to) {
return new TimeInterval(from, to);
}
}
/**
* Marker interface indicating that the returned event
* must be a structured, domain-specific object
* (not a primitive wrapper, String, collection, or other basic type).
*/
interface Event {
}
/**
* Defines the schedule for a report.
*/
interface ReportingSchedule {
/**
* Determines whether a report should run at the given instant.
*/
boolean shouldRun(Instant now);
}
}

View File

@@ -0,0 +1,40 @@
package io.kestra.core.reporter;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
@Singleton
@Slf4j
public class ReportableRegistry {
private final Map<Type, Reportable<?>> reportables = new ConcurrentHashMap<>();
/**
* Creates a new {@link ReportableRegistry} instance.
*
* @param reportables The {@link Reportable reportables}
*/
@Inject
public ReportableRegistry(final List<Reportable<?>> reportables) {
reportables.forEach(reportable -> this.reportables.put(reportable.type(), reportable));
}
public void register(final Reportable<?> reportable) {
Objects.requireNonNull(reportable, "reportable must not be null");
if (reportables.containsKey(reportable.type())) {
log.warn("Event already registered for type '{}'", reportable.type());
} else {
reportables.put(reportable.type(), reportable);
}
}
public List<Reportable<?>> getAll() {
return List.copyOf(reportables.values());
}
}

View File

@@ -0,0 +1,43 @@
package io.kestra.core.reporter;
import io.micronaut.context.annotation.Requires;
import io.micronaut.scheduling.annotation.Scheduled;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import java.time.Clock;
import java.time.Instant;
@Singleton
@Requires(property = "kestra.anonymous-usage-report.enabled", value = "true")
@Requires(property = "kestra.server-type")
@Slf4j
public class ReportableScheduler {
private final ReportableRegistry registry;
private final ServerEventSender sender;
private final Clock clock;
@Inject
public ReportableScheduler(ReportableRegistry registry, ServerEventSender sender) {
this.registry = registry;
this.sender = sender;
this.clock = Clock.systemDefaultZone();
}
@Scheduled(fixedDelay = "5m", initialDelay = "${kestra.anonymous-usage-report.initial-delay}")
public void tick() {
Instant now = clock.instant();
for (Reportable<?> r : registry.getAll()) {
if (r.isEnabled() && r.schedule().shouldRun(now)) {
try {
Object value = r.report(now);
if (value != null) sender.send(now, r.type(), value);
} catch (Exception e) {
log.debug("Failed to send report for event-type '{}'", r.type(), e);
}
}
}
}
}

View File

@@ -0,0 +1,57 @@
package io.kestra.core.reporter;
import io.kestra.core.reporter.Reportable.ReportingSchedule;
import java.time.Duration;
import java.time.Instant;
/**
* Utility class providing common implementations of {@link Reportable.ReportingSchedule}.
*/
public class Schedules {
/**
* Creates a reporting schedule that triggers after the specified period has elapsed
* since the last execution.
*
* @param period the duration between successive runs; must be positive
* @return a {@link Reportable.ReportingSchedule} that runs at the given interval
* @throws IllegalArgumentException if {@code period} is zero or negative
*/
public static ReportingSchedule every(final Duration period) {
if (period.isZero() || period.isNegative()) {
throw new IllegalArgumentException("Period must be positive");
}
return new ReportingSchedule() {
private Instant lastRun = Instant.EPOCH;
@Override
public boolean shouldRun(Instant now) {
if (Duration.between(lastRun, now).compareTo(period) >= 0) {
lastRun = now;
return true;
}
return false;
}
};
}
/**
* Creates a reporting schedule that triggers once every hour.
*
* @return a schedule running every 1 hour
*/
public static ReportingSchedule hourly() {
return every(Duration.ofHours(1));
}
/**
* Creates a reporting schedule that triggers once every day.
*
* @return a schedule running every 24 hours
*/
public static ReportingSchedule daily() {
return every(Duration.ofDays(1));
}
}

View File

@@ -0,0 +1,31 @@
package io.kestra.core.reporter;
import com.fasterxml.jackson.annotation.JsonUnwrapped;
import io.kestra.core.models.ServerType;
import lombok.Builder;
import java.time.ZoneId;
import java.time.ZonedDateTime;
/**
* Represents a Kestra Server Event.
*/
@Builder(toBuilder = true)
public record ServerEvent(
String instanceUuid,
String sessionUuid,
ServerType serverType,
String serverVersion,
ZoneId zoneId,
Object payload,
String uuid,
ZonedDateTime reportedAt
) {
@JsonUnwrapped
public Object payload() {
return payload;
}
}

View File

@@ -0,0 +1,91 @@
package io.kestra.core.reporter;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.kestra.core.contexts.KestraContext;
import io.kestra.core.models.ServerType;
import io.kestra.core.models.collectors.Result;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.services.InstanceService;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.VersionProvider;
import io.micronaut.context.annotation.Value;
import io.micronaut.core.type.Argument;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.MutableHttpRequest;
import io.micronaut.http.client.annotation.Client;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.http.hateoas.JsonError;
import io.micronaut.reactor.http.client.ReactorHttpClient;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import java.net.URI;
import java.time.Instant;
import java.time.ZoneId;
import java.util.UUID;
@Singleton
@Slf4j
public class ServerEventSender {
private static final String SESSION_UUID = IdUtils.create();
private static final ObjectMapper OBJECT_MAPPER = JacksonMapper.ofJson();
@Inject
@Client
private ReactorHttpClient client;
@Inject
private VersionProvider versionProvider;
@Inject
private InstanceService instanceService;
private final ServerType serverType;
@Value("${kestra.anonymous-usage-report.uri}")
protected URI url;
public ServerEventSender( ) {
this.serverType = KestraContext.getContext().getServerType();
}
public void send(final Instant now, final Type type, Object event) {
ServerEvent serverEvent = ServerEvent
.builder()
.uuid(UUID.randomUUID().toString())
.sessionUuid(SESSION_UUID)
.instanceUuid(instanceService.fetch())
.serverType(serverType)
.serverVersion(versionProvider.getVersion())
.reportedAt(now.atZone(ZoneId.systemDefault()))
.payload(event)
.zoneId(ZoneId.systemDefault())
.build();
try {
MutableHttpRequest<ServerEvent> request = this.request(serverEvent, type);
if (log.isTraceEnabled()) {
log.trace("Report anonymous usage: '{}'", OBJECT_MAPPER.writeValueAsString(serverEvent));
}
this.handleResponse(client.toBlocking().retrieve(request, Argument.of(Result.class), Argument.of(JsonError.class)));
} catch (HttpClientResponseException t) {
log.trace("Unable to report anonymous usage with body '{}'", t.getResponse().getBody(String.class), t);
} catch (Exception t) {
log.trace("Unable to handle anonymous usage", t);
}
}
private void handleResponse (Result result){
}
protected MutableHttpRequest<ServerEvent> request(ServerEvent event, Type type) throws Exception {
URI baseUri = URI.create(this.url.toString().endsWith("/") ? this.url.toString() : this.url + "/");
URI resolvedUri = baseUri.resolve(type.name().toLowerCase());
return HttpRequest.POST(resolvedUri, event)
.header("User-Agent", "Kestra/" + versionProvider.getVersion());
}
}

View File

@@ -0,0 +1,9 @@
package io.kestra.core.reporter;
/**
* A reportable event type.
*/
public interface Type {
String name();
}

View File

@@ -0,0 +1,12 @@
package io.kestra.core.reporter;
/**
* All supported reportable event type.
*/
public enum Types implements Type {
USAGE,
SYSTEM_INFORMATION,
PLUGIN_METRICS,
SERVICE_USAGE,
PLUGIN_USAGE;
}

View File

@@ -0,0 +1,73 @@
package io.kestra.core.reporter.reports;
import io.kestra.core.contexts.KestraContext;
import io.kestra.core.models.ServerType;
import io.kestra.core.models.collectors.ExecutionUsage;
import io.kestra.core.models.collectors.FlowUsage;
import io.kestra.core.reporter.AbstractReportable;
import io.kestra.core.reporter.Schedules;
import io.kestra.core.reporter.Types;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.micronaut.core.annotation.Introspected;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.Getter;
import lombok.experimental.SuperBuilder;
import lombok.extern.jackson.Jacksonized;
import java.time.Instant;
import java.util.Objects;
@Singleton
public class FeatureUsageReport extends AbstractReportable<FeatureUsageReport.UsageEvent> {
private final FlowRepositoryInterface flowRepository;
private final ExecutionRepositoryInterface executionRepository;
private final boolean enabled;
@Inject
public FeatureUsageReport(FlowRepositoryInterface flowRepository,
ExecutionRepositoryInterface executionRepository) {
super(Types.USAGE, Schedules.hourly(), true);
this.flowRepository = flowRepository;
this.executionRepository = executionRepository;
ServerType serverType = KestraContext.getContext().getServerType();
this.enabled = ServerType.EXECUTOR.equals(serverType) || ServerType.STANDALONE.equals(serverType);
}
@Override
public UsageEvent report(final Instant now, TimeInterval interval) {
return UsageEvent
.builder()
.flows(FlowUsage.of(flowRepository))
.executions(ExecutionUsage.of(executionRepository, interval.from(), interval.to()))
.build();
}
@Override
public boolean isEnabled() {
return enabled;
}
@Override
public UsageEvent report(Instant now, TimeInterval interval, String tenant) {
Objects.requireNonNull(tenant, "tenant is null");
Objects.requireNonNull(interval, "interval is null");
return UsageEvent
.builder()
.flows(FlowUsage.of(tenant, flowRepository))
.executions(ExecutionUsage.of(tenant, executionRepository, interval.from(), interval.to()))
.build();
}
@SuperBuilder(toBuilder = true)
@Getter
@Jacksonized
@Introspected
public static class UsageEvent implements Event {
private ExecutionUsage executions;
private FlowUsage flows;
}
}

View File

@@ -0,0 +1,105 @@
package io.kestra.core.reporter.reports;
import io.kestra.core.contexts.KestraContext;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.ServerType;
import io.kestra.core.models.collectors.PluginMetric;
import io.kestra.core.plugins.PluginRegistry;
import io.kestra.core.reporter.AbstractReportable;
import io.kestra.core.reporter.Schedules;
import io.kestra.core.reporter.Types;
import io.kestra.core.utils.ListUtils;
import io.micrometer.core.instrument.Timer;
import io.micronaut.core.annotation.Introspected;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.Builder;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
@Singleton
public class PluginMetricReport extends AbstractReportable<PluginMetricReport.PluginMetricEvent> {
private final PluginRegistry pluginRegistry;
private final MetricRegistry metricRegistry;
private final boolean enabled;
@Inject
public PluginMetricReport(PluginRegistry pluginRegistry,
MetricRegistry metricRegistry) {
super(Types.PLUGIN_METRICS, Schedules.daily(), false);
this.metricRegistry = metricRegistry;
this.pluginRegistry = pluginRegistry;
ServerType serverType = KestraContext.getContext().getServerType();
this.enabled = ServerType.SCHEDULER.equals(serverType) || ServerType.WORKER.equals(serverType) || ServerType.STANDALONE.equals(serverType);
}
@Override
public PluginMetricEvent report(final Instant now, final TimeInterval period) {
return PluginMetricEvent
.builder()
.pluginMetrics(pluginMetrics())
.build();
}
@Override
public boolean isEnabled() {
return enabled;
}
@Builder
@Introspected
public record PluginMetricEvent (
List<PluginMetric> pluginMetrics
) implements Event {
}
private List<PluginMetric> pluginMetrics() {
List<PluginMetric> taskMetrics = pluginRegistry.plugins().stream()
.flatMap(registeredPlugin -> registeredPlugin.getTasks().stream())
.map(Class::getName)
.map(this::taskMetric)
.flatMap(Optional::stream)
.toList();
List<PluginMetric> triggerMetrics = pluginRegistry.plugins().stream()
.flatMap(registeredPlugin -> registeredPlugin.getTriggers().stream())
.map(Class::getName)
.map(this::triggerMetric)
.flatMap(Optional::stream)
.toList();
return ListUtils.concat(taskMetrics, triggerMetrics);
}
private Optional<PluginMetric> taskMetric(String type) {
Timer duration = metricRegistry.find(MetricRegistry.METRIC_WORKER_ENDED_DURATION).tag(MetricRegistry.TAG_TASK_TYPE, type).timer();
return fromTimer(type, duration);
}
private Optional<PluginMetric> triggerMetric(String type) {
Timer duration = metricRegistry.find(MetricRegistry.METRIC_WORKER_TRIGGER_DURATION).tag(MetricRegistry.TAG_TRIGGER_TYPE, type).timer();
if (duration == null) {
// this may be because this is a trigger executed by the scheduler, we search there instead
duration = metricRegistry.find(MetricRegistry.METRIC_SCHEDULER_TRIGGER_EVALUATION_DURATION).tag(MetricRegistry.TAG_TRIGGER_TYPE, type).timer();
}
return fromTimer(type, duration);
}
private Optional<PluginMetric> fromTimer(String type, Timer timer) {
if (timer == null || timer.count() == 0) {
return Optional.empty();
}
double count = timer.count();
double totalTime = timer.totalTime(TimeUnit.MILLISECONDS);
double meanTime = timer.mean(TimeUnit.MILLISECONDS);
return Optional.of(new PluginMetric(type, count, totalTime, meanTime));
}
}

View File

@@ -0,0 +1,51 @@
package io.kestra.core.reporter.reports;
import io.kestra.core.contexts.KestraContext;
import io.kestra.core.models.ServerType;
import io.kestra.core.models.collectors.PluginUsage;
import io.kestra.core.plugins.PluginRegistry;
import io.kestra.core.reporter.AbstractReportable;
import io.kestra.core.reporter.Schedules;
import io.kestra.core.reporter.Types;
import io.micronaut.core.annotation.Introspected;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.Builder;
import java.time.Instant;
import java.util.List;
@Singleton
public class PluginUsageReport extends AbstractReportable<PluginUsageReport.PluginUsageEvent> {
private final PluginRegistry pluginRegistry;
private final boolean enabled;
@Inject
public PluginUsageReport(PluginRegistry pluginRegistry) {
super(Types.PLUGIN_USAGE, Schedules.daily(), false);
this.pluginRegistry = pluginRegistry;
ServerType serverType = KestraContext.getContext().getServerType();
this.enabled = ServerType.EXECUTOR.equals(serverType) || ServerType.STANDALONE.equals(serverType);
}
@Override
public PluginUsageEvent report(final Instant now, final TimeInterval period) {
return PluginUsageEvent
.builder()
.plugins(PluginUsage.of(pluginRegistry))
.build();
}
@Override
public boolean isEnabled() {
return enabled;
}
@Builder
@Introspected
public record PluginUsageEvent(
List<PluginUsage> plugins
) implements Event {
}
}

View File

@@ -0,0 +1,53 @@
package io.kestra.core.reporter.reports;
import io.kestra.core.contexts.KestraContext;
import io.kestra.core.models.ServerType;
import io.kestra.core.models.collectors.ServiceUsage;
import io.kestra.core.reporter.AbstractReportable;
import io.kestra.core.reporter.Schedules;
import io.kestra.core.reporter.Types;
import io.kestra.core.repositories.ServiceInstanceRepositoryInterface;
import io.micronaut.core.annotation.Introspected;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.Builder;
import java.time.Duration;
import java.time.Instant;
@Singleton
public class ServiceUsageReport extends AbstractReportable<ServiceUsageReport.ServiceUsageEvent> {
private final ServiceInstanceRepositoryInterface serviceInstanceRepository;
private final boolean isEnabled;
@Inject
public ServiceUsageReport(ServiceInstanceRepositoryInterface serviceInstanceRepository) {
super(Types.SERVICE_USAGE, Schedules.daily(), false);
this.serviceInstanceRepository = serviceInstanceRepository;
ServerType serverType = KestraContext.getContext().getServerType();
this.isEnabled = ServerType.STANDALONE.equals(serverType) || ServerType.EXECUTOR.equals(serverType);
}
@Override
public ServiceUsageEvent report(final Instant now, final TimeInterval period) {
return ServiceUsageEvent
.builder()
.services(ServiceUsage.of(period.from().toInstant(), period.to().toInstant(), serviceInstanceRepository, Duration.ofMinutes(5)))
.build();
}
@Override
public boolean isEnabled() {
return isEnabled;
}
@Builder
@Introspected
public record ServiceUsageEvent(
ServiceUsage services
) implements Event {
}
}

View File

@@ -0,0 +1,63 @@
package io.kestra.core.reporter.reports;
import io.kestra.core.models.collectors.ConfigurationUsage;
import io.kestra.core.models.collectors.HostUsage;
import io.kestra.core.reporter.AbstractReportable;
import io.kestra.core.reporter.Schedules;
import io.kestra.core.reporter.Types;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import io.micronaut.core.annotation.Introspected;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.Builder;
import java.lang.management.ManagementFactory;
import java.time.Instant;
import java.util.Set;
@Singleton
public class SystemInformationReport extends AbstractReportable<SystemInformationReport.SystemInformationEvent> {
private final Environment environment;
private final ApplicationContext applicationContext;
private final String kestraUrl;
private final Instant startTime;
@Inject
public SystemInformationReport(ApplicationContext applicationContext) {
super(Types.SYSTEM_INFORMATION, Schedules.daily(), false);
this.environment = applicationContext.getEnvironment();
this.applicationContext = applicationContext;
this.kestraUrl = applicationContext.getProperty("kestra.url", String.class).orElse(null);
this.startTime = Instant.ofEpochMilli(ManagementFactory.getRuntimeMXBean().getStartTime());
}
@Override
public SystemInformationEvent report(final Instant now, final TimeInterval timeInterval) {
return SystemInformationEvent
.builder()
.environments(environment.getActiveNames())
.configurations(ConfigurationUsage.of(applicationContext))
.startTime(startTime)
.host(HostUsage.of())
.uri(kestraUrl)
.build();
}
@Override
public boolean isEnabled() {
return true;
}
@Builder
@Introspected
public record SystemInformationEvent(
Set<String> environments,
HostUsage host,
ConfigurationUsage configurations,
Instant startTime,
String uri
) implements Event {
}
}

View File

@@ -1,22 +0,0 @@
package io.kestra.core.services;
import io.micronaut.context.annotation.Requires;
import io.micronaut.scheduling.annotation.Scheduled;
import lombok.extern.slf4j.Slf4j;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
@Singleton
@Slf4j
@Requires(property = "kestra.anonymous-usage-report.enabled", value = "true")
@Requires(property = "kestra.server-type")
public class CollectorScheduler {
@Inject
protected CollectorService collectorService;
@Scheduled(initialDelay = "${kestra.anonymous-usage-report.initial-delay}", fixedDelay = "${kestra.anonymous-usage-report.fixed-delay}")
public void report() {
collectorService.report();
}
}

View File

@@ -1,220 +0,0 @@
package io.kestra.core.services;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.ServerType;
import io.kestra.core.models.collectors.*;
import io.kestra.core.plugins.PluginRegistry;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.repositories.ServiceInstanceRepositoryInterface;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.utils.VersionProvider;
import io.micrometer.core.instrument.Timer;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.Value;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.type.Argument;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.MutableHttpRequest;
import io.micronaut.http.client.annotation.Client;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.http.hateoas.JsonError;
import io.micronaut.reactor.http.client.ReactorHttpClient;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import java.lang.management.ManagementFactory;
import java.net.URI;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
@Singleton
@Slf4j
public class CollectorService {
protected static final String UUID = IdUtils.create();
@Inject
@Client
protected ReactorHttpClient client;
@Inject
protected ApplicationContext applicationContext;
@Inject
private FlowRepositoryInterface flowRepository;
@Inject
private ExecutionRepositoryInterface executionRepository;
@Inject
protected InstanceService instanceService;
@Inject
protected VersionProvider versionProvider;
@Inject
protected PluginRegistry pluginRegistry;
@Nullable
@Value("${kestra.server-type}")
protected ServerType serverType;
@Nullable
@Value("${kestra.url:}")
protected String kestraUrl;
@Value("${kestra.anonymous-usage-report.uri}")
protected URI url;
@Inject
private ServiceInstanceRepositoryInterface serviceRepository;
@Inject
private MetricRegistry metricRegistry;
private transient Usage defaultUsage;
protected synchronized Usage defaultUsage() {
boolean first = defaultUsage == null;
if (first) {
defaultUsage = Usage.builder()
.startUuid(UUID)
.instanceUuid(instanceService.fetch())
.serverType(serverType)
.version(versionProvider.getVersion())
.zoneId(ZoneId.systemDefault())
.uri(kestraUrl == null ? null : kestraUrl)
.environments(applicationContext.getEnvironment().getActiveNames())
.startTime(Instant.ofEpochMilli(ManagementFactory.getRuntimeMXBean().getStartTime()))
.host(HostUsage.of())
.configurations(ConfigurationUsage.of(applicationContext))
.plugins(PluginUsage.of(pluginRegistry))
.build();
}
return defaultUsage;
}
public Usage metrics(boolean details) {
return metrics(details, serverType == ServerType.WORKER || serverType == ServerType.SCHEDULER || serverType == ServerType.STANDALONE);
}
public Usage metrics(boolean details, boolean metrics) {
ZonedDateTime to = ZonedDateTime.now();
ZonedDateTime from = to
.toLocalDate()
.atStartOfDay(ZoneId.systemDefault())
.minusDays(1);
return metrics(details, metrics, from, to);
}
public Usage metrics(boolean details, boolean metrics, ZonedDateTime from, ZonedDateTime to) {
Usage.UsageBuilder<?, ?> builder = defaultUsage()
.toBuilder()
.uuid(IdUtils.create());
if (details) {
builder = builder
.flows(FlowUsage.of(flowRepository))
.executions(ExecutionUsage.of(executionRepository, from, to))
.services(ServiceUsage.of(from.toInstant(), to.toInstant(), serviceRepository, Duration.ofMinutes(5)));
}
if (metrics) {
builder = builder.pluginMetrics(pluginMetrics());
}
return builder.build();
}
public void report() {
try {
Usage metrics = this.metrics(serverType == ServerType.EXECUTOR || serverType == ServerType.STANDALONE);
MutableHttpRequest<Usage> post = this.request(metrics);
if (log.isTraceEnabled()) {
log.trace("Report anonymous usage: '{}'", JacksonMapper.ofJson().writeValueAsString(metrics));
}
Result result = client.toBlocking()
.retrieve(
post,
Argument.of(Result.class),
Argument.of(JsonError.class)
);
this.handleResponse(result);
} catch (HttpClientResponseException t) {
log.debug("Unable to report anonymous usage with body '{}'", t.getResponse().getBody(String.class), t);
} catch (Exception t) {
log.debug("Unable to handle anonymous usage", t);
}
}
private void handleResponse(Result result) {
}
protected MutableHttpRequest<Usage> request(Usage metrics) throws Exception {
return HttpRequest.POST(this.url, metrics)
.header("User-Agent", "Kestra/" + versionProvider.getVersion());
}
private List<PluginMetric> pluginMetrics() {
List<PluginMetric> taskMetrics = pluginRegistry.plugins().stream()
.flatMap(registeredPlugin -> registeredPlugin.getTasks().stream())
.map(cls -> cls.getName())
.map(type -> taskMetric(type))
.filter(opt -> opt.isPresent())
.map(opt -> opt.get())
.toList();
List<PluginMetric> triggerMetrics = pluginRegistry.plugins().stream()
.flatMap(registeredPlugin -> registeredPlugin.getTriggers().stream())
.map(cls -> cls.getName())
.map(type -> triggerMetric(type))
.filter(opt -> opt.isPresent())
.map(opt -> opt.get())
.toList();
return ListUtils.concat(taskMetrics, triggerMetrics);
}
private Optional<PluginMetric> taskMetric(String type) {
Timer duration = metricRegistry.find(MetricRegistry.METRIC_WORKER_ENDED_DURATION).tag(MetricRegistry.TAG_TASK_TYPE, type).timer();
return fromTimer(type, duration);
}
private Optional<PluginMetric> triggerMetric(String type) {
Timer duration = metricRegistry.find(MetricRegistry.METRIC_WORKER_TRIGGER_DURATION).tag(MetricRegistry.TAG_TRIGGER_TYPE, type).timer();
if (duration == null) {
// this may be because this is a trigger executed by the scheduler, we search there instead
duration = metricRegistry.find(MetricRegistry.METRIC_SCHEDULER_TRIGGER_EVALUATION_DURATION).tag(MetricRegistry.TAG_TRIGGER_TYPE, type).timer();
}
return fromTimer(type, duration);
}
private Optional<PluginMetric> fromTimer(String type, Timer timer) {
if (timer == null || timer.count() == 0) {
return Optional.empty();
}
double count = timer.count();
double totalTime = timer.totalTime(TimeUnit.MILLISECONDS);
double meanTime = timer.mean(TimeUnit.MILLISECONDS);
return Optional.of(new PluginMetric(type, count, totalTime, meanTime));
}
}

View File

@@ -10,22 +10,34 @@ import lombok.extern.slf4j.Slf4j;
@Singleton
@Slf4j
public class InstanceService {
private final SettingRepositoryInterface settingRepository;
@Inject
private SettingRepositoryInterface settingRepository;
private Setting instanceIdSetting;
public InstanceService(SettingRepositoryInterface settingRepository) {
this.settingRepository = settingRepository;
}
private volatile Setting instanceIdSetting;
public String fetch() {
if (this.instanceIdSetting == null) {
instanceIdSetting = settingRepository
.findByKey(Setting.INSTANCE_UUID)
.orElseGet(() -> settingRepository.save(Setting.builder()
.key(Setting.INSTANCE_UUID)
.value(IdUtils.create())
.build()
));
synchronized (this) {
if (this.instanceIdSetting == null) {
instanceIdSetting = fetchInstanceUuid();
}
}
}
return this.instanceIdSetting.getValue().toString();
}
private Setting fetchInstanceUuid() {
return settingRepository
.findByKey(Setting.INSTANCE_UUID)
.orElseGet(() -> settingRepository.save(Setting.builder()
.key(Setting.INSTANCE_UUID)
.value(IdUtils.create())
.build()
));
}
}

View File

@@ -0,0 +1,86 @@
package io.kestra.core.reporter;
import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.time.Instant;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
class SchedulesTest {
@Test
void shouldTriggerAfterPeriodGivenEnoughTimeHasPassed() {
// Given
var schedule = Schedules.every(Duration.ofHours(1));
Instant now = Instant.now();
// When
boolean firstRun = schedule.shouldRun(now);
boolean fiveMinutesLater = schedule.shouldRun(now.plus(Duration.ofMinutes(5)));
boolean oneHourLater = schedule.shouldRun(now.plus(Duration.ofHours(1)));
// Then
assertThat(firstRun).isTrue();
assertThat(fiveMinutesLater).isFalse();
assertThat(oneHourLater).isTrue();
}
@Test
void shouldNotTriggerGivenPeriodHasNotElapsed() {
// Given
var schedule = Schedules.every(Duration.ofMinutes(30));
Instant now = Instant.now();
// When
boolean firstRun = schedule.shouldRun(now);
boolean almost30Minutes = schedule.shouldRun(now.plus(Duration.ofMinutes(29)));
// Then
assertThat(firstRun).isTrue();
assertThat(almost30Minutes).isFalse();
}
@Test
void shouldTriggerHourlyGivenOneHourHasElapsed() {
// Given
var schedule = Schedules.hourly();
Instant now = Instant.now();
// When
boolean firstRun = schedule.shouldRun(now);
boolean nextHour = schedule.shouldRun(now.plus(Duration.ofHours(1)));
// Then
assertThat(firstRun).isTrue();
assertThat(nextHour).isTrue();
}
@Test
void shouldTriggerDailyGivenOneDayHasElapsed() {
// Given
var schedule = Schedules.daily();
Instant now = Instant.now();
// When
boolean firstRun = schedule.shouldRun(now);
boolean nextDay = schedule.shouldRun(now.plus(Duration.ofDays(1)));
// Then
assertThat(firstRun).isTrue();
assertThat(nextDay).isTrue();
}
@Test
void shouldThrowExceptionGivenZeroOrNegativeDuration() {
// Given / When / Then
assertThatThrownBy(() -> Schedules.every(Duration.ZERO))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Period must be positive");
assertThatThrownBy(() -> Schedules.every(Duration.ofSeconds(-5)))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Period must be positive");
}
}

View File

@@ -0,0 +1,33 @@
package io.kestra.core.reporter.reports;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.reporter.Reportable;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import static org.assertj.core.api.Assertions.assertThat;
@KestraTest
public abstract class AbstractFeatureUsageReportTest {
@Inject
FeatureUsageReport featureUsageReport;
@Test
public void shouldGetReport() {
// When
Instant now = Instant.now();
FeatureUsageReport.UsageEvent event = featureUsageReport.report(
now,
Reportable.TimeInterval.of(now.minus(Duration.ofDays(1)).atZone(ZoneId.systemDefault()), now.atZone(ZoneId.systemDefault()))
);
// Then
assertThat(event.getExecutions().getDailyExecutionsCount().size()).isGreaterThan(0);
assertThat(event.getExecutions().getDailyTaskRunsCount()).isNull();
}
}

View File

@@ -0,0 +1,81 @@
package io.kestra.core.reporter.reports;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.collectors.ServiceUsage;
import io.kestra.core.reporter.Reportable;
import io.kestra.core.repositories.ServiceInstanceRepositoryInterface;
import io.kestra.core.server.Service;
import io.kestra.core.server.ServiceInstance;
import io.kestra.core.server.ServiceType;
import io.kestra.core.utils.IdUtils;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.util.List;
import java.util.Map;
import java.util.Set;
@KestraTest
public abstract class AbstractServiceUsageReportTest {
@Inject
ServiceUsageReport serviceUsageReport;
@Inject
ServiceInstanceRepositoryInterface serviceInstanceRepository;
@Test
public void shouldGetReport() {
// Given
final LocalDate start = LocalDate.now().withDayOfMonth(1);
final LocalDate end = start.withDayOfMonth(start.getMonth().length(start.isLeapYear()));
final ZoneId zoneId = ZoneId.systemDefault();
LocalDate from = start;
int days = 0;
// generate one month of service instance
while (from.toEpochDay() < end.toEpochDay()) {
Instant createAt = from.atStartOfDay(zoneId).toInstant();
Instant updatedAt = from.atStartOfDay(zoneId).plus(Duration.ofHours(10)).toInstant();
ServiceInstance instance = new ServiceInstance(
IdUtils.create(),
ServiceType.EXECUTOR,
Service.ServiceState.EMPTY,
null,
createAt,
updatedAt,
List.of(),
null,
Map.of(),
Set.of()
);
instance = instance
.state(Service.ServiceState.RUNNING, createAt)
.state(Service.ServiceState.NOT_RUNNING, updatedAt);
serviceInstanceRepository.save(instance);
from = from.plusDays(1);
days++;
}
// When
Instant now = end.plusDays(1).atStartOfDay(zoneId).toInstant();
ServiceUsageReport.ServiceUsageEvent event = serviceUsageReport.report(now,
Reportable.TimeInterval.of(start.atStartOfDay(zoneId), end.plusDays(1).atStartOfDay(zoneId))
);
// Then
List<ServiceUsage.DailyServiceStatistics> statistics = event.services().dailyStatistics();
Assertions.assertEquals(ServiceType.values().length - 1, statistics.size());
Assertions.assertEquals(
days,
statistics.stream().filter(it -> it.type().equalsIgnoreCase("EXECUTOR")).findFirst().get().values().size()
);
}
}

View File

@@ -0,0 +1,41 @@
package io.kestra.core.reporter.reports;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.plugin.core.http.Trigger;
import io.kestra.plugin.core.log.Log;
import io.kestra.plugin.core.trigger.Schedule;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.time.Instant;
import static org.assertj.core.api.Assertions.assertThat;
@KestraTest
class PluginMetricReportTest {
@Inject
MetricRegistry metricRegistry;
@Inject
PluginMetricReport pluginMetricReport;
@Test
void shouldGetReport() {
// Given
metricRegistry.timer(MetricRegistry.METRIC_WORKER_ENDED_DURATION, MetricRegistry.METRIC_WORKER_ENDED_DURATION_DESCRIPTION, MetricRegistry.TAG_TASK_TYPE, Log.class.getName())
.record(() -> Duration.ofSeconds(1));
metricRegistry.timer(MetricRegistry.METRIC_WORKER_TRIGGER_DURATION, MetricRegistry.METRIC_WORKER_TRIGGER_DURATION_DESCRIPTION, MetricRegistry.TAG_TRIGGER_TYPE, Trigger.class.getName())
.record(() -> Duration.ofSeconds(1));
metricRegistry.timer(MetricRegistry.METRIC_SCHEDULER_TRIGGER_EVALUATION_DURATION, MetricRegistry.METRIC_SCHEDULER_TRIGGER_EVALUATION_DURATION_DESCRIPTION, MetricRegistry.TAG_TRIGGER_TYPE, Schedule.class.getName())
.record(() -> Duration.ofSeconds(1));
// When
PluginMetricReport.PluginMetricEvent event = pluginMetricReport.report(Instant.now());
// Then
assertThat(event.pluginMetrics()).hasSize(3);
}
}

View File

@@ -0,0 +1,68 @@
package io.kestra.core.reporter.reports;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.Setting;
import io.kestra.core.repositories.SettingRepositoryInterface;
import io.micronaut.test.annotation.MockBean;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import jakarta.validation.ConstraintViolationException;
import org.junit.jupiter.api.Test;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import static org.assertj.core.api.Assertions.assertThat;
@KestraTest
class SystemInformationReportTest {
@Inject
private SystemInformationReport systemInformationReport;
@Test
void shouldGetReport() {
SystemInformationReport.SystemInformationEvent event = systemInformationReport.report(Instant.now());
assertThat(event.uri()).isEqualTo("https://mysuperhost.com/subpath");
assertThat(event.environments()).contains("test");
assertThat(event.startTime()).isNotNull();
assertThat(event.host().getUuid()).isNotNull();
assertThat(event.host().getHardware().getLogicalProcessorCount()).isNotNull();
assertThat(event.host().getJvm().getName()).isNotNull();
assertThat(event.host().getOs().getFamily()).isNotNull();
assertThat(event.configurations().getRepositoryType()).isEqualTo("memory");
assertThat(event.configurations().getQueueType()).isEqualTo("memory");
}
@MockBean(SettingRepositoryInterface.class)
@Singleton
static class TestSettingRepository implements SettingRepositoryInterface {
public static Object UUID = null;
@Override
public Optional<Setting> findByKey(String key) {
return Optional.empty();
}
@Override
public List<Setting> findAll() {
return new ArrayList<>();
}
@Override
public Setting save(Setting setting) throws ConstraintViolationException {
if (setting.getKey().equals(Setting.INSTANCE_UUID)) {
UUID = setting.getValue();
}
return setting;
}
@Override
public Setting delete(Setting setting) {
return setting;
}
}
}

View File

@@ -1,106 +0,0 @@
package io.kestra.core.services;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.Helpers;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.ServerType;
import io.kestra.core.models.Setting;
import io.kestra.core.models.collectors.Usage;
import io.kestra.core.models.executions.statistics.DailyExecutionStatistics;
import io.kestra.core.repositories.SettingRepositoryInterface;
import io.kestra.plugin.core.http.Trigger;
import io.kestra.plugin.core.log.Log;
import io.kestra.plugin.core.trigger.Schedule;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.Primary;
import io.micronaut.context.annotation.Requires;
import io.kestra.core.junit.annotations.KestraTest;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.junit.jupiter.api.Test;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import jakarta.validation.ConstraintViolationException;
import static org.assertj.core.api.Assertions.assertThat;
@KestraTest
class CollectorServiceTest {
@Test
public void metrics() throws URISyntaxException {
ImmutableMap<String, Object> properties = ImmutableMap.of("kestra.server-type", ServerType.STANDALONE.name());
try (ApplicationContext applicationContext = Helpers.applicationContext(properties).start()) {
MetricRegistry metricRegistry = applicationContext.getBean(MetricRegistry.class);
// inject fake metrics to have plugin metrics
metricRegistry.timer(MetricRegistry.METRIC_WORKER_ENDED_DURATION, MetricRegistry.METRIC_WORKER_ENDED_DURATION_DESCRIPTION, MetricRegistry.TAG_TASK_TYPE, Log.class.getName())
.record(() -> Duration.ofSeconds(1));
metricRegistry.timer(MetricRegistry.METRIC_WORKER_TRIGGER_DURATION, MetricRegistry.METRIC_WORKER_TRIGGER_DURATION_DESCRIPTION, MetricRegistry.TAG_TRIGGER_TYPE, Trigger.class.getName())
.record(() -> Duration.ofSeconds(1));
metricRegistry.timer(MetricRegistry.METRIC_SCHEDULER_TRIGGER_EVALUATION_DURATION, MetricRegistry.METRIC_SCHEDULER_TRIGGER_EVALUATION_DURATION_DESCRIPTION, MetricRegistry.TAG_TRIGGER_TYPE, Schedule.class.getName())
.record(() -> Duration.ofSeconds(1));
CollectorService collectorService = applicationContext.getBean(CollectorService.class);
Usage metrics = collectorService.metrics(true);
assertThat(metrics.getUri()).isEqualTo("https://mysuperhost.com/subpath");
assertThat(metrics.getUuid()).isNotNull();
assertThat(metrics.getVersion()).isNotNull();
assertThat(metrics.getStartTime()).isNotNull();
assertThat(metrics.getEnvironments()).contains("test");
assertThat(metrics.getStartTime()).isNotNull();
assertThat(metrics.getHost().getUuid()).isNotNull();
assertThat(metrics.getHost().getHardware().getLogicalProcessorCount()).isNotNull();
assertThat(metrics.getHost().getJvm().getName()).isNotNull();
assertThat(metrics.getHost().getOs().getFamily()).isNotNull();
assertThat(metrics.getConfigurations().getRepositoryType()).isEqualTo("memory");
assertThat(metrics.getConfigurations().getQueueType()).isEqualTo("memory");
assertThat(metrics.getExecutions()).isNotNull();
// 1 per hour
assertThat(metrics.getExecutions().getDailyExecutionsCount().size()).isGreaterThan(0);
// no task runs as it's an empty instance
assertThat(metrics.getExecutions().getDailyTaskRunsCount()).isNull();
assertThat(metrics.getInstanceUuid()).isEqualTo(TestSettingRepository.instanceUuid);
// we have 3 metrics so we should have the info for the related plugins
assertThat(metrics.getPluginMetrics()).hasSize(3);
}
}
@Singleton
@Requires(property = "kestra.unittest")
@Primary
public static class TestSettingRepository implements SettingRepositoryInterface {
public static Object instanceUuid = null;
@Override
public Optional<Setting> findByKey(String key) {
return Optional.empty();
}
@Override
public List<Setting> findAll() {
return new ArrayList<>();
}
@Override
public Setting save(Setting setting) throws ConstraintViolationException {
if (setting.getKey().equals(Setting.INSTANCE_UUID)) {
TestSettingRepository.instanceUuid = setting.getValue();
}
return setting;
}
@Override
public Setting delete(Setting setting) {
return setting;
}
}
}

View File

@@ -0,0 +1,9 @@
package reports;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.reporter.reports.AbstractFeatureUsageReportTest;
@KestraTest
class H2FeatureUsageReportTest extends AbstractFeatureUsageReportTest {
}

View File

@@ -0,0 +1,9 @@
package reports;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.reporter.reports.AbstractServiceUsageReportTest;
@KestraTest
class H2ServiceUsageReportTest extends AbstractServiceUsageReportTest {
}

View File

@@ -0,0 +1,9 @@
package reports;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.reporter.reports.AbstractFeatureUsageReportTest;
@KestraTest
class MysqlFeatureUsageReportTest extends AbstractFeatureUsageReportTest {
}

View File

@@ -0,0 +1,9 @@
package reports;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.reporter.reports.AbstractServiceUsageReportTest;
@KestraTest
class MysqlServiceUsageReportTest extends AbstractServiceUsageReportTest {
}

View File

@@ -0,0 +1,8 @@
package io.kestra.core.reporter.reports;
import io.kestra.core.junit.annotations.KestraTest;
@KestraTest
class PostgresFeatureUsageReportTest extends AbstractFeatureUsageReportTest {
}

View File

@@ -0,0 +1,8 @@
package io.kestra.core.reporter.reports;
import io.kestra.core.junit.annotations.KestraTest;
@KestraTest
class PostgresServiceUsageReportTest extends AbstractServiceUsageReportTest {
}

View File

@@ -3,11 +3,13 @@ package io.kestra.webserver.controllers.api;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.collectors.Usage;
import io.kestra.core.models.collectors.ExecutionUsage;
import io.kestra.core.models.collectors.FlowUsage;
import io.kestra.core.reporter.Reportable;
import io.kestra.core.reporter.reports.FeatureUsageReport;
import io.kestra.core.repositories.DashboardRepositoryInterface;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.repositories.TemplateRepositoryInterface;
import io.kestra.core.services.CollectorService;
import io.kestra.core.services.InstanceService;
import io.kestra.core.utils.NamespaceUtils;
import io.kestra.core.utils.VersionProvider;
@@ -24,10 +26,15 @@ import io.micronaut.scheduling.annotation.ExecuteOn;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.parameters.RequestBody;
import jakarta.inject.Inject;
import lombok.*;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Value;
import lombok.experimental.SuperBuilder;
import lombok.extern.slf4j.Slf4j;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Optional;
@@ -37,56 +44,56 @@ import java.util.Optional;
public class MiscController {
@Inject
protected ApplicationContext applicationContext;
@Inject
VersionProvider versionProvider;
@Inject
DashboardRepositoryInterface dashboardRepository;
@Inject
ExecutionRepositoryInterface executionRepository;
@Inject
InstanceService instanceService;
@Inject
CollectorService collectorService;
FeatureUsageReport featureUsageReport;
@Inject
BasicAuthService basicAuthService;
@Inject
Optional<TemplateRepositoryInterface> templateRepository;
@Inject
NamespaceUtils namespaceUtils;
@io.micronaut.context.annotation.Value("${kestra.anonymous-usage-report.enabled}")
protected Boolean isAnonymousUsageEnabled;
@io.micronaut.context.annotation.Value("${kestra.environment.name}")
@Nullable
protected String environmentName;
@io.micronaut.context.annotation.Value("${kestra.environment.color}")
@Nullable
protected String environmentColor;
@io.micronaut.context.annotation.Value("${kestra.url}")
@Nullable
protected String kestraUrl;
@io.micronaut.context.annotation.Value("${kestra.server.preview.initial-rows:100}")
private Integer initialPreviewRows;
@io.micronaut.context.annotation.Value("${kestra.server.preview.max-rows:5000}")
private Integer maxPreviewRows;
@io.micronaut.context.annotation.Value("${kestra.hidden-labels.prefixes:}")
private List<String> hiddenLabelsPrefixes;
@Get("/configs")
@ExecuteOn(TaskExecutors.IO)
@Operation(tags = {"Misc"}, summary = "Retrieve the instance configuration.", description = "Global endpoint available to all users.")
@@ -111,7 +118,7 @@ public class MiscController {
.resourceToFilters(QueryFilter.Resource.asResourceList())
.hiddenLabelsPrefixes(hiddenLabelsPrefixes)
.url(kestraUrl);
if (this.environmentName != null || this.environmentColor != null) {
builder.environment(
Environment.builder()
@@ -120,17 +127,22 @@ public class MiscController {
.build()
);
}
return builder.build();
}
@Get("/{tenant}/usages/all")
@ExecuteOn(TaskExecutors.IO)
@Operation(tags = {"Misc"}, summary = "Retrieve instance usage information")
public Usage getUsages() {
return collectorService.metrics(true);
public ApiUsage getUsages() {
ZonedDateTime now = ZonedDateTime.now();
FeatureUsageReport.UsageEvent event = featureUsageReport.report(now.toInstant(), Reportable.TimeInterval.of(now.minus(Duration.ofDays(1)), now));
return ApiUsage.builder()
.flows(event.getFlows())
.executions(event.getExecutions())
.build();
}
@Post(uri = "/{tenant}/basicAuth")
@ExecuteOn(TaskExecutors.IO)
@Operation(tags = {"Misc"}, summary = "Configure basic authentication for the instance.", description = "Sets up basic authentication credentials.")
@@ -138,73 +150,73 @@ public class MiscController {
@RequestBody(description = "") @Body BasicAuthCredentials basicAuthCredentials
) {
basicAuthService.save(basicAuthCredentials.getUid(), new BasicAuthService.BasicAuthConfiguration(basicAuthCredentials.getUsername(), basicAuthCredentials.getPassword()));
return HttpResponse.noContent();
}
@Get("/basicAuthValidationErrors")
@ExecuteOn(TaskExecutors.IO)
@Operation(tags = {"Misc"}, summary = "Retrieve the instance configuration.", description = "Global endpoint available to all users.")
public List<String> getBasicAuthConfigErrors() {
return basicAuthService.validationErrors();
}
@Getter
@NoArgsConstructor
@SuperBuilder(toBuilder = true)
public static class Configuration {
String uuid;
String version;
String commitId;
ZonedDateTime commitDate;
@JsonInclude
Boolean isCustomDashboardsEnabled;
@JsonInclude
Boolean isTaskRunEnabled;
@JsonInclude
Boolean isAnonymousUsageEnabled;
@JsonInclude
Boolean isTemplateEnabled;
Environment environment;
String url;
Preview preview;
String systemNamespace;
List<String> hiddenLabelsPrefixes;
// List of filter by component
List<QueryFilter.ResourceField> resourceToFilters;
Boolean isAiEnabled;
Boolean isBasicAuthInitialized;
}
@Value
@Builder(toBuilder = true)
public static class Environment {
String name;
String color;
}
@Value
@Builder(toBuilder = true)
public static class Preview {
Integer initial;
Integer max;
}
@Getter
@AllArgsConstructor
public static class BasicAuthCredentials {
@@ -212,4 +224,11 @@ public class MiscController {
private String username;
private String password;
}
@SuperBuilder(toBuilder = true)
@Getter
public static class ApiUsage {
private FlowUsage flows;
private ExecutionUsage executions;
}
}

View File

@@ -1,36 +1,25 @@
package io.kestra.webserver.controllers.api;
import static org.assertj.core.api.Assertions.assertThat;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.collectors.Usage;
import io.kestra.core.reporter.reports.FeatureUsageReport;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.client.annotation.Client;
import io.micronaut.reactor.http.client.ReactorHttpClient;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
@KestraTest
class MiscUsageControllerTest {
@Inject
@Client("/")
ReactorHttpClient client;
@Test
void usages() {
var response = client.toBlocking().retrieve(HttpRequest.GET("/api/v1/main/usages/all"), Usage.class);
assertThat(response.getUuid()).isNotNull();
assertThat(response.getVersion()).isNotNull();
assertThat(response.getStartTime()).isNotNull();
assertThat(response.getEnvironments()).contains("test");
assertThat(response.getStartTime()).isNotNull();
assertThat(response.getHost().getUuid()).isNotNull();
assertThat(response.getHost().getHardware().getLogicalProcessorCount()).isNotNull();
assertThat(response.getHost().getJvm().getName()).isNotNull();
assertThat(response.getHost().getOs().getFamily()).isNotNull();
assertThat(response.getConfigurations().getRepositoryType()).isEqualTo("h2");
assertThat(response.getConfigurations().getQueueType()).isEqualTo("h2");
var response = client.toBlocking().retrieve(HttpRequest.GET("/api/v1/main/usages/all"), FeatureUsageReport.UsageEvent.class);
assertThat(response).isNotNull();
}
}