mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-19 18:05:41 -05:00
feat(executions): add IN, NOT_IN, CONTAINS LABELS #11916
- advance on https://github.com/kestra-io/kestra/issues/11587 - companion PR: https://github.com/kestra-io/kestra-ee/pull/5617
This commit is contained in:
@@ -3,6 +3,7 @@ package io.kestra.repository.mysql;
|
||||
import io.kestra.core.models.QueryFilter;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.utils.DateUtils;
|
||||
import io.kestra.core.utils.Either;
|
||||
import io.kestra.jdbc.repository.AbstractJdbcExecutionRepository;
|
||||
import io.kestra.jdbc.runner.AbstractJdbcExecutorStateStorage;
|
||||
import io.kestra.jdbc.services.JdbcFilterService;
|
||||
@@ -15,8 +16,9 @@ import org.jooq.Field;
|
||||
import org.jooq.impl.DSL;
|
||||
|
||||
import java.sql.Timestamp;
|
||||
import java.util.Date;
|
||||
import java.util.Map;
|
||||
import java.util.*;
|
||||
|
||||
import static io.kestra.core.models.QueryFilter.Op.EQUALS;
|
||||
|
||||
@Singleton
|
||||
@MysqlRepositoryEnabled
|
||||
@@ -35,8 +37,8 @@ public class MysqlExecutionRepository extends AbstractJdbcExecutionRepository {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Condition findCondition(Map<?, ?> value, QueryFilter.Op operation) {
|
||||
return MysqlExecutionRepositoryService.findCondition(value, operation);
|
||||
protected Condition findLabelCondition(Either<Map<?, ?>, String> input, QueryFilter.Op operation) {
|
||||
return MysqlExecutionRepositoryService.findLabelCondition(input, operation);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -2,48 +2,67 @@ package io.kestra.repository.mysql;
|
||||
|
||||
import io.kestra.core.models.QueryFilter;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.utils.Either;
|
||||
import io.kestra.jdbc.AbstractJdbcRepository;
|
||||
import org.jooq.Condition;
|
||||
import org.jooq.Field;
|
||||
import org.jooq.impl.DSL;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.*;
|
||||
|
||||
import static io.kestra.core.models.QueryFilter.Op.EQUALS;
|
||||
|
||||
public abstract class MysqlExecutionRepositoryService {
|
||||
public static Condition findCondition(AbstractJdbcRepository<Execution> jdbcRepository, String query, Map<String, String> labels) {
|
||||
List<Condition> conditions = new ArrayList<>();
|
||||
List<Condition> conditions = new ArrayList<>();
|
||||
|
||||
if (query != null) {
|
||||
conditions.add(jdbcRepository.fullTextCondition(Arrays.asList("namespace", "flow_id", "id"), query));
|
||||
}
|
||||
if (query != null) {
|
||||
conditions.add(jdbcRepository.fullTextCondition(Arrays.asList("namespace", "flow_id", "id"), query));
|
||||
}
|
||||
|
||||
if (labels != null) {
|
||||
labels.forEach((key, value) -> {
|
||||
Field<Boolean> valueField = DSL.field("JSON_CONTAINS(value, JSON_ARRAY(JSON_OBJECT('key', '" + key + "', 'value', '" + value + "')), '$.labels')", Boolean.class);
|
||||
conditions.add(valueField.eq(value != null));
|
||||
});
|
||||
}
|
||||
if (labels != null) {
|
||||
labels.forEach((key, value) -> {
|
||||
Field<Boolean> valueField = DSL.field("JSON_CONTAINS(value, JSON_ARRAY(JSON_OBJECT('key', '" + key + "', 'value', '" + value + "')), '$.labels')", Boolean.class);
|
||||
conditions.add(valueField.eq(value != null));
|
||||
});
|
||||
}
|
||||
|
||||
return conditions.isEmpty() ? DSL.trueCondition() : DSL.and(conditions);
|
||||
return conditions.isEmpty() ? DSL.trueCondition() : DSL.and(conditions);
|
||||
}
|
||||
|
||||
public static Condition findCondition(Map<?,?> labels, QueryFilter.Op operation) {
|
||||
public static Condition findLabelCondition(Either<Map<?, ?>, String> input, QueryFilter.Op operation) {
|
||||
List<Condition> conditions = new ArrayList<>();
|
||||
|
||||
List<Condition> inConditions = new ArrayList<>();
|
||||
if (input.isRight()) {
|
||||
var query = input.getRight();
|
||||
if (Objects.requireNonNull(operation) == QueryFilter.Op.CONTAINS) {
|
||||
conditions.add
|
||||
(DSL.condition(
|
||||
"JSON_SEARCH(value, 'one', CONCAT('%', ?, '%'), NULL, '$.labels[*].key') IS NOT NULL", query)
|
||||
.or(DSL.condition(
|
||||
"JSON_SEARCH(value, 'one', CONCAT('%', ?, '%'), NULL, '$.labels[*].value') IS NOT NULL", query)
|
||||
));
|
||||
} else {
|
||||
throw new UnsupportedOperationException("Unsupported operation for query: " + operation);
|
||||
}
|
||||
} else {
|
||||
var labels = input.getLeft();
|
||||
labels.forEach((key, value) -> {
|
||||
String sql = "JSON_CONTAINS(value, JSON_ARRAY(JSON_OBJECT('key', '" + key + "', 'value', '" + value + "')), '$.labels')";
|
||||
if (operation.equals(EQUALS))
|
||||
conditions.add(DSL.condition(sql));
|
||||
else
|
||||
conditions.add(DSL.not(DSL.condition(sql)));
|
||||
|
||||
switch(operation){
|
||||
case EQUALS ->
|
||||
conditions.add(DSL.condition(sql));
|
||||
case NOT_EQUALS, NOT_IN ->
|
||||
conditions.add(DSL.not(DSL.condition(sql)));
|
||||
case IN ->
|
||||
inConditions.add(DSL.condition(sql));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
if(!inConditions.isEmpty()){
|
||||
conditions.add(DSL.or(inConditions));
|
||||
}
|
||||
return conditions.isEmpty() ? DSL.trueCondition() : DSL.and(conditions);
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user