Feat/clean query filters (#9569)

* feat(repositories): clean and fix query filters

* fix(core): #4106 Clean and add unit tests to query filters

* clean(core): format string more cleanly

* fix(core): unit test failing on mysql flow repository

* feat(core): #4106 update ui filters

* fix(ui): add date filter on flows back

---------

Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
This commit is contained in:
Nicolas K.
2025-06-18 17:40:56 +02:00
committed by GitHub
parent d6047fea96
commit 185fa80058
18 changed files with 800 additions and 110 deletions

View File

@@ -0,0 +1,43 @@
package io.kestra.core.exceptions;
import java.io.Serial;
import java.util.List;
/**
* General exception that can be throws when a Kestra entity field is query, but is not valid or existing.
*/
public class InvalidQueryFiltersException extends KestraRuntimeException {
@Serial
private static final long serialVersionUID = 1L;
private static final String INVALID_QUERY_FILTER_MESSAGE = "Provided query filters are invalid";
private transient final List<String> invalids;
/**
* Creates a new {@link InvalidQueryFiltersException} instance.
*
* @param invalids the invalid filters.
*/
public InvalidQueryFiltersException(final List<String> invalids) {
super(INVALID_QUERY_FILTER_MESSAGE);
this.invalids = invalids;
}
/**
* Creates a new {@link InvalidQueryFiltersException} instance.
*
* @param invalid the invalid filter.
*/
public InvalidQueryFiltersException(final String invalid) {
super(INVALID_QUERY_FILTER_MESSAGE);
this.invalids = List.of(invalid);
}
public String formatedInvalidObjects(){
if (invalids == null || invalids.isEmpty()){
return INVALID_QUERY_FILTER_MESSAGE;
}
return String.join(", ", invalids);
}
}

View File

@@ -3,8 +3,10 @@ package io.kestra.core.models;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonValue;
import io.kestra.core.exceptions.InvalidQueryFiltersException;
import io.kestra.core.models.dashboards.filters.*;
import io.kestra.core.utils.Enums;
import java.util.ArrayList;
import lombok.Builder;
import java.util.Arrays;
@@ -101,7 +103,7 @@ public record QueryFilter(
NAMESPACE("namespace") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.REGEX, Op.IN, Op.PREFIX);
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.REGEX, Op.IN, Op.NOT_IN, Op.PREFIX);
}
},
LABELS("labels") {
@@ -113,7 +115,7 @@ public record QueryFilter(
FLOW_ID("flowId") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.IN, Op.NOT_IN);
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.REGEX);
}
},
START_DATE("startDate") {
@@ -125,7 +127,7 @@ public record QueryFilter(
END_DATE("endDate") {
@Override
public List<Op> supportedOp() {
return List.of(Op.GREATER_THAN, Op.LESS_THAN, Op.EQUALS, Op.NOT_EQUALS);
return List.of(Op.GREATER_THAN_OR_EQUAL_TO, Op.GREATER_THAN, Op.LESS_THAN_OR_EQUAL_TO, Op.LESS_THAN, Op.EQUALS, Op.NOT_EQUALS);
}
},
STATE("state") {
@@ -137,8 +139,7 @@ public record QueryFilter(
TIME_RANGE("timeRange") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH,
Op.ENDS_WITH, Op.IN, Op.NOT_IN, Op.REGEX);
return List.of(Op.EQUALS);
}
},
TRIGGER_EXECUTION_ID("triggerExecutionId") {
@@ -217,7 +218,7 @@ public record QueryFilter(
@Override
public List<Field> supportedField() {
return List.of(
Field.QUERY, Field.SCOPE, Field.FLOW_ID, Field.START_DATE, Field.END_DATE, Field.TIME_RANGE,
Field.QUERY, Field.SCOPE, Field.FLOW_ID, Field.START_DATE, Field.END_DATE,
Field.STATE, Field.LABELS, Field.TRIGGER_EXECUTION_ID, Field.CHILD_FILTER,
Field.NAMESPACE
);
@@ -226,8 +227,8 @@ public record QueryFilter(
LOG {
@Override
public List<Field> supportedField() {
return List.of(Field.NAMESPACE, Field.START_DATE, Field.END_DATE,
Field.FLOW_ID, Field.TRIGGER_ID, Field.MIN_LEVEL
return List.of(Field.QUERY, Field.SCOPE, Field.NAMESPACE, Field.START_DATE,
Field.END_DATE, Field.FLOW_ID, Field.TRIGGER_ID, Field.MIN_LEVEL
);
}
},
@@ -248,7 +249,8 @@ public record QueryFilter(
TRIGGER {
@Override
public List<Field> supportedField() {
return List.of(Field.QUERY, Field.NAMESPACE, Field.WORKER_ID, Field.FLOW_ID
return List.of(Field.QUERY, Field.SCOPE, Field.NAMESPACE, Field.WORKER_ID, Field.FLOW_ID,
Field.START_DATE, Field.END_DATE, Field.TRIGGER_ID
);
}
};
@@ -295,4 +297,26 @@ public record QueryFilter(
public record Operation(String name, String value) {
}
public static void validateQueryFilters(List<QueryFilter> filters, Resource resource){
if (filters == null) {
return;
}
List<String> errors = new ArrayList<>();
filters.forEach(filter -> {
if (!filter.field().supportedOp().contains(filter.operation())) {
errors.add("Operation %s is not supported for field %s. Supported operations are %s".formatted(
filter.operation(), filter.field().name(),
filter.field().supportedOp().stream().map(Op::name).collect(Collectors.joining(", "))));
}
if (!resource.supportedField().contains(filter.field())){
errors.add("Field %s is not supported for resource %s. Supported fields are %s".formatted(
filter.field().name(), resource.name(),
resource.supportedField().stream().map(Field::name).collect(Collectors.joining(", "))));
}
});
if (!errors.isEmpty()){
throw new InvalidQueryFiltersException(errors);
}
}
}

View File

@@ -0,0 +1,252 @@
package io.kestra.core.models;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertThrows;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import io.kestra.core.exceptions.InvalidQueryFiltersException;
import io.kestra.core.models.QueryFilter.Field;
import io.kestra.core.models.QueryFilter.Op;
import io.kestra.core.models.QueryFilter.Resource;
import java.util.List;
import java.util.stream.Stream;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
public class QueryFilterTest {
@ParameterizedTest
@MethodSource("validOperationFilters")
void should_validate_all_operations(QueryFilter filter, Resource resource){
assertDoesNotThrow(() -> QueryFilter.validateQueryFilters(List.of(filter), resource));
}
@ParameterizedTest
@MethodSource("invalidOperationFilters")
void should_fail_to_validate_all_operations(QueryFilter filter, Resource resource){
InvalidQueryFiltersException e = assertThrows(
InvalidQueryFiltersException.class,
() -> QueryFilter.validateQueryFilters(List.of(filter), resource));
assertThat(e.formatedInvalidObjects()).contains("Operation");
}
static Stream<Arguments> validOperationFilters() {
return Stream.of(
Arguments.of(QueryFilter.builder().field(Field.QUERY).operation(Op.EQUALS).build(), Resource.FLOW),
Arguments.of(QueryFilter.builder().field(Field.QUERY).operation(Op.NOT_EQUALS).build(), Resource.FLOW),
Arguments.of(QueryFilter.builder().field(Field.SCOPE).operation(Op.EQUALS).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.SCOPE).operation(Op.NOT_EQUALS).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.NAMESPACE).operation(Op.EQUALS).build(), Resource.FLOW),
Arguments.of(QueryFilter.builder().field(Field.NAMESPACE).operation(Op.NOT_EQUALS).build(), Resource.FLOW),
Arguments.of(QueryFilter.builder().field(Field.NAMESPACE).operation(Op.IN).build(), Resource.FLOW),
Arguments.of(QueryFilter.builder().field(Field.NAMESPACE).operation(Op.NOT_IN).build(), Resource.FLOW),
Arguments.of(QueryFilter.builder().field(Field.NAMESPACE).operation(Op.STARTS_WITH).build(), Resource.FLOW),
Arguments.of(QueryFilter.builder().field(Field.NAMESPACE).operation(Op.ENDS_WITH).build(), Resource.FLOW),
Arguments.of(QueryFilter.builder().field(Field.NAMESPACE).operation(Op.CONTAINS).build(), Resource.FLOW),
Arguments.of(QueryFilter.builder().field(Field.NAMESPACE).operation(Op.REGEX).build(), Resource.FLOW),
Arguments.of(QueryFilter.builder().field(Field.NAMESPACE).operation(Op.PREFIX).build(), Resource.FLOW),
Arguments.of(QueryFilter.builder().field(Field.LABELS).operation(Op.EQUALS).build(), Resource.FLOW),
Arguments.of(QueryFilter.builder().field(Field.LABELS).operation(Op.NOT_EQUALS).build(), Resource.FLOW),
Arguments.of(QueryFilter.builder().field(Field.FLOW_ID).operation(Op.EQUALS).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.FLOW_ID).operation(Op.NOT_EQUALS).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.FLOW_ID).operation(Op.STARTS_WITH).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.FLOW_ID).operation(Op.ENDS_WITH).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.FLOW_ID).operation(Op.CONTAINS).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.FLOW_ID).operation(Op.REGEX).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.START_DATE).operation(Op.EQUALS).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.START_DATE).operation(Op.NOT_EQUALS).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.START_DATE).operation(Op.GREATER_THAN).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.START_DATE).operation(Op.LESS_THAN).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.START_DATE).operation(Op.GREATER_THAN_OR_EQUAL_TO).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.START_DATE).operation(Op.LESS_THAN_OR_EQUAL_TO).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.END_DATE).operation(Op.EQUALS).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.END_DATE).operation(Op.NOT_EQUALS).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.END_DATE).operation(Op.GREATER_THAN).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.END_DATE).operation(Op.LESS_THAN).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.END_DATE).operation(Op.GREATER_THAN_OR_EQUAL_TO).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.END_DATE).operation(Op.LESS_THAN_OR_EQUAL_TO).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.STATE).operation(Op.EQUALS).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.STATE).operation(Op.NOT_EQUALS).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.STATE).operation(Op.IN).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.STATE).operation(Op.NOT_IN).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.TRIGGER_EXECUTION_ID).operation(Op.EQUALS).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.TRIGGER_EXECUTION_ID).operation(Op.NOT_EQUALS).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.TRIGGER_EXECUTION_ID).operation(Op.IN).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.TRIGGER_EXECUTION_ID).operation(Op.NOT_IN).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.TRIGGER_EXECUTION_ID).operation(Op.STARTS_WITH).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.TRIGGER_EXECUTION_ID).operation(Op.ENDS_WITH).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.TRIGGER_EXECUTION_ID).operation(Op.CONTAINS).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.TRIGGER_ID).operation(Op.EQUALS).build(), Resource.LOG),
Arguments.of(QueryFilter.builder().field(Field.TRIGGER_ID).operation(Op.NOT_EQUALS).build(), Resource.LOG),
Arguments.of(QueryFilter.builder().field(Field.TRIGGER_ID).operation(Op.IN).build(), Resource.LOG),
Arguments.of(QueryFilter.builder().field(Field.TRIGGER_ID).operation(Op.NOT_IN).build(), Resource.LOG),
Arguments.of(QueryFilter.builder().field(Field.TRIGGER_ID).operation(Op.STARTS_WITH).build(), Resource.LOG),
Arguments.of(QueryFilter.builder().field(Field.TRIGGER_ID).operation(Op.ENDS_WITH).build(), Resource.LOG),
Arguments.of(QueryFilter.builder().field(Field.TRIGGER_ID).operation(Op.CONTAINS).build(), Resource.LOG),
Arguments.of(QueryFilter.builder().field(Field.CHILD_FILTER).operation(Op.EQUALS).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.CHILD_FILTER).operation(Op.NOT_EQUALS).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.WORKER_ID).operation(Op.EQUALS).build(), Resource.TRIGGER),
Arguments.of(QueryFilter.builder().field(Field.WORKER_ID).operation(Op.NOT_EQUALS).build(), Resource.TRIGGER),
Arguments.of(QueryFilter.builder().field(Field.WORKER_ID).operation(Op.IN).build(), Resource.TRIGGER),
Arguments.of(QueryFilter.builder().field(Field.WORKER_ID).operation(Op.NOT_IN).build(), Resource.TRIGGER),
Arguments.of(QueryFilter.builder().field(Field.WORKER_ID).operation(Op.STARTS_WITH).build(), Resource.TRIGGER),
Arguments.of(QueryFilter.builder().field(Field.WORKER_ID).operation(Op.ENDS_WITH).build(), Resource.TRIGGER),
Arguments.of(QueryFilter.builder().field(Field.WORKER_ID).operation(Op.CONTAINS).build(), Resource.TRIGGER),
Arguments.of(QueryFilter.builder().field(Field.EXISTING_ONLY).operation(Op.EQUALS).build(), Resource.NAMESPACE),
Arguments.of(QueryFilter.builder().field(Field.EXISTING_ONLY).operation(Op.NOT_EQUALS).build(), Resource.NAMESPACE),
Arguments.of(QueryFilter.builder().field(Field.MIN_LEVEL).operation(Op.EQUALS).build(), Resource.LOG),
Arguments.of(QueryFilter.builder().field(Field.MIN_LEVEL).operation(Op.NOT_EQUALS).build(), Resource.LOG)
);
}
static Stream<Arguments> invalidOperationFilters() {
return Stream.of(
Arguments.of(QueryFilter.builder().field(Field.QUERY).operation(Op.GREATER_THAN).build(), Resource.FLOW),
Arguments.of(QueryFilter.builder().field(Field.QUERY).operation(Op.LESS_THAN).build(), Resource.FLOW),
Arguments.of(QueryFilter.builder().field(Field.QUERY).operation(Op.GREATER_THAN_OR_EQUAL_TO).build(), Resource.FLOW),
Arguments.of(QueryFilter.builder().field(Field.QUERY).operation(Op.LESS_THAN_OR_EQUAL_TO).build(), Resource.FLOW),
Arguments.of(QueryFilter.builder().field(Field.QUERY).operation(Op.IN).build(), Resource.FLOW),
Arguments.of(QueryFilter.builder().field(Field.QUERY).operation(Op.NOT_IN).build(), Resource.FLOW),
Arguments.of(QueryFilter.builder().field(Field.QUERY).operation(Op.STARTS_WITH).build(), Resource.FLOW),
Arguments.of(QueryFilter.builder().field(Field.QUERY).operation(Op.ENDS_WITH).build(), Resource.FLOW),
Arguments.of(QueryFilter.builder().field(Field.QUERY).operation(Op.CONTAINS).build(), Resource.FLOW),
Arguments.of(QueryFilter.builder().field(Field.QUERY).operation(Op.REGEX).build(), Resource.FLOW),
Arguments.of(QueryFilter.builder().field(Field.QUERY).operation(Op.PREFIX).build(), Resource.FLOW),
Arguments.of(QueryFilter.builder().field(Field.SCOPE).operation(Op.GREATER_THAN).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.SCOPE).operation(Op.LESS_THAN).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.SCOPE).operation(Op.GREATER_THAN_OR_EQUAL_TO).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.SCOPE).operation(Op.LESS_THAN_OR_EQUAL_TO).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.SCOPE).operation(Op.IN).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.SCOPE).operation(Op.NOT_IN).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.SCOPE).operation(Op.STARTS_WITH).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.SCOPE).operation(Op.ENDS_WITH).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.SCOPE).operation(Op.CONTAINS).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.SCOPE).operation(Op.REGEX).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.SCOPE).operation(Op.PREFIX).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.NAMESPACE).operation(Op.GREATER_THAN).build(), Resource.FLOW),
Arguments.of(QueryFilter.builder().field(Field.NAMESPACE).operation(Op.LESS_THAN).build(), Resource.FLOW),
Arguments.of(QueryFilter.builder().field(Field.NAMESPACE).operation(Op.GREATER_THAN_OR_EQUAL_TO).build(), Resource.FLOW),
Arguments.of(QueryFilter.builder().field(Field.NAMESPACE).operation(Op.LESS_THAN_OR_EQUAL_TO).build(), Resource.FLOW),
Arguments.of(QueryFilter.builder().field(Field.LABELS).operation(Op.GREATER_THAN).build(), Resource.FLOW),
Arguments.of(QueryFilter.builder().field(Field.LABELS).operation(Op.LESS_THAN).build(), Resource.FLOW),
Arguments.of(QueryFilter.builder().field(Field.LABELS).operation(Op.GREATER_THAN_OR_EQUAL_TO).build(), Resource.FLOW),
Arguments.of(QueryFilter.builder().field(Field.LABELS).operation(Op.LESS_THAN_OR_EQUAL_TO).build(), Resource.FLOW),
Arguments.of(QueryFilter.builder().field(Field.LABELS).operation(Op.IN).build(), Resource.FLOW),
Arguments.of(QueryFilter.builder().field(Field.LABELS).operation(Op.NOT_IN).build(), Resource.FLOW),
Arguments.of(QueryFilter.builder().field(Field.LABELS).operation(Op.STARTS_WITH).build(), Resource.FLOW),
Arguments.of(QueryFilter.builder().field(Field.LABELS).operation(Op.ENDS_WITH).build(), Resource.FLOW),
Arguments.of(QueryFilter.builder().field(Field.LABELS).operation(Op.CONTAINS).build(), Resource.FLOW),
Arguments.of(QueryFilter.builder().field(Field.LABELS).operation(Op.REGEX).build(), Resource.FLOW),
Arguments.of(QueryFilter.builder().field(Field.LABELS).operation(Op.PREFIX).build(), Resource.FLOW),
Arguments.of(QueryFilter.builder().field(Field.FLOW_ID).operation(Op.GREATER_THAN).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.FLOW_ID).operation(Op.LESS_THAN).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.FLOW_ID).operation(Op.GREATER_THAN_OR_EQUAL_TO).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.FLOW_ID).operation(Op.LESS_THAN_OR_EQUAL_TO).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.FLOW_ID).operation(Op.IN).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.FLOW_ID).operation(Op.NOT_IN).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.FLOW_ID).operation(Op.PREFIX).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.START_DATE).operation(Op.IN).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.START_DATE).operation(Op.NOT_IN).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.START_DATE).operation(Op.STARTS_WITH).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.START_DATE).operation(Op.ENDS_WITH).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.START_DATE).operation(Op.CONTAINS).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.START_DATE).operation(Op.REGEX).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.START_DATE).operation(Op.PREFIX).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.END_DATE).operation(Op.IN).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.END_DATE).operation(Op.NOT_IN).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.END_DATE).operation(Op.STARTS_WITH).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.END_DATE).operation(Op.ENDS_WITH).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.END_DATE).operation(Op.CONTAINS).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.END_DATE).operation(Op.REGEX).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.END_DATE).operation(Op.PREFIX).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.STATE).operation(Op.GREATER_THAN).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.STATE).operation(Op.LESS_THAN).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.STATE).operation(Op.GREATER_THAN_OR_EQUAL_TO).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.STATE).operation(Op.LESS_THAN_OR_EQUAL_TO).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.STATE).operation(Op.STARTS_WITH).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.STATE).operation(Op.ENDS_WITH).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.STATE).operation(Op.CONTAINS).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.STATE).operation(Op.REGEX).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.STATE).operation(Op.PREFIX).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.TRIGGER_EXECUTION_ID).operation(Op.GREATER_THAN).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.TRIGGER_EXECUTION_ID).operation(Op.LESS_THAN).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.TRIGGER_EXECUTION_ID).operation(Op.GREATER_THAN_OR_EQUAL_TO).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.TRIGGER_EXECUTION_ID).operation(Op.LESS_THAN_OR_EQUAL_TO).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.TRIGGER_EXECUTION_ID).operation(Op.REGEX).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.TRIGGER_EXECUTION_ID).operation(Op.PREFIX).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.TRIGGER_ID).operation(Op.GREATER_THAN).build(), Resource.LOG),
Arguments.of(QueryFilter.builder().field(Field.TRIGGER_ID).operation(Op.LESS_THAN).build(), Resource.LOG),
Arguments.of(QueryFilter.builder().field(Field.TRIGGER_ID).operation(Op.GREATER_THAN_OR_EQUAL_TO).build(), Resource.LOG),
Arguments.of(QueryFilter.builder().field(Field.TRIGGER_ID).operation(Op.LESS_THAN_OR_EQUAL_TO).build(), Resource.LOG),
Arguments.of(QueryFilter.builder().field(Field.TRIGGER_ID).operation(Op.REGEX).build(), Resource.LOG),
Arguments.of(QueryFilter.builder().field(Field.TRIGGER_ID).operation(Op.PREFIX).build(), Resource.LOG),
Arguments.of(QueryFilter.builder().field(Field.CHILD_FILTER).operation(Op.GREATER_THAN).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.CHILD_FILTER).operation(Op.LESS_THAN).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.CHILD_FILTER).operation(Op.GREATER_THAN_OR_EQUAL_TO).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.CHILD_FILTER).operation(Op.LESS_THAN_OR_EQUAL_TO).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.CHILD_FILTER).operation(Op.IN).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.CHILD_FILTER).operation(Op.NOT_IN).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.CHILD_FILTER).operation(Op.STARTS_WITH).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.CHILD_FILTER).operation(Op.ENDS_WITH).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.CHILD_FILTER).operation(Op.CONTAINS).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.CHILD_FILTER).operation(Op.REGEX).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.CHILD_FILTER).operation(Op.PREFIX).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.WORKER_ID).operation(Op.GREATER_THAN).build(), Resource.TRIGGER),
Arguments.of(QueryFilter.builder().field(Field.WORKER_ID).operation(Op.LESS_THAN).build(), Resource.TRIGGER),
Arguments.of(QueryFilter.builder().field(Field.WORKER_ID).operation(Op.GREATER_THAN_OR_EQUAL_TO).build(), Resource.TRIGGER),
Arguments.of(QueryFilter.builder().field(Field.WORKER_ID).operation(Op.LESS_THAN_OR_EQUAL_TO).build(), Resource.TRIGGER),
Arguments.of(QueryFilter.builder().field(Field.WORKER_ID).operation(Op.REGEX).build(), Resource.TRIGGER),
Arguments.of(QueryFilter.builder().field(Field.WORKER_ID).operation(Op.PREFIX).build(), Resource.TRIGGER),
Arguments.of(QueryFilter.builder().field(Field.EXISTING_ONLY).operation(Op.GREATER_THAN).build(), Resource.NAMESPACE),
Arguments.of(QueryFilter.builder().field(Field.EXISTING_ONLY).operation(Op.LESS_THAN).build(), Resource.NAMESPACE),
Arguments.of(QueryFilter.builder().field(Field.EXISTING_ONLY).operation(Op.GREATER_THAN_OR_EQUAL_TO).build(), Resource.NAMESPACE),
Arguments.of(QueryFilter.builder().field(Field.EXISTING_ONLY).operation(Op.LESS_THAN_OR_EQUAL_TO).build(), Resource.NAMESPACE),
Arguments.of(QueryFilter.builder().field(Field.EXISTING_ONLY).operation(Op.IN).build(), Resource.NAMESPACE),
Arguments.of(QueryFilter.builder().field(Field.EXISTING_ONLY).operation(Op.NOT_IN).build(), Resource.NAMESPACE),
Arguments.of(QueryFilter.builder().field(Field.EXISTING_ONLY).operation(Op.STARTS_WITH).build(), Resource.NAMESPACE),
Arguments.of(QueryFilter.builder().field(Field.EXISTING_ONLY).operation(Op.ENDS_WITH).build(), Resource.NAMESPACE),
Arguments.of(QueryFilter.builder().field(Field.EXISTING_ONLY).operation(Op.CONTAINS).build(), Resource.NAMESPACE),
Arguments.of(QueryFilter.builder().field(Field.EXISTING_ONLY).operation(Op.REGEX).build(), Resource.NAMESPACE),
Arguments.of(QueryFilter.builder().field(Field.EXISTING_ONLY).operation(Op.PREFIX).build(), Resource.NAMESPACE),
Arguments.of(QueryFilter.builder().field(Field.MIN_LEVEL).operation(Op.GREATER_THAN).build(), Resource.LOG),
Arguments.of(QueryFilter.builder().field(Field.MIN_LEVEL).operation(Op.LESS_THAN).build(), Resource.LOG),
Arguments.of(QueryFilter.builder().field(Field.MIN_LEVEL).operation(Op.GREATER_THAN_OR_EQUAL_TO).build(), Resource.LOG),
Arguments.of(QueryFilter.builder().field(Field.MIN_LEVEL).operation(Op.LESS_THAN_OR_EQUAL_TO).build(), Resource.LOG),
Arguments.of(QueryFilter.builder().field(Field.MIN_LEVEL).operation(Op.IN).build(), Resource.LOG),
Arguments.of(QueryFilter.builder().field(Field.MIN_LEVEL).operation(Op.NOT_IN).build(), Resource.LOG),
Arguments.of(QueryFilter.builder().field(Field.MIN_LEVEL).operation(Op.STARTS_WITH).build(), Resource.LOG),
Arguments.of(QueryFilter.builder().field(Field.MIN_LEVEL).operation(Op.ENDS_WITH).build(), Resource.LOG),
Arguments.of(QueryFilter.builder().field(Field.MIN_LEVEL).operation(Op.CONTAINS).build(), Resource.LOG),
Arguments.of(QueryFilter.builder().field(Field.MIN_LEVEL).operation(Op.REGEX).build(), Resource.LOG),
Arguments.of(QueryFilter.builder().field(Field.MIN_LEVEL).operation(Op.PREFIX).build(), Resource.LOG)
);
}
}

View File

@@ -2,9 +2,12 @@ package io.kestra.core.repositories;
import com.devskiller.friendly_id.FriendlyId;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.exceptions.InvalidQueryFiltersException;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.Label;
import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.QueryFilter.Field;
import io.kestra.core.models.QueryFilter.Op;
import io.kestra.core.models.dashboards.AggregationType;
import io.kestra.core.models.dashboards.ColumnDescriptor;
import io.kestra.core.models.executions.Execution;
@@ -16,8 +19,10 @@ import io.kestra.core.models.executions.statistics.ExecutionCount;
import io.kestra.core.models.executions.statistics.Flow;
import io.kestra.core.models.flows.FlowScope;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.flows.State.Type;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.repositories.ExecutionRepositoryInterface.ChildFilter;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.NamespaceUtils;
import io.kestra.plugin.core.dashboard.data.Executions;
@@ -25,6 +30,7 @@ import io.kestra.plugin.core.debug.Return;
import io.micronaut.data.model.Pageable;
import io.micronaut.data.model.Sort;
import jakarta.inject.Inject;
import java.util.stream.Stream;
import org.junit.jupiter.api.Test;
import java.io.IOException;
@@ -35,10 +41,16 @@ import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.*;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.event.Level;
import static io.kestra.core.models.flows.FlowScope.USER;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.groups.Tuple.tuple;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
@@ -152,6 +164,47 @@ public abstract class AbstractExecutionRepositoryTest {
.build());
}
@ParameterizedTest
@MethodSource("filterCombinations")
void should_find_all(QueryFilter filter, int expectedSize){
inject("executionTriggerId");
ArrayListTotal<Execution> entries = executionRepository.find(Pageable.UNPAGED, MAIN_TENANT, List.of(filter));
assertThat(entries).hasSize(expectedSize);
}
static Stream<Arguments> filterCombinations() {
return Stream.of(
Arguments.of(QueryFilter.builder().field(Field.QUERY).value("unittest").operation(Op.EQUALS).build(), 28),
Arguments.of(QueryFilter.builder().field(Field.SCOPE).value(List.of(USER)).operation(Op.EQUALS).build(), 28),
Arguments.of(QueryFilter.builder().field(Field.NAMESPACE).value("io.kestra.unittest").operation(Op.EQUALS).build(), 28),
Arguments.of(QueryFilter.builder().field(Field.LABELS).value(Map.of("key", "value")).operation(Op.EQUALS).build(), 1),
Arguments.of(QueryFilter.builder().field(Field.FLOW_ID).value(FLOW).operation(Op.EQUALS).build(), 15),
Arguments.of(QueryFilter.builder().field(Field.START_DATE).value(ZonedDateTime.now().minusMinutes(1)).operation(Op.GREATER_THAN).build(), 28),
Arguments.of(QueryFilter.builder().field(Field.END_DATE).value(ZonedDateTime.now().plusMinutes(1)).operation(Op.LESS_THAN).build(), 28),
Arguments.of(QueryFilter.builder().field(Field.STATE).value(Type.RUNNING).operation(Op.EQUALS).build(), 5),
Arguments.of(QueryFilter.builder().field(Field.TRIGGER_EXECUTION_ID).value("executionTriggerId").operation(Op.EQUALS).build(), 28),
Arguments.of(QueryFilter.builder().field(Field.CHILD_FILTER).value(ChildFilter.CHILD).operation(Op.EQUALS).build(), 28)
);
}
@ParameterizedTest
@MethodSource("errorFilterCombinations")
void should_fail_to_find_all(QueryFilter filter){
assertThrows(InvalidQueryFiltersException.class, () -> executionRepository.find(Pageable.UNPAGED, MAIN_TENANT, List.of(filter)));
}
static Stream<QueryFilter> errorFilterCombinations() {
return Stream.of(
QueryFilter.builder().field(Field.TIME_RANGE).value("test").operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.TRIGGER_ID).value("test").operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.WORKER_ID).value("test").operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.EXISTING_ONLY).value("test").operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.MIN_LEVEL).value(Level.DEBUG).operation(Op.EQUALS).build()
);
}
@Test
protected void find() {
inject();

View File

@@ -3,13 +3,18 @@ package io.kestra.core.repositories;
import io.kestra.core.Helpers;
import io.kestra.core.events.CrudEvent;
import io.kestra.core.events.CrudEventType;
import io.kestra.core.exceptions.InvalidQueryFiltersException;
import io.kestra.core.models.Label;
import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.QueryFilter.Field;
import io.kestra.core.models.QueryFilter.Op;
import io.kestra.core.models.SearchResult;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.*;
import io.kestra.core.models.flows.input.StringInput;
import io.kestra.core.models.property.Property;
import io.kestra.core.queues.QueueException;
import io.kestra.core.repositories.ExecutionRepositoryInterface.ChildFilter;
import io.kestra.core.schedulers.AbstractSchedulerTest;
import io.kestra.core.services.FlowService;
import io.kestra.plugin.core.debug.Return;
@@ -22,6 +27,8 @@ import io.micronaut.data.model.Sort;
import io.kestra.core.junit.annotations.KestraTest;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.time.ZonedDateTime;
import java.util.stream.Stream;
import lombok.Getter;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
@@ -34,8 +41,13 @@ import java.time.Duration;
import java.util.*;
import java.util.concurrent.TimeoutException;
import jakarta.validation.ConstraintViolationException;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.event.Level;
import static io.kestra.core.models.flows.FlowScope.SYSTEM;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static io.kestra.core.utils.NamespaceUtils.SYSTEM_FLOWS_DEFAULT_NAMESPACE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -72,6 +84,89 @@ public abstract class AbstractFlowRepositoryTest {
.tasks(Collections.singletonList(Return.builder().id(taskId).type(Return.class.getName()).format(Property.ofValue(TEST_FLOW_ID)).build()));
}
@ParameterizedTest
@MethodSource("filterCombinations")
void should_find_all(QueryFilter filter){
FlowWithSource flow = FlowWithSource.builder()
.id("filterFlowId")
.namespace(SYSTEM_FLOWS_DEFAULT_NAMESPACE)
.tenantId(MAIN_TENANT)
.labels(Label.from(Map.of("key", "value")))
.build();
flow = flowRepository.create(GenericFlow.of(flow));
try {
ArrayListTotal<Flow> entries = flowRepository.find(Pageable.UNPAGED, MAIN_TENANT, List.of(filter));
assertThat(entries).hasSize(1);
} finally {
deleteFlow(flow);
}
}
@ParameterizedTest
@MethodSource("filterCombinations")
void should_find_all_with_source(QueryFilter filter){
FlowWithSource flow = FlowWithSource.builder()
.id("filterFlowId")
.namespace(SYSTEM_FLOWS_DEFAULT_NAMESPACE)
.tenantId(MAIN_TENANT)
.labels(Label.from(Map.of("key", "value")))
.build();
flow = flowRepository.create(GenericFlow.of(flow));
try {
ArrayListTotal<FlowWithSource> entries = flowRepository.findWithSource(Pageable.UNPAGED, MAIN_TENANT, List.of(filter));
assertThat(entries).hasSize(1);
} finally {
deleteFlow(flow);
}
}
static Stream<QueryFilter> filterCombinations() {
return Stream.of(
QueryFilter.builder().field(Field.QUERY).value("filterFlowId").operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.SCOPE).value(List.of(SYSTEM)).operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.NAMESPACE).value(SYSTEM_FLOWS_DEFAULT_NAMESPACE).operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.LABELS).value(Map.of("key", "value")).operation(Op.EQUALS).build()
);
}
@ParameterizedTest
@MethodSource("errorFilterCombinations")
void should_fail_to_find_all(QueryFilter filter){
assertThrows(
InvalidQueryFiltersException.class,
() -> flowRepository.find(Pageable.UNPAGED, MAIN_TENANT, List.of(filter)));
}
@ParameterizedTest
@MethodSource("errorFilterCombinations")
void should_fail_to_find_all_with_source(QueryFilter filter){
assertThrows(
InvalidQueryFiltersException.class,
() -> flowRepository.findWithSource(Pageable.UNPAGED, MAIN_TENANT, List.of(filter)));
}
static Stream<QueryFilter> errorFilterCombinations() {
return Stream.of(
QueryFilter.builder().field(Field.FLOW_ID).value("sleep").operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.START_DATE).value(ZonedDateTime.now().minusMinutes(1)).operation(Op.GREATER_THAN).build(),
QueryFilter.builder().field(Field.END_DATE).value(ZonedDateTime.now().plusMinutes(1)).operation(Op.LESS_THAN).build(),
QueryFilter.builder().field(Field.STATE).value(State.Type.RUNNING).operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.TIME_RANGE).value("test").operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.TRIGGER_EXECUTION_ID).value("executionTriggerId").operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.TRIGGER_ID).value("test").operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.CHILD_FILTER).value(ChildFilter.CHILD).operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.WORKER_ID).value("test").operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.EXISTING_ONLY).value("test").operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.MIN_LEVEL).value(Level.DEBUG).operation(Op.EQUALS).build()
);
}
@Test
void findById() {
FlowWithSource flow = builder()

View File

@@ -1,19 +1,26 @@
package io.kestra.core.repositories;
import io.kestra.core.exceptions.InvalidQueryFiltersException;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.QueryFilter.Field;
import io.kestra.core.models.QueryFilter.Op;
import io.kestra.core.models.dashboards.AggregationType;
import io.kestra.core.models.dashboards.ColumnDescriptor;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionKind;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.statistics.LogStatistics;
import io.kestra.core.models.flows.State;
import io.kestra.core.repositories.ExecutionRepositoryInterface.ChildFilter;
import io.kestra.core.utils.IdUtils;
import io.kestra.plugin.core.dashboard.data.Logs;
import io.micronaut.data.model.Pageable;
import jakarta.inject.Inject;
import java.util.stream.Stream;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.event.Level;
import reactor.core.publisher.Flux;
@@ -24,8 +31,11 @@ import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import static io.kestra.core.models.flows.FlowScope.SYSTEM;
import static io.kestra.core.models.flows.FlowScope.USER;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
@KestraTest
public abstract class AbstractLogRepositoryTest {
@@ -44,9 +54,84 @@ public abstract class AbstractLogRepositoryTest {
.level(level)
.thread("")
.tenantId(MAIN_TENANT)
.triggerId("triggerId")
.message("john doe");
}
@ParameterizedTest
@MethodSource("filterCombinations")
void should_find_all(QueryFilter filter){
logRepository.save(logEntry(Level.INFO).build());
ArrayListTotal<LogEntry> entries = logRepository.find(Pageable.UNPAGED, MAIN_TENANT, List.of(filter));
assertThat(entries).hasSize(1);
}
static Stream<QueryFilter> filterCombinations() {
return Stream.of(
QueryFilter.builder().field(Field.QUERY).value("flowId").operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.QUERY).value("anotherId").operation(Op.NOT_EQUALS).build(),
QueryFilter.builder().field(Field.SCOPE).value(List.of(USER)).operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.SCOPE).value(List.of(SYSTEM)).operation(Op.NOT_EQUALS).build(),
QueryFilter.builder().field(Field.NAMESPACE).value("io.kestra.unittest").operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.NAMESPACE).value("another.namespace").operation(Op.NOT_EQUALS).build(),
QueryFilter.builder().field(Field.NAMESPACE).value("kestra").operation(Op.CONTAINS).build(),
QueryFilter.builder().field(Field.NAMESPACE).value("io.kestra").operation(Op.STARTS_WITH).build(),
QueryFilter.builder().field(Field.NAMESPACE).value("unittest").operation(Op.ENDS_WITH).build(),
QueryFilter.builder().field(Field.NAMESPACE).value(".*kestra.*").operation(Op.REGEX).build(),
QueryFilter.builder().field(Field.NAMESPACE).value(List.of("io.kestra.unittest")).operation(Op.IN).build(),
QueryFilter.builder().field(Field.NAMESPACE).value(List.of("another.namespace")).operation(Op.NOT_IN).build(),
QueryFilter.builder().field(Field.NAMESPACE).value("io").operation(Op.PREFIX).build(),
QueryFilter.builder().field(Field.FLOW_ID).value("flowId").operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.FLOW_ID).value("anotherFlowId").operation(Op.NOT_EQUALS).build(),
QueryFilter.builder().field(Field.FLOW_ID).value("lowI").operation(Op.CONTAINS).build(),
QueryFilter.builder().field(Field.FLOW_ID).value("flow").operation(Op.STARTS_WITH).build(),
QueryFilter.builder().field(Field.FLOW_ID).value("Id").operation(Op.ENDS_WITH).build(),
QueryFilter.builder().field(Field.FLOW_ID).value(".lowI.").operation(Op.REGEX).build(),
QueryFilter.builder().field(Field.START_DATE).value(ZonedDateTime.now().minusMinutes(1)).operation(Op.GREATER_THAN_OR_EQUAL_TO).build(),
QueryFilter.builder().field(Field.START_DATE).value(ZonedDateTime.now().minusMinutes(1)).operation(Op.GREATER_THAN).build(),
QueryFilter.builder().field(Field.START_DATE).value(ZonedDateTime.now().plusMinutes(1)).operation(Op.LESS_THAN_OR_EQUAL_TO).build(),
QueryFilter.builder().field(Field.START_DATE).value(ZonedDateTime.now().plusMinutes(1)).operation(Op.LESS_THAN).build(),
QueryFilter.builder().field(Field.START_DATE).value(ZonedDateTime.now().minusMinutes(1)).operation(Op.NOT_EQUALS).build(),
QueryFilter.builder().field(Field.END_DATE).value(ZonedDateTime.now().minusMinutes(1)).operation(Op.GREATER_THAN_OR_EQUAL_TO).build(),
QueryFilter.builder().field(Field.END_DATE).value(ZonedDateTime.now().minusMinutes(1)).operation(Op.GREATER_THAN).build(),
QueryFilter.builder().field(Field.END_DATE).value(ZonedDateTime.now().plusMinutes(1)).operation(Op.LESS_THAN_OR_EQUAL_TO).build(),
QueryFilter.builder().field(Field.END_DATE).value(ZonedDateTime.now().plusMinutes(1)).operation(Op.LESS_THAN).build(),
QueryFilter.builder().field(Field.END_DATE).value(ZonedDateTime.now().minusMinutes(1)).operation(Op.NOT_EQUALS).build(),
QueryFilter.builder().field(Field.TRIGGER_ID).value("triggerId").operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.TRIGGER_ID).value("anotherId").operation(Op.NOT_EQUALS).build(),
QueryFilter.builder().field(Field.TRIGGER_ID).value("igger").operation(Op.CONTAINS).build(),
QueryFilter.builder().field(Field.TRIGGER_ID).value("trigger").operation(Op.STARTS_WITH).build(),
QueryFilter.builder().field(Field.TRIGGER_ID).value("Id").operation(Op.ENDS_WITH).build(),
QueryFilter.builder().field(Field.TRIGGER_ID).value(List.of("triggerId")).operation(Op.IN).build(),
QueryFilter.builder().field(Field.TRIGGER_ID).value(List.of("anotherId")).operation(Op.NOT_IN).build(),
QueryFilter.builder().field(Field.MIN_LEVEL).value(Level.DEBUG).operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.MIN_LEVEL).value(Level.ERROR).operation(Op.NOT_EQUALS).build()
);
}
@ParameterizedTest
@MethodSource("errorFilterCombinations")
void should_fail_to_find_all(QueryFilter filter){
assertThrows(
InvalidQueryFiltersException.class,
() -> logRepository.find(Pageable.UNPAGED, MAIN_TENANT, List.of(filter)));
}
static Stream<QueryFilter> errorFilterCombinations() {
return Stream.of(
QueryFilter.builder().field(Field.LABELS).value(Map.of("key", "value")).operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.STATE).value(State.Type.RUNNING).operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.TIME_RANGE).value("test").operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.TRIGGER_EXECUTION_ID).value("test").operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.CHILD_FILTER).value(ChildFilter.CHILD).operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.WORKER_ID).value("test").operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.EXISTING_ONLY).value("test").operation(Op.EQUALS).build()
);
}
@Test
void all() {
LogEntry.LogEntryBuilder builder = logEntry(Level.INFO);

View File

@@ -1,18 +1,32 @@
package io.kestra.core.repositories;
import io.kestra.core.exceptions.InvalidQueryFiltersException;
import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.QueryFilter.Field;
import io.kestra.core.models.QueryFilter.Op;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.repositories.ExecutionRepositoryInterface.ChildFilter;
import io.kestra.core.utils.IdUtils;
import io.micronaut.data.model.Pageable;
import io.micronaut.data.model.Sort;
import io.kestra.core.junit.annotations.KestraTest;
import jakarta.inject.Inject;
import java.util.Map;
import java.util.stream.Stream;
import org.junit.jupiter.api.Test;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Optional;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.event.Level;
import static io.kestra.core.models.flows.FlowScope.USER;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
@KestraTest
public abstract class AbstractTriggerRepositoryTest {
@@ -30,6 +44,69 @@ public abstract class AbstractTriggerRepositoryTest {
.date(ZonedDateTime.now());
}
protected static Trigger generateDefaultTrigger(){
Trigger trigger = Trigger.builder()
.tenantId(MAIN_TENANT)
.triggerId("triggerId")
.namespace("trigger.namespace")
.flowId("flowId")
.nextExecutionDate(ZonedDateTime.now())
.build();
trigger.setWorkerId("workerId");
return trigger;
}
@ParameterizedTest
@MethodSource("filterCombinations")
void should_find_all(QueryFilter filter){
triggerRepository.save(generateDefaultTrigger());
ArrayListTotal<Trigger> entries = triggerRepository.find(Pageable.UNPAGED, MAIN_TENANT, List.of(filter));
assertThat(entries).hasSize(1);
}
@ParameterizedTest
@MethodSource("filterCombinations")
void should_find_all_async(QueryFilter filter){
triggerRepository.save(generateDefaultTrigger());
List<Trigger> entries = triggerRepository.find(MAIN_TENANT, List.of(filter)).collectList().block();
assertThat(entries).hasSize(1);
}
static Stream<QueryFilter> filterCombinations() {
return Stream.of(
QueryFilter.builder().field(Field.QUERY).value("flowId").operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.SCOPE).value(List.of(USER)).operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.NAMESPACE).value("trigger.namespace").operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.FLOW_ID).value("flowId").operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.START_DATE).value(ZonedDateTime.now().minusMinutes(1)).operation(Op.GREATER_THAN).build(),
QueryFilter.builder().field(Field.END_DATE).value(ZonedDateTime.now().plusMinutes(1)).operation(Op.LESS_THAN).build(),
QueryFilter.builder().field(Field.TRIGGER_ID).value("triggerId").operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.WORKER_ID).value("workerId").operation(Op.EQUALS).build()
);
}
@ParameterizedTest
@MethodSource("errorFilterCombinations")
void should_fail_to_find_all(QueryFilter filter){
assertThrows(InvalidQueryFiltersException.class, () -> triggerRepository.find(Pageable.UNPAGED, MAIN_TENANT, List.of(filter)));
}
static Stream<QueryFilter> errorFilterCombinations() {
return Stream.of(
QueryFilter.builder().field(Field.LABELS).value(Map.of("key", "value")).operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.STATE).value(State.Type.RUNNING).operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.TIME_RANGE).value("test").operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.TRIGGER_EXECUTION_ID).value("test").operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.CHILD_FILTER).value(ChildFilter.CHILD).operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.EXISTING_ONLY).value("test").operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.MIN_LEVEL).value(Level.DEBUG).operation(Op.EQUALS).build()
);
}
@Test
void all() {
Trigger.TriggerBuilder<?, ?> builder = trigger();

View File

@@ -2,15 +2,9 @@ package io.kestra.repository.h2;
import io.kestra.core.models.SearchResult;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowWithException;
import io.kestra.core.models.flows.GenericFlow;
import io.kestra.core.utils.IdUtils;
import io.kestra.jdbc.repository.AbstractJdbcFlowRepositoryTest;
import io.kestra.plugin.core.flow.Template;
import io.micronaut.data.model.Pageable;
import io.micronaut.data.model.Sort;
import java.util.Collections;
import java.util.Optional;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -20,8 +14,6 @@ import java.util.List;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
public class H2FlowRepositoryTest extends AbstractJdbcFlowRepositoryTest {

View File

@@ -4,6 +4,7 @@ import io.kestra.core.events.CrudEvent;
import io.kestra.core.events.CrudEventType;
import io.kestra.core.models.Label;
import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.QueryFilter.Resource;
import io.kestra.core.models.dashboards.ColumnDescriptor;
import io.kestra.core.models.dashboards.DataFilter;
import io.kestra.core.models.dashboards.DataFilterKPI;
@@ -199,8 +200,17 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
abstract protected Condition findCondition(String query, Map<String, String> labels);
protected Condition findQueryCondition(String query) {
return findCondition(query, Map.of());
}
abstract protected Condition findCondition(Map<?, ?> value, QueryFilter.Op operation);
@Override
protected Condition findLabelCondition(Map<?, ?> value, QueryFilter.Op operation) {
return findCondition(value, operation);
}
protected Condition statesFilter(List<State.Type> state) {
return field("state_current")
.in(state.stream().map(Enum::name).toList());
@@ -293,22 +303,7 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
.where(this.defaultFilter(tenantId, false))
.and(NORMAL_KIND_CONDITION);
if (filters != null)
for (QueryFilter filter : filters) {
QueryFilter.Field field = filter.field();
QueryFilter.Op operation = filter.operation();
Object value = filter.value();
if (field.equals(QueryFilter.Field.QUERY)) {
select = switch (operation) {
case EQUALS -> select.and(this.findCondition(filter.value().toString(), Map.of()));
case NOT_EQUALS -> select.andNot(this.findCondition(filter.value().toString(), Map.of()));
default -> throw new UnsupportedOperationException("Unsupported operation for QUERY field: " + operation);
};
} else if (field.equals(QueryFilter.Field.LABELS) && value instanceof Map<?, ?> labels)
select = select.and(findCondition(labels, operation));
else
select = getConditionOnField(select, field, value, operation, "start_date");
}
select = this.filter(select, filters, "start_date", Resource.EXECUTION);
return select;
}

View File

@@ -10,6 +10,7 @@ import io.kestra.core.events.CrudEventType;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.exceptions.FlowProcessingException;
import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.QueryFilter.Resource;
import io.kestra.core.models.SearchResult;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowForExecution;
@@ -53,9 +54,7 @@ import org.jooq.impl.DSL;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -562,8 +561,18 @@ public abstract class AbstractJdbcFlowRepository extends AbstractJdbcRepository
}
abstract protected Condition findCondition(String query, Map<String, String> labels);
protected Condition findQueryCondition(String query) {
return findCondition(query, Map.of());
}
abstract protected Condition findCondition(Object value, QueryFilter.Op operation);
@Override
protected Condition findLabelCondition(Map<?, ?> value, QueryFilter.Op operation) {
return findCondition(value, operation);
}
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public ArrayListTotal<Flow> find(Pageable pageable, @Nullable String tenantId, @Nullable List<QueryFilter> filters) {
@@ -602,22 +611,7 @@ public abstract class AbstractJdbcFlowRepository extends AbstractJdbcRepository
@SuppressWarnings("unchecked")
private <R extends Record, E> SelectConditionStep<R> getFindFlowSelect(String tenantId, List<QueryFilter> filters, DSLContext context, List<Field<Object>> additionalFieldsToSelect) {
var select = this.fullTextSelect(tenantId, context, additionalFieldsToSelect != null ? additionalFieldsToSelect : List.of());
if (filters != null)
for (QueryFilter filter : filters) {
QueryFilter.Field field = filter.field();
QueryFilter.Op operation = filter.operation();
Object value = filter.value();
if (field.equals(QueryFilter.Field.QUERY)) {
select = switch (operation) {
case EQUALS -> select.and(this.findCondition(filter.value().toString(), Map.of()));
case NOT_EQUALS -> select.andNot(this.findCondition(filter.value().toString(), Map.of()));
default -> throw new UnsupportedOperationException("Unsupported operation for QUERY field: " + operation);
};
} else if (field.equals(QueryFilter.Field.LABELS) && value instanceof Map<?, ?> labels)
select = select.and(findCondition(labels, operation));
else
select = getConditionOnField(select, field, value, operation, null);
}
select = this.filter(select, filters, null, Resource.FLOW);
return (SelectConditionStep<R>) select;
}

View File

@@ -1,6 +1,7 @@
package io.kestra.jdbc.repository;
import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.QueryFilter.Resource;
import io.kestra.core.models.dashboards.ColumnDescriptor;
import io.kestra.core.models.dashboards.DataFilter;
import io.kestra.core.models.dashboards.DataFilterKPI;
@@ -48,6 +49,10 @@ public abstract class AbstractJdbcLogRepository extends AbstractJdbcRepository i
abstract protected Condition findCondition(String query);
protected Condition findQueryCondition(String query) {
return findCondition(query);
}
@Getter
private final JdbcFilterService filterService;
@@ -86,8 +91,6 @@ public abstract class AbstractJdbcLogRepository extends AbstractJdbcRepository i
@Nullable String tenantId,
@Nullable List<QueryFilter> filters
) {
String query = getQuery(filters);
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
@@ -98,10 +101,9 @@ public abstract class AbstractJdbcLogRepository extends AbstractJdbcRepository i
.hint(context.configuration().dialect().supports(SQLDialect.MYSQL) ? "SQL_CALC_FOUND_ROWS" : null)
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId))
.and(NORMAL_KIND_CONDITION)
.and(this.findCondition(query));
.and(NORMAL_KIND_CONDITION);
select = this.filter(select, filters, "timestamp");
select = this.filter(select, filters, "timestamp", Resource.LOG);
return this.jdbcRepository.fetchPage(context, select, pageable);
});

View File

@@ -1,6 +1,8 @@
package io.kestra.jdbc.repository;
import io.kestra.core.exceptions.InvalidQueryFiltersException;
import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.QueryFilter.Resource;
import io.kestra.core.models.dashboards.ColumnDescriptor;
import io.kestra.core.models.dashboards.DataFilter;
import io.kestra.core.models.dashboards.Order;
@@ -241,9 +243,11 @@ public abstract class AbstractJdbcRepository {
protected <T extends Record> SelectConditionStep<T> filter(
SelectConditionStep<T> select,
List<QueryFilter> filters,
String dateColumn
String dateColumn,
Resource resource
) {
if (filters != null) {
QueryFilter.validateQueryFilters(filters, resource);
for (QueryFilter filter : filters) {
QueryFilter.Field field = filter.field();
QueryFilter.Op operation = filter.operation();
@@ -266,7 +270,7 @@ public abstract class AbstractJdbcRepository {
@Nullable String dateColumn
) {
if (field.equals(QueryFilter.Field.QUERY)) {
return select;
return handleQuery(select, value, operation);
}
// Handling for Field.STATE
if (field.equals(QueryFilter.Field.STATE)) {
@@ -285,7 +289,7 @@ public abstract class AbstractJdbcRepository {
// Special handling for START_DATE and END_DATE
if (field == QueryFilter.Field.START_DATE || field == QueryFilter.Field.END_DATE) {
if(dateColumn == null){
throw new IllegalArgumentException("When creating filtering on START_DATE and/or END_DATE, dateColumn is required but was null");
throw new InvalidQueryFiltersException("When creating filtering on START_DATE and/or END_DATE, dateColumn is required but was null");
}
OffsetDateTime dateTime = (value instanceof ZonedDateTime)
? ((ZonedDateTime) value).toOffsetDateTime()
@@ -297,6 +301,14 @@ public abstract class AbstractJdbcRepository {
return applyScopeCondition(select, value, operation);
}
if (field.equals(QueryFilter.Field.LABELS)) {
if (value instanceof Map<?, ?> map){
return select.and(findLabelCondition(map, operation));
} else {
throw new InvalidQueryFiltersException("Label field value must but instance of Map");
}
}
// Convert the field name to lowercase and quote it
Name columnName = DSL.quotedName(field.name().toLowerCase());
@@ -310,14 +322,14 @@ public abstract class AbstractJdbcRepository {
if (value instanceof Collection<?>) {
select = select.and(DSL.field(columnName).in((Collection<?>) value));
} else {
throw new IllegalArgumentException("IN operation requires a collection as value");
throw new InvalidQueryFiltersException("IN operation requires a collection as value");
}
}
case NOT_IN -> {
if (value instanceof Collection<?>) {
select = select.and(DSL.field(columnName).notIn((Collection<?>) value));
} else {
throw new IllegalArgumentException("NOT_IN operation requires a collection as value");
throw new InvalidQueryFiltersException("NOT_IN operation requires a collection as value");
}
}
case STARTS_WITH -> select = select.and(DSL.field(columnName).like(value + "%"));
@@ -329,11 +341,19 @@ public abstract class AbstractJdbcRepository {
DSL.field(columnName).like(value + ".%")
.or(DSL.field(columnName).eq(value))
);
default -> throw new UnsupportedOperationException("Unsupported operation: " + operation);
default -> throw new InvalidQueryFiltersException("Unsupported operation: " + operation);
}
return select;
}
protected Condition findQueryCondition(String query) {
throw new InvalidQueryFiltersException("Unsupported operation: ");
}
protected Condition findLabelCondition(Map<?, ?> value, QueryFilter.Op operation) {
throw new InvalidQueryFiltersException("Unsupported operation: " + operation);
}
// Generate the condition for Field.STATE
@SuppressWarnings("unchecked")
private Condition generateStateCondition(Object value, QueryFilter.Op operation) {
@@ -343,13 +363,13 @@ public abstract class AbstractJdbcRepository {
case State.Type state -> List.of(state);
case String state -> List.of(State.Type.valueOf(state));
default ->
throw new IllegalArgumentException("Field 'state' requires a State.Type or List<State.Type> value");
throw new InvalidQueryFiltersException("Field 'state' requires a State.Type or List<State.Type> value");
};
return switch (operation) {
case IN, EQUALS -> statesFilter(stateList);
case NOT_IN, NOT_EQUALS -> DSL.not(statesFilter(stateList));
default -> throw new IllegalArgumentException("Unsupported operation for State.Type: " + operation);
default -> throw new InvalidQueryFiltersException("Unsupported operation for State.Type: " + operation);
};
}
@@ -358,6 +378,18 @@ public abstract class AbstractJdbcRepository {
.in(state.stream().map(Enum::name).toList());
}
private <T extends Record> SelectConditionStep<T> handleQuery(SelectConditionStep<T> select,
Object value,
QueryFilter.Op operation) {
Condition condition = findQueryCondition(value.toString());
return switch (operation) {
case EQUALS -> select.and(condition);
case NOT_EQUALS -> select.andNot(condition);
default -> throw new InvalidQueryFiltersException("Unsupported operation for QUERY field: " + operation);
};
}
// Handle CHILD_FILTER field logic
private <T extends Record> SelectConditionStep<T> handleChildFilter(SelectConditionStep<T> select, Object value) {
ChildFilter childFilter = (value instanceof String val) ? ChildFilter.valueOf(val) : (ChildFilter) value;
@@ -378,7 +410,7 @@ public abstract class AbstractJdbcRepository {
switch (operation) {
case EQUALS -> select = select.and(minLevelCondition(minLevel));
case NOT_EQUALS -> select = select.and(minLevelCondition(minLevel).not());
default -> throw new UnsupportedOperationException(
default -> throw new InvalidQueryFiltersException(
"Unsupported operation for MIN_LEVEL: " + operation
);
}
@@ -404,33 +436,24 @@ public abstract class AbstractJdbcRepository {
case EQUALS -> select = select.and(field(fieldName).eq(dateTime));
case NOT_EQUALS -> select = select.and(field(fieldName).ne(dateTime));
default ->
throw new UnsupportedOperationException("Unsupported operation for date condition: " + operation);
throw new InvalidQueryFiltersException("Unsupported operation for date condition: " + operation);
}
return select;
}
protected static String getQuery(List<QueryFilter> filters) {
if (filters == null || filters.isEmpty()) return null;
return filters.stream()
.filter(filter -> filter.field() == QueryFilter.Field.QUERY)
.map(filter -> filter.value().toString())
.findFirst()
.orElse(null);
}
private <T extends Record> SelectConditionStep<T> applyScopeCondition(
SelectConditionStep<T> select, Object value, QueryFilter.Op operation) {
if (!(value instanceof List<?> scopeValues)) {
throw new IllegalArgumentException("Invalid value for SCOPE filtering");
throw new InvalidQueryFiltersException("Invalid value for SCOPE filtering");
}
List<FlowScope> validScopes = Arrays.stream(FlowScope.values()).toList();
if (!validScopes.containsAll(scopeValues)) {
throw new IllegalArgumentException("Scope values must be a subset of FlowScope");
throw new InvalidQueryFiltersException("Scope values must be a subset of FlowScope");
}
if (operation != QueryFilter.Op.EQUALS && operation != QueryFilter.Op.NOT_EQUALS) {
throw new UnsupportedOperationException("Unsupported operation for SCOPE: " + operation);
throw new InvalidQueryFiltersException("Unsupported operation for SCOPE: " + operation);
}
boolean isEqualsOperation = (operation == QueryFilter.Op.EQUALS);

View File

@@ -1,6 +1,7 @@
package io.kestra.jdbc.repository;
import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.QueryFilter.Resource;
import io.kestra.core.models.conditions.ConditionContext;
import io.kestra.core.models.dashboards.ColumnDescriptor;
import io.kestra.core.models.dashboards.DataFilter;
@@ -305,28 +306,26 @@ public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcReposito
});
}
@Override
public ArrayListTotal<Trigger> find(Pageable pageable,String tenantId, List<QueryFilter> filters) {
public ArrayListTotal<Trigger> find(Pageable pageable, String tenantId, List<QueryFilter> filters) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);
// extract Query field from the filters list
String query = getQuery(filters);
// Base query with table and DSL fields
SelectConditionStep<?> select = context
.select(field("value"))
.hint(context.configuration().dialect().supports(SQLDialect.MYSQL) ? "SQL_CALC_FOUND_ROWS" : null)
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId))
.and(this.fullTextCondition(query));
select = filter(select, filters, "next_execution_date");
// Return paginated results
SelectConditionStep<?> select = generateSelect(context, tenantId, filters);
return this.jdbcRepository.fetchPage(context, select, pageable);
});
}
private SelectConditionStep<?> generateSelect(DSLContext context, String tenantId, List<QueryFilter> filters){
SelectConditionStep<?> select = context
.select(field("value"))
.hint(context.configuration().dialect().supports(SQLDialect.MYSQL) ? "SQL_CALC_FOUND_ROWS" : null)
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId));
return filter(select, filters, "next_execution_date", Resource.TRIGGER);
}
@Override
public ArrayListTotal<Trigger> find(Pageable pageable, String query, String tenantId, String namespace, String flowId, String workerId) {
return this.jdbcRepository
@@ -366,18 +365,7 @@ public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcReposito
.getDslContextWrapper()
.transaction(configuration -> {
DSLContext context = DSL.using(configuration);
// extract Query field from the filters list
String query = getQuery(filters);
// Base query with table and DSL fields
SelectConditionStep<?> select = context
.select(field("value"))
.hint(context.configuration().dialect().supports(SQLDialect.MYSQL) ? "SQL_CALC_FOUND_ROWS" : null)
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId))
.and(this.fullTextCondition(query));
select = filter(select, filters, "next_execution_date");
SelectConditionStep<?> select = generateSelect(context, tenantId, filters);
select.fetch()
.map(this.jdbcRepository::map)
@@ -395,6 +383,10 @@ public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcReposito
return query == null ? DSL.trueCondition() : jdbcRepository.fullTextCondition(List.of("fulltext"), query);
}
protected Condition findQueryCondition(String query) {
return fullTextCondition(query);
}
protected Condition defaultFilter(String tenantId, boolean allowDeleted) {
return buildTenantCondition(tenantId);
}

View File

@@ -1,12 +1,8 @@
package io.kestra.jdbc.repository;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.utils.IdUtils;
import io.kestra.jdbc.JdbcTestUtils;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
public abstract class AbstractJdbcTriggerRepositoryTest extends io.kestra.core.repositories.AbstractTriggerRepositoryTest {
@Inject

View File

@@ -45,6 +45,22 @@ const logFilterKeys: Record<string, FilterKeyCompletions> = {
false,
["timeRange"]
),
scope: new FilterKeyCompletions(
[Comparators.EQUALS, Comparators.NOT_EQUALS],
async (_, hardcodedValues) => hardcodedValues.SCOPES,
undefined,
["scope"]
),
triggerId: new FilterKeyCompletions(
[Comparators.EQUALS, Comparators.NOT_EQUALS, Comparators.CONTAINS, Comparators.STARTS_WITH, Comparators.ENDS_WITH],
undefined,
true
),
flowId: new FilterKeyCompletions(
[Comparators.EQUALS, Comparators.NOT_EQUALS, Comparators.CONTAINS, Comparators.STARTS_WITH, Comparators.ENDS_WITH, Comparators.REGEX],
undefined,
true
),
};
class LogFilterLanguage extends FilterLanguage {

View File

@@ -1,4 +1,9 @@
import {Comparators, Completion, FilterKeyCompletions} from "../filterCompletion.ts";
import {
Comparators,
Completion,
FilterKeyCompletions,
PICK_DATE_VALUE
} from "../filterCompletion.ts";
import {FilterLanguage} from "../filterLanguage.ts";
import permission from "../../../../../models/permission.ts";
import action from "../../../../../models/action.ts";
@@ -28,6 +33,40 @@ const triggerFilterKeys: Record<string, FilterKeyCompletions> = {
undefined,
true
),
timeRange: new FilterKeyCompletions(
[Comparators.EQUALS],
async (_, hardcodedValues) => hardcodedValues.RELATIVE_DATE,
false,
["timeRange", "startDate", "endDate"]
),
startDate: new FilterKeyCompletions(
[Comparators.GREATER_THAN_OR_EQUAL_TO, Comparators.GREATER_THAN, Comparators.LESS_THAN_OR_EQUAL_TO, Comparators.LESS_THAN, Comparators.EQUALS, Comparators.NOT_EQUALS],
async () => PICK_DATE_VALUE,
false,
["timeRange"]
),
endDate: new FilterKeyCompletions(
[Comparators.LESS_THAN_OR_EQUAL_TO, Comparators.LESS_THAN, Comparators.GREATER_THAN_OR_EQUAL_TO, Comparators.GREATER_THAN, Comparators.EQUALS, Comparators.NOT_EQUALS],
async () => PICK_DATE_VALUE,
false,
["timeRange"]
),
scope: new FilterKeyCompletions(
[Comparators.EQUALS, Comparators.NOT_EQUALS],
async (_, hardcodedValues) => hardcodedValues.SCOPES,
undefined,
["scope"]
),
triggerId: new FilterKeyCompletions(
[Comparators.EQUALS, Comparators.NOT_EQUALS, Comparators.CONTAINS, Comparators.STARTS_WITH, Comparators.ENDS_WITH],
undefined,
true
),
workerId: new FilterKeyCompletions(
[Comparators.EQUALS, Comparators.NOT_EQUALS, Comparators.CONTAINS, Comparators.STARTS_WITH, Comparators.ENDS_WITH],
undefined,
true
)
}
class TriggerFilterLanguage extends FilterLanguage {

View File

@@ -7,6 +7,7 @@ import com.fasterxml.jackson.databind.exc.InvalidTypeIdException;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.exceptions.InvalidException;
import io.kestra.core.exceptions.NotFoundException;
import io.kestra.core.exceptions.InvalidQueryFiltersException;
import io.kestra.core.exceptions.ResourceExpiredException;
import io.micronaut.core.convert.exceptions.ConversionErrorException;
import io.micronaut.http.HttpRequest;
@@ -148,6 +149,11 @@ public class ErrorController {
return jsonError(request, e, HttpStatus.NOT_FOUND, Optional.ofNullable(e.getMessage()).orElse("Not Found"));
}
@Error(global = true)
public HttpResponse<JsonError> error(HttpRequest<?> request, InvalidQueryFiltersException e) {
return jsonError(request, e, HttpStatus.BAD_REQUEST, e.formatedInvalidObjects());
}
@Error(global = true)
public HttpResponse<JsonError> error(HttpRequest<?> request, HttpStatusException e) {
return jsonError(request, e, e.getStatus(), e.getStatus().getReason());

View File

@@ -327,8 +327,14 @@ class ExecutionControllerTest {
}
@Test
void badDate() {
void badQueryFilters() {
HttpClientResponseException exception = assertThrows(HttpClientResponseException.class, () ->
client.toBlocking().retrieve(GET(
"/api/v1/main/executions/search?filters[triggerId][EQUALS]=test"), PagedResults.class));
assertThat(exception.getStatus().getCode()).isEqualTo(HttpStatus.BAD_REQUEST.getCode());
assertThat(exception.getMessage()).isEqualTo("Field TRIGGER_ID is not supported for resource EXECUTION. Supported fields are QUERY, SCOPE, FLOW_ID, START_DATE, END_DATE, STATE, LABELS, TRIGGER_EXECUTION_ID, CHILD_FILTER, NAMESPACE: Provided query filters are invalid");
exception = assertThrows(HttpClientResponseException.class, () ->
client.toBlocking().retrieve(GET(
"/api/v1/main/executions/search?filters[startDate][EQUALS]=2024-06-03T00:00:00.000%2B02:00&filters[endDate][EQUALS]=2023-06-05T00:00:00.000%2B02:00"), PagedResults.class));
assertThat(exception.getStatus().getCode()).isEqualTo(422);