feat(core): multi-tenancy in the multipleconditions storage (#2427)

This commit is contained in:
Loïc Mathieu
2023-11-02 22:06:33 +01:00
committed by GitHub
parent 2c5f4ba95b
commit 119db01b56
9 changed files with 35 additions and 10 deletions

View File

@@ -13,7 +13,7 @@ import java.util.Optional;
public interface MultipleConditionStorageInterface {
Optional<MultipleConditionWindow> get(Flow flow, String conditionId);
List<MultipleConditionWindow> expired();
List<MultipleConditionWindow> expired(String tenantId);
default MultipleConditionWindow getOrCreate(Flow flow, MultipleCondition multipleCondition) {
ZonedDateTime now = ZonedDateTime.now()
@@ -41,6 +41,7 @@ public interface MultipleConditionStorageInterface {
.orElseGet(() -> MultipleConditionWindow.builder()
.namespace(flow.getNamespace())
.flowId(flow.getId())
.tenantId(flow.getTenantId())
.conditionId(multipleCondition.getId())
.start(start)
.end(end)

View File

@@ -2,6 +2,7 @@ package io.kestra.core.models.triggers.multipleflows;
import com.fasterxml.jackson.annotation.JsonIgnore;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.utils.IdUtils;
import lombok.Builder;
import lombok.Value;
@@ -13,6 +14,8 @@ import java.util.Map;
@Value
@Builder
public class MultipleConditionWindow {
String tenantId;
String namespace;
String flowId;
@@ -27,19 +30,21 @@ public class MultipleConditionWindow {
@JsonIgnore
public String uid() {
return String.join("_", Arrays.asList(
return IdUtils.fromParts(
this.tenantId,
this.namespace,
this.flowId,
this.conditionId
));
);
}
public static String uid(Flow flow, String conditionId) {
return String.join("_", Arrays.asList(
return IdUtils.fromParts(
flow.getTenantId(),
flow.getNamespace(),
flow.getId(),
conditionId
));
);
}
public boolean isValid(ZonedDateTime now) {
@@ -60,6 +65,7 @@ public class MultipleConditionWindow {
.forEach(e -> finalResults.put(e.getKey(), true));
return new MultipleConditionWindow(
this.tenantId,
this.namespace,
this.flowId,
this.conditionId,

View File

@@ -130,7 +130,7 @@ public abstract class AbstractFlowTriggerService {
.map(Map::size)
.orElse(0))
.map(Map.Entry::getValue),
multipleConditionStorage.get().expired().stream()
multipleConditionStorage.get().expired(execution.getTenantId()).stream()
).forEach(multipleConditionStorage.get()::delete);
}

View File

@@ -130,12 +130,12 @@ public abstract class AbstractMultipleConditionStorageTest {
assertThat(window.getResults().get("a"), is(true));
List<MultipleConditionWindow> expired = multipleConditionStorage.expired();
List<MultipleConditionWindow> expired = multipleConditionStorage.expired(null);
assertThat(expired.size(), is(0));
Thread.sleep(2005);
expired = multipleConditionStorage.expired();
expired = multipleConditionStorage.expired(null);
assertThat(expired.size(), is(1));
}

View File

@@ -0,0 +1,6 @@
alter table multipleconditions add "tenant_id" VARCHAR(250) GENERATED ALWAYS AS (JQ_STRING("value", '.tenantId'));
DROP INDEX IF EXISTS multipleconditions_namespace__flow_id__condition_id;
DROP INDEX IF EXISTS multipleconditions_start_date__end_date;
CREATE INDEX IF NOT EXISTS multipleconditions_namespace__flow_id__condition_id ON multipleconditions ("tenant_id", "namespace", "flow_id", "condition_id");
CREATE INDEX IF NOT EXISTS multipleconditions_start_date__end_date ON multipleconditions ("tenant_id", "start_date", "end_date");

View File

@@ -0,0 +1,4 @@
alter table multipleconditions add `tenant_id` VARCHAR(250) GENERATED ALWAYS AS (value ->> '$.tenantId') STORED;
DROP INDEX ix_namespace__flow_id__condition_id ON multipleconditions;
DROP INDEX ix_start_date__end_date ON multipleconditions;

View File

@@ -0,0 +1,6 @@
alter table multipleconditions add "tenant_id" VARCHAR(250) GENERATED ALWAYS AS (value ->> 'tenantId') STORED;
DROP INDEX IF EXISTS multipleconditions_namespace__flow_id__condition_id;
DROP INDEX IF EXISTS multipleconditions_start_date__end_date;
CREATE INDEX IF NOT EXISTS multipleconditions_namespace__flow_id__condition_id ON multipleconditions (tenant_id, namespace, flow_id, condition_id);
CREATE INDEX IF NOT EXISTS multipleconditions_start_date__end_date ON multipleconditions (tenant_id, start_date, end_date);

View File

@@ -33,6 +33,7 @@ public abstract class AbstractJdbcMultipleConditionStorage extends AbstractJdbcR
.from(this.jdbcRepository.getTable())
.where(
field("namespace").eq(flow.getNamespace())
.and(buildTenantCondition(flow.getTenantId()))
.and(field("flow_id").eq(flow.getId()))
.and(field("condition_id").eq(conditionId))
);
@@ -42,7 +43,7 @@ public abstract class AbstractJdbcMultipleConditionStorage extends AbstractJdbcR
}
@Override
public List<MultipleConditionWindow> expired() {
public List<MultipleConditionWindow> expired(String tenantId) {
ZonedDateTime now = ZonedDateTime.now();
return this.jdbcRepository
@@ -55,6 +56,7 @@ public abstract class AbstractJdbcMultipleConditionStorage extends AbstractJdbcR
.where(
field("start_date").lt(now.toOffsetDateTime())
.and(field("end_date").lt(now.toOffsetDateTime()))
.and(buildTenantCondition(tenantId))
);
return this.jdbcRepository.fetch(select);

View File

@@ -38,7 +38,7 @@ public class MemoryMultipleConditionStorage implements MultipleConditionStorageI
}
@Override
public List<MultipleConditionWindow> expired() {
public List<MultipleConditionWindow> expired(String tenantId) {
ZonedDateTime now = ZonedDateTime.now();
return map