Compare commits

...

6 Commits

Author SHA1 Message Date
nKwiatkowski
0dc2cfa5b6 wip(core): #4345 add storage global method to access storage without tenant id 2025-07-16 11:15:52 +02:00
nKwiatkowski
3432a77f28 Merge branch 'develop' into feat/remove_tenant_id_null
# Conflicts:
#	core/src/test/java/io/kestra/core/repositories/AbstractFlowTopologyRepositoryTest.java
#	jdbc/src/test/java/io/kestra/jdbc/repository/AbstractJdbcFlowTopologyRepositoryTest.java
2025-07-15 16:02:32 +02:00
nKwiatkowski
7088652bd5 fix(core): failing unit tests 2025-07-15 14:31:42 +02:00
nKwiatkowski
f16318c601 fix(core): failing unit tests 2025-07-15 11:01:11 +02:00
nKwiatkowski
3731aaf5ab fix(core): failing unit tests 2025-07-11 18:01:33 +02:00
nKwiatkowski
dc65ca0ae9 feat(core): #4345 remove tenantId == null from code 2025-07-11 16:21:56 +02:00
36 changed files with 284 additions and 239 deletions

View File

@@ -37,7 +37,7 @@ public class ExecutionKilledExecution extends ExecutionKilled implements TenantI
public boolean isEqual(WorkerTask workerTask) {
String taskTenantId = workerTask.getTaskRun().getTenantId();
String taskExecutionId = workerTask.getTaskRun().getExecutionId();
return (taskTenantId == null || taskTenantId.equals(this.tenantId)) && taskExecutionId.equals(this.executionId);
return taskTenantId.equals(this.tenantId) && taskExecutionId.equals(this.executionId);
}
@Override

View File

@@ -34,7 +34,7 @@ public class DefaultFlowMetaStore implements FlowMetaStoreInterface {
public Optional<FlowInterface> findById(String tenantId, String namespace, String id, Optional<Integer> revision) {
Optional<FlowInterface> find = this.allFlows
.stream()
.filter(flow -> ((flow.getTenantId() == null && tenantId == null) || Objects.equals(flow.getTenantId(), tenantId)) &&
.filter(flow -> Objects.equals(flow.getTenantId(), tenantId) &&
flow.getNamespace().equals(namespace) &&
flow.getId().equals(id) &&
(revision.isEmpty() || revision.get().equals(flow.getRevision()))

View File

@@ -42,6 +42,9 @@ public interface StorageInterface extends AutoCloseable, Plugin {
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
InputStream get(String tenantId, @Nullable String namespace, URI uri) throws IOException;
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
InputStream getGlobalResource(@Nullable String namespace, URI uri) throws IOException;
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
StorageObject getWithMetadata(String tenantId, @Nullable String namespace, URI uri) throws IOException;
@@ -57,6 +60,9 @@ public interface StorageInterface extends AutoCloseable, Plugin {
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
List<FileAttributes> list(String tenantId, @Nullable String namespace, URI uri) throws IOException;
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
List<FileAttributes> listGlobalResources(@Nullable String namespace, URI uri) throws IOException;
/**
* Whether the uri points to a file/object that exist in the internal storage.
*
@@ -73,6 +79,21 @@ public interface StorageInterface extends AutoCloseable, Plugin {
}
}
/**
* Whether the uri points to a file/object that exist in the global internal storage.
*
* @param uri the URI of the file/object in the internal storage.
* @return true if the uri points to a file/object that exist in the internal storage.
*/
@SuppressWarnings("try")
default boolean existsGlobalResource(@Nullable String namespace, URI uri) {
try (InputStream ignored = getGlobalResource(namespace, uri)) {
return true;
} catch (IOException ieo) {
return false;
}
}
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
FileAttributes getAttributes(String tenantId, @Nullable String namespace, URI uri) throws IOException;
@@ -90,6 +111,9 @@ public interface StorageInterface extends AutoCloseable, Plugin {
@Retryable(includes = {IOException.class})
URI createDirectory(String tenantId, @Nullable String namespace, URI uri) throws IOException;
@Retryable(includes = {IOException.class})
URI createGlobalDirectory(@Nullable String namespace, URI uri) throws IOException;
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
URI move(String tenantId, @Nullable String namespace, URI from, URI to) throws IOException;

View File

@@ -10,6 +10,7 @@ import org.junit.jupiter.api.Test;
import java.net.URI;
import java.util.Map;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
@KestraTest
@@ -37,9 +38,9 @@ class VariablesTest {
@Test
@SuppressWarnings("unchecked")
void inStorage() {
var storageContext = StorageContext.forTask(null, "namespace", "flow", "execution", "task", "taskRun", null);
var storageContext = StorageContext.forTask(MAIN_TENANT, "namespace", "flow", "execution", "task", "taskRun", null);
var internalStorage = new InternalStorage(storageContext, storageInterface);
Variables.StorageContext variablesContext = new Variables.StorageContext(null, "namespace");
Variables.StorageContext variablesContext = new Variables.StorageContext(MAIN_TENANT, "namespace");
// simple
Map<String, Object> outputs = Map.of("key", "value");

View File

@@ -20,6 +20,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
@KestraTest
@@ -146,12 +147,12 @@ public abstract class AbstractMultipleConditionStorageTest {
assertThat(window.getResults().get("a")).isTrue();
List<MultipleConditionWindow> expired = multipleConditionStorage.expired(null);
List<MultipleConditionWindow> expired = multipleConditionStorage.expired(MAIN_TENANT);
assertThat(expired.size()).isZero();
Thread.sleep(2005);
expired = multipleConditionStorage.expired(null);
expired = multipleConditionStorage.expired(MAIN_TENANT);
assertThat(expired.size()).isEqualTo(1);
}
@@ -168,12 +169,12 @@ public abstract class AbstractMultipleConditionStorageTest {
assertThat(window.getResults().get("a")).isTrue();
List<MultipleConditionWindow> expired = multipleConditionStorage.expired(null);
List<MultipleConditionWindow> expired = multipleConditionStorage.expired(MAIN_TENANT);
assertThat(expired.size()).isZero();
Thread.sleep(2005);
expired = multipleConditionStorage.expired(null);
expired = multipleConditionStorage.expired(MAIN_TENANT);
assertThat(expired.size()).isEqualTo(1);
}
@@ -190,7 +191,7 @@ public abstract class AbstractMultipleConditionStorageTest {
assertThat(window.getResults()).isEmpty();
List<MultipleConditionWindow> expired = multipleConditionStorage.expired(null);
List<MultipleConditionWindow> expired = multipleConditionStorage.expired(MAIN_TENANT);
assertThat(expired.size()).isEqualTo(1);
}
@@ -208,7 +209,7 @@ public abstract class AbstractMultipleConditionStorageTest {
assertThat(window.getResults().get("a")).isTrue();
List<MultipleConditionWindow> expired = multipleConditionStorage.expired(null);
List<MultipleConditionWindow> expired = multipleConditionStorage.expired(MAIN_TENANT);
assertThat(expired.size()).isZero();
}
@@ -225,7 +226,7 @@ public abstract class AbstractMultipleConditionStorageTest {
assertThat(window.getResults().get("a")).isTrue();
List<MultipleConditionWindow> expired = multipleConditionStorage.expired(null);
List<MultipleConditionWindow> expired = multipleConditionStorage.expired(MAIN_TENANT);
assertThat(expired.size()).isZero();
}
@@ -247,6 +248,7 @@ public abstract class AbstractMultipleConditionStorageTest {
Flow flow = Flow.builder()
.namespace(NAMESPACE)
.tenantId(MAIN_TENANT)
.id("multiple-flow")
.revision(1)
.triggers(Collections.singletonList(io.kestra.plugin.core.trigger.Flow.builder()

View File

@@ -371,7 +371,7 @@ public abstract class AbstractExecutionRepositoryTest {
protected void shouldFindByIdTestExecution() {
executionRepository.save(ExecutionFixture.EXECUTION_TEST);
Optional<Execution> full = executionRepository.findById(null, ExecutionFixture.EXECUTION_TEST.getId());
Optional<Execution> full = executionRepository.findById(MAIN_TENANT, ExecutionFixture.EXECUTION_TEST.getId());
assertThat(full.isPresent()).isTrue();
full.ifPresent(current -> {
@@ -388,7 +388,7 @@ public abstract class AbstractExecutionRepositoryTest {
executionRepository.purge(ExecutionFixture.EXECUTION_1);
full = executionRepository.findById(null, ExecutionFixture.EXECUTION_1.getId());
full = executionRepository.findById(MAIN_TENANT, ExecutionFixture.EXECUTION_1.getId());
assertThat(full.isPresent()).isFalse();
}

View File

@@ -8,7 +8,6 @@ import io.kestra.core.models.flows.State;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.services.ExecutionService;
import io.kestra.core.storages.StorageInterface;
import io.kestra.plugin.core.debug.Return;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.junit.annotations.KestraTest;
@@ -26,6 +25,7 @@ import java.time.ZonedDateTime;
import java.util.Map;
import java.util.Objects;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
@KestraTest
@@ -39,9 +39,6 @@ public abstract class AbstractExecutionServiceTest {
@Inject
LogRepositoryInterface logRepository;
@Inject
StorageInterface storageInterface;
@Inject
RunContextFactory runContextFactory;
@@ -56,12 +53,14 @@ public abstract class AbstractExecutionServiceTest {
Flow flow = Flow.builder()
.namespace("io.kestra.test")
.id("abc")
.tenantId(MAIN_TENANT)
.revision(1)
.build();
Execution execution = Execution
.builder()
.id(IdUtils.create())
.tenantId(MAIN_TENANT)
.state(state)
.flowId(flow.getId())
.namespace(flow.getNamespace())
@@ -72,6 +71,7 @@ public abstract class AbstractExecutionServiceTest {
TaskRun taskRun = TaskRun
.builder()
.tenantId(MAIN_TENANT)
.namespace(flow.getNamespace())
.id(IdUtils.create())
.executionId(execution.getId())
@@ -108,7 +108,7 @@ public abstract class AbstractExecutionServiceTest {
true,
true,
true,
null,
MAIN_TENANT,
flow.getNamespace(),
flow.getId(),
null,
@@ -126,7 +126,7 @@ public abstract class AbstractExecutionServiceTest {
true,
true,
true,
null,
MAIN_TENANT,
flow.getNamespace(),
flow.getId(),
null,

View File

@@ -815,12 +815,14 @@ public abstract class AbstractFlowRepositoryTest {
@Test
void findByExecutionNoRevision() {
Flow flow = builder()
.tenantId(MAIN_TENANT)
.revision(3)
.build();
flowRepository.create(GenericFlow.of(flow));
Execution execution = Execution.builder()
.id(IdUtils.create())
.namespace(flow.getNamespace())
.tenantId(MAIN_TENANT)
.flowId(flow.getId())
.state(new State())
.build();

View File

@@ -10,6 +10,7 @@ import org.junit.jupiter.api.Test;
import java.util.List;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
@KestraTest
@@ -23,14 +24,14 @@ public abstract class AbstractFlowTopologyRepositoryTest {
.source(FlowNode.builder()
.id(flowA)
.namespace(namespace)
.tenantId(TenantService.MAIN_TENANT)
.tenantId(MAIN_TENANT)
.uid(flowA)
.build()
)
.destination(FlowNode.builder()
.id(flowB)
.namespace(namespace)
.tenantId(TenantService.MAIN_TENANT)
.tenantId(MAIN_TENANT)
.uid(flowB)
.build()
)
@@ -43,7 +44,7 @@ public abstract class AbstractFlowTopologyRepositoryTest {
createSimpleFlowTopology("flow-a", "flow-b", "io.kestra.tests")
);
List<FlowTopology> list = flowTopologyRepository.findByFlow(TenantService.MAIN_TENANT, "io.kestra.tests", "flow-a", false);
List<FlowTopology> list = flowTopologyRepository.findByFlow(MAIN_TENANT, "io.kestra.tests", "flow-a", false);
assertThat(list.size()).isEqualTo(1);
}

View File

@@ -16,6 +16,7 @@ import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.List;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
@KestraTest
@@ -35,17 +36,17 @@ public abstract class AbstractMetricRepositoryTest {
metricRepository.save(testCounter); // should only be retrieved by execution id
metricRepository.save(timer);
List<MetricEntry> results = metricRepository.findByExecutionId(null, executionId, Pageable.from(1, 10));
List<MetricEntry> results = metricRepository.findByExecutionId(MAIN_TENANT, executionId, Pageable.from(1, 10));
assertThat(results.size()).isEqualTo(3);
results = metricRepository.findByExecutionIdAndTaskId(null, executionId, taskRun1.getTaskId(), Pageable.from(1, 10));
results = metricRepository.findByExecutionIdAndTaskId(MAIN_TENANT, executionId, taskRun1.getTaskId(), Pageable.from(1, 10));
assertThat(results.size()).isEqualTo(3);
results = metricRepository.findByExecutionIdAndTaskRunId(null, executionId, taskRun1.getId(), Pageable.from(1, 10));
results = metricRepository.findByExecutionIdAndTaskRunId(MAIN_TENANT, executionId, taskRun1.getId(), Pageable.from(1, 10));
assertThat(results.size()).isEqualTo(2);
MetricAggregations aggregationResults = metricRepository.aggregateByFlowId(
null,
MAIN_TENANT,
"namespace",
"flow",
null,
@@ -59,7 +60,7 @@ public abstract class AbstractMetricRepositoryTest {
assertThat(aggregationResults.getGroupBy()).isEqualTo("day");
aggregationResults = metricRepository.aggregateByFlowId(
null,
MAIN_TENANT,
"namespace",
"flow",
null,
@@ -90,9 +91,9 @@ public abstract class AbstractMetricRepositoryTest {
metricRepository.save(test); // should only be retrieved by execution id
List<String> flowMetricsNames = metricRepository.flowMetrics(null, "namespace", "flow");
List<String> taskMetricsNames = metricRepository.taskMetrics(null, "namespace", "flow", "task");
List<String> tasksWithMetrics = metricRepository.tasksWithMetrics(null, "namespace", "flow");
List<String> flowMetricsNames = metricRepository.flowMetrics(MAIN_TENANT, "namespace", "flow");
List<String> taskMetricsNames = metricRepository.taskMetrics(MAIN_TENANT, "namespace", "flow", "task");
List<String> tasksWithMetrics = metricRepository.tasksWithMetrics(MAIN_TENANT, "namespace", "flow");
assertThat(flowMetricsNames.size()).isEqualTo(2);
assertThat(taskMetricsNames.size()).isEqualTo(1);
@@ -111,7 +112,7 @@ public abstract class AbstractMetricRepositoryTest {
metricRepository.save(timer);
metricRepository.save(test); // should be retrieved as findAllAsync is used for backup
List<MetricEntry> results = metricRepository.findAllAsync(null).collectList().block();
List<MetricEntry> results = metricRepository.findAllAsync(MAIN_TENANT).collectList().block();
assertThat(results).hasSize(3);
}
@@ -127,6 +128,7 @@ public abstract class AbstractMetricRepositoryTest {
return TaskRun.builder()
.flowId("flow")
.namespace("namespace")
.tenantId(MAIN_TENANT)
.executionId(executionId)
.taskId(taskId)
.id(FriendlyId.createFriendlyId())

View File

@@ -21,6 +21,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Optional;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
@KestraTest
@@ -46,37 +47,38 @@ public abstract class AbstractTemplateRepositoryTest {
@Test
void findById() {
Template template = builder().build();
Template template = buildTemplate();
templateRepository.create(template);
Optional<Template> full = templateRepository.findById(null, template.getNamespace(), template.getId());
Optional<Template> full = templateRepository.findById(MAIN_TENANT, template.getNamespace(), template.getId());
assertThat(full.isPresent()).isTrue();
assertThat(full.get().getId()).isEqualTo(template.getId());
full = templateRepository.findById(null, template.getNamespace(), template.getId());
full = templateRepository.findById(MAIN_TENANT, template.getNamespace(), template.getId());
assertThat(full.isPresent()).isTrue();
assertThat(full.get().getId()).isEqualTo(template.getId());
}
@Test
void findByNamespace() {
Template template1 = builder().build();
Template template1 = buildTemplate();
Template template2 = Template.builder()
.id(IdUtils.create())
.tenantId(MAIN_TENANT)
.namespace("kestra.test.template").build();
templateRepository.create(template1);
templateRepository.create(template2);
List<Template> templates = templateRepository.findByNamespace(null, template1.getNamespace());
List<Template> templates = templateRepository.findByNamespace(MAIN_TENANT, template1.getNamespace());
assertThat(templates.size()).isGreaterThanOrEqualTo(1);
templates = templateRepository.findByNamespace(null, template2.getNamespace());
templates = templateRepository.findByNamespace(MAIN_TENANT, template2.getNamespace());
assertThat(templates.size()).isEqualTo(1);
}
@Test
void save() {
Template template = builder().build();
Template template = buildTemplate();
Template save = templateRepository.create(template);
assertThat(save.getId()).isEqualTo(template.getId());
@@ -84,19 +86,19 @@ public abstract class AbstractTemplateRepositoryTest {
@Test
void findAll() {
long saveCount = templateRepository.findAll(null).size();
Template template = builder().build();
long saveCount = templateRepository.findAll(MAIN_TENANT).size();
Template template = buildTemplate();
templateRepository.create(template);
long size = templateRepository.findAll(null).size();
long size = templateRepository.findAll(MAIN_TENANT).size();
assertThat(size).isGreaterThan(saveCount);
templateRepository.delete(template);
assertThat((long) templateRepository.findAll(null).size()).isEqualTo(saveCount);
assertThat((long) templateRepository.findAll(MAIN_TENANT).size()).isEqualTo(saveCount);
}
@Test
void findAllForAllTenants() {
long saveCount = templateRepository.findAllForAllTenants().size();
Template template = builder().build();
Template template = buildTemplate();
templateRepository.create(template);
long size = templateRepository.findAllForAllTenants().size();
assertThat(size).isGreaterThan(saveCount);
@@ -106,19 +108,19 @@ public abstract class AbstractTemplateRepositoryTest {
@Test
void find() {
Template template1 = builder().build();
Template template1 = buildTemplate();
templateRepository.create(template1);
Template template2 = builder().build();
Template template2 = buildTemplate();
templateRepository.create(template2);
Template template3 = builder().build();
Template template3 = buildTemplate();
templateRepository.create(template3);
// with pageable
List<Template> save = templateRepository.find(Pageable.from(1, 10),null, null, "kestra.test");
List<Template> save = templateRepository.find(Pageable.from(1, 10),null, MAIN_TENANT, "kestra.test");
assertThat((long) save.size()).isGreaterThanOrEqualTo(3L);
// without pageable
save = templateRepository.find(null, null, "kestra.test");
save = templateRepository.find(null, MAIN_TENANT, "kestra.test");
assertThat((long) save.size()).isGreaterThanOrEqualTo(3L);
templateRepository.delete(template1);
@@ -128,12 +130,12 @@ public abstract class AbstractTemplateRepositoryTest {
@Test
void delete() {
Template template = builder().build();
Template template = buildTemplate();
Template save = templateRepository.create(template);
templateRepository.delete(save);
assertThat(templateRepository.findById(null, template.getNamespace(), template.getId()).isPresent()).isFalse();
assertThat(templateRepository.findById(MAIN_TENANT, template.getNamespace(), template.getId()).isPresent()).isFalse();
assertThat(TemplateListener.getEmits().size()).isEqualTo(2);
assertThat(TemplateListener.getEmits().stream().filter(r -> r.getType() == CrudEventType.CREATE).count()).isEqualTo(1L);
@@ -157,4 +159,8 @@ public abstract class AbstractTemplateRepositoryTest {
emits = new ArrayList<>();
}
}
private static Template buildTemplate() {
return builder().tenantId(MAIN_TENANT).build();
}
}

View File

@@ -38,6 +38,7 @@ public abstract class AbstractTriggerRepositoryTest {
private static Trigger.TriggerBuilder<?, ?> trigger() {
return Trigger.builder()
.flowId(IdUtils.create())
.tenantId(MAIN_TENANT)
.namespace(TEST_NAMESPACE)
.triggerId(IdUtils.create())
.executionId(IdUtils.create())
@@ -138,7 +139,7 @@ public abstract class AbstractTriggerRepositoryTest {
assertThat(all.size()).isEqualTo(4);
all = triggerRepository.findAll(null);
all = triggerRepository.findAll(MAIN_TENANT);
assertThat(all.size()).isEqualTo(4);
@@ -147,45 +148,46 @@ public abstract class AbstractTriggerRepositoryTest {
Trigger trigger = trigger().namespace(namespace).build();
triggerRepository.save(trigger);
List<Trigger> find = triggerRepository.find(Pageable.from(1, 4, Sort.of(Sort.Order.asc("namespace"))), null, null, null, null, null);
List<Trigger> find = triggerRepository.find(Pageable.from(1, 4, Sort.of(Sort.Order.asc("namespace"))), null, MAIN_TENANT, null, null, null);
assertThat(find.size()).isEqualTo(4);
assertThat(find.getFirst().getNamespace()).isEqualTo(namespace);
find = triggerRepository.find(Pageable.from(1, 4, Sort.of(Sort.Order.asc("namespace"))), null, null, null, searchedTrigger.getFlowId(), null);
find = triggerRepository.find(Pageable.from(1, 4, Sort.of(Sort.Order.asc("namespace"))), null, MAIN_TENANT, null, searchedTrigger.getFlowId(), null);
assertThat(find.size()).isEqualTo(1);
assertThat(find.getFirst().getFlowId()).isEqualTo(searchedTrigger.getFlowId());
find = triggerRepository.find(Pageable.from(1, 100, Sort.of(Sort.Order.asc(triggerRepository.sortMapping().apply("triggerId")))), null, null, namespacePrefix, null, null);
find = triggerRepository.find(Pageable.from(1, 100, Sort.of(Sort.Order.asc(triggerRepository.sortMapping().apply("triggerId")))), null, MAIN_TENANT, namespacePrefix, null, null);
assertThat(find.size()).isEqualTo(1);
assertThat(find.getFirst().getTriggerId()).isEqualTo(trigger.getTriggerId());
// Full text search is on namespace, flowId, triggerId, executionId
find = triggerRepository.find(Pageable.from(1, 100, Sort.UNSORTED), trigger.getNamespace(), null, null, null, null);
find = triggerRepository.find(Pageable.from(1, 100, Sort.UNSORTED), trigger.getNamespace(), MAIN_TENANT, null, null, null);
assertThat(find.size()).isEqualTo(1);
assertThat(find.getFirst().getTriggerId()).isEqualTo(trigger.getTriggerId());
find = triggerRepository.find(Pageable.from(1, 100, Sort.UNSORTED), searchedTrigger.getFlowId(), null, null, null, null);
find = triggerRepository.find(Pageable.from(1, 100, Sort.UNSORTED), searchedTrigger.getFlowId(), MAIN_TENANT, null, null, null);
assertThat(find.size()).isEqualTo(1);
assertThat(find.getFirst().getTriggerId()).isEqualTo(searchedTrigger.getTriggerId());
find = triggerRepository.find(Pageable.from(1, 100, Sort.UNSORTED), searchedTrigger.getTriggerId(), null, null, null, null);
find = triggerRepository.find(Pageable.from(1, 100, Sort.UNSORTED), searchedTrigger.getTriggerId(), MAIN_TENANT, null, null, null);
assertThat(find.size()).isEqualTo(1);
assertThat(find.getFirst().getTriggerId()).isEqualTo(searchedTrigger.getTriggerId());
find = triggerRepository.find(Pageable.from(1, 100, Sort.UNSORTED), searchedTrigger.getExecutionId(), null, null, null, null);
find = triggerRepository.find(Pageable.from(1, 100, Sort.UNSORTED), searchedTrigger.getExecutionId(), MAIN_TENANT, null, null, null);
assertThat(find.size()).isEqualTo(1);
assertThat(find.getFirst().getTriggerId()).isEqualTo(searchedTrigger.getTriggerId());
}
@Test
void shouldCountForNullTenant() {
void shouldCountForMainTenant() {
// Given
triggerRepository.save(Trigger
.builder()
.tenantId(MAIN_TENANT)
.triggerId(IdUtils.create())
.flowId(IdUtils.create())
.namespace("io.kestra.unittest")
.build()
);
// When
int count = triggerRepository.count(null);
int count = triggerRepository.count(MAIN_TENANT);
// Then
assertThat(count).isEqualTo(1);
}

View File

@@ -64,6 +64,7 @@ class ExecutionFixture {
public static final Execution EXECUTION_TEST = Execution.builder()
.id(IdUtils.create())
.namespace("io.kestra.unittest")
.tenantId(MAIN_TENANT)
.flowId("full")
.flowRevision(1)
.state(new State())

View File

@@ -1,5 +1,7 @@
package io.kestra.core.runners;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.DependsOn;
@@ -21,7 +23,6 @@ import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.ByteBuffer;
@@ -200,7 +201,7 @@ class FlowInputOutputTest {
}
@Test
void shouldNotUploadFileInputAfterValidation() throws IOException {
void shouldNotUploadFileInputAfterValidation() {
// Given
FileInput input = FileInput
.builder()
@@ -215,7 +216,7 @@ class FlowInputOutputTest {
// Then
Assertions.assertNull(values.getFirst().exception());
Assertions.assertFalse(storageInterface.exists(null, null, URI.create(values.getFirst().value().toString())));
Assertions.assertFalse(storageInterface.exists(MAIN_TENANT, null, URI.create(values.getFirst().value().toString())));
}
@Test

View File

@@ -23,7 +23,6 @@ import jakarta.inject.Inject;
import jakarta.inject.Named;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import reactor.core.publisher.Flux;
import java.time.Duration;
@@ -33,6 +32,7 @@ import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static io.kestra.core.utils.Rethrow.throwSupplier;
import static org.assertj.core.api.Assertions.assertThat;
@@ -109,6 +109,7 @@ class WorkerTest {
Flow flow = Flow.builder()
.id(IdUtils.create())
.tenantId(MAIN_TENANT)
.namespace("io.kestra.unit-test")
.tasks(Collections.singletonList(theWorkerTask))
.build();
@@ -132,7 +133,7 @@ class WorkerTest {
}),
() -> workerTaskResult.get() != null && workerTaskResult.get().getTaskRun().getState().isFailed(),
Duration.ofMillis(100),
Duration.ofMinutes(1)
Duration.ofSeconds(10L)
);
receive.blockLast();
worker.shutdown();
@@ -162,7 +163,7 @@ class WorkerTest {
Thread.sleep(500);
executionKilledQueue.emit(ExecutionKilledExecution.builder().executionId(workerTask.getTaskRun().getExecutionId()).build());
executionKilledQueue.emit(ExecutionKilledExecution.builder().tenantId(MAIN_TENANT).executionId(workerTask.getTaskRun().getExecutionId()).build());
Await.until(
() -> {
@@ -171,7 +172,7 @@ class WorkerTest {
return copy.stream().filter(r -> r.getTaskRun().getState().isTerminated()).count() == 5;
},
Duration.ofMillis(100),
Duration.ofMinutes(1)
Duration.ofSeconds(10L)
);
receiveWorkerTaskResults.blockLast();
@@ -211,6 +212,7 @@ class WorkerTest {
Flow flow = Flow.builder()
.id(IdUtils.create())
.namespace("io.kestra.unit-test")
.tenantId(MAIN_TENANT)
.tasks(Collections.singletonList(bash))
.build();

View File

@@ -1,39 +1,25 @@
package io.kestra.core.runners.pebble.functions;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.junit.annotations.LoadFlows;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.property.Property;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.RunnerUtils;
import io.kestra.core.runners.VariableRenderer;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.storages.kv.InternalKVStore;
import io.kestra.core.storages.kv.KVStore;
import io.kestra.core.storages.kv.KVValueAndMetadata;
import io.kestra.core.utils.TestsUtils;
import io.kestra.plugin.core.debug.Return;
import io.pebbletemplates.pebble.error.PebbleException;
import jakarta.inject.Inject;
import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
@KestraTest(startRunner = true)
public class KvFunctionTest {
@@ -46,20 +32,16 @@ public class KvFunctionTest {
@BeforeEach
void reset() throws IOException {
storageInterface.deleteByPrefix(null, null, URI.create(StorageContext.kvPrefix("io.kestra.tests")));
storageInterface.deleteByPrefix(MAIN_TENANT, null, URI.create(StorageContext.kvPrefix("io.kestra.tests")));
}
@Test
void shouldGetValueFromKVGivenExistingKey() throws IllegalVariableEvaluationException, IOException {
// Given
KVStore kv = new InternalKVStore(null, "io.kestra.tests", storageInterface);
KVStore kv = new InternalKVStore(MAIN_TENANT, "io.kestra.tests", storageInterface);
kv.put("my-key", new KVValueAndMetadata(null, Map.of("field", "value")));
Map<String, Object> variables = Map.of(
"flow", Map.of(
"id", "kv",
"namespace", "io.kestra.tests")
);
Map<String, Object> variables = getFlow("io.kestra.tests");
// When
String rendered = variableRenderer.render("{{ kv('my-key') }}", variables);
@@ -71,17 +53,13 @@ public class KvFunctionTest {
@Test
void shouldGetValueFromKVGivenExistingKeyWithInheritance() throws IllegalVariableEvaluationException, IOException {
// Given
KVStore kv = new InternalKVStore(null, "my.company", storageInterface);
KVStore kv = new InternalKVStore(MAIN_TENANT, "my.company", storageInterface);
kv.put("my-key", new KVValueAndMetadata(null, Map.of("field", "value")));
KVStore firstKv = new InternalKVStore(null, "my", storageInterface);
KVStore firstKv = new InternalKVStore(MAIN_TENANT, "my", storageInterface);
firstKv.put("my-key", new KVValueAndMetadata(null, Map.of("field", "firstValue")));
Map<String, Object> variables = Map.of(
"flow", Map.of(
"id", "kv",
"namespace", "my.company.team")
);
Map<String, Object> variables = getFlow("my.company.team");
// When
String rendered = variableRenderer.render("{{ kv('my-key') }}", variables);
@@ -93,14 +71,10 @@ public class KvFunctionTest {
@Test
void shouldNotGetValueFromKVWithGivenNamespaceAndInheritance() throws IOException {
// Given
KVStore kv = new InternalKVStore(null, "kv", storageInterface);
KVStore kv = new InternalKVStore(MAIN_TENANT, "kv", storageInterface);
kv.put("my-key", new KVValueAndMetadata(null, Map.of("field", "value")));
Map<String, Object> variables = Map.of(
"flow", Map.of(
"id", "kv",
"namespace", "my.company.team")
);
Map<String, Object> variables = getFlow("my.company.team");
// When
Assertions.assertThrows(IllegalVariableEvaluationException.class, () ->
@@ -110,14 +84,10 @@ public class KvFunctionTest {
@Test
void shouldGetValueFromKVGivenExistingAndNamespace() throws IllegalVariableEvaluationException, IOException {
// Given
KVStore kv = new InternalKVStore(null, "kv", storageInterface);
KVStore kv = new InternalKVStore(MAIN_TENANT, "kv", storageInterface);
kv.put("my-key", new KVValueAndMetadata(null, Map.of("field", "value")));
Map<String, Object> variables = Map.of(
"flow", Map.of(
"id", "kv",
"namespace", "io.kestra.tests")
);
Map<String, Object> variables = getFlow("io.kestra.tests");
// When
String rendered = variableRenderer.render("{{ kv('my-key', namespace='kv') }}", variables);
@@ -129,11 +99,7 @@ public class KvFunctionTest {
@Test
void shouldGetEmptyGivenNonExistingKeyAndErrorOnMissingFalse() throws IllegalVariableEvaluationException {
// Given
Map<String, Object> variables = Map.of(
"flow", Map.of(
"id", "kv",
"namespace", "io.kestra.tests")
);
Map<String, Object> variables = getFlow("io.kestra.tests");
// When
String rendered = variableRenderer.render("{{ kv('my-key', errorOnMissing=false) }}", variables);
@@ -145,11 +111,7 @@ public class KvFunctionTest {
@Test
void shouldFailGivenNonExistingKeyAndErrorOnMissingTrue() {
// Given
Map<String, Object> variables = Map.of(
"flow", Map.of(
"id", "kv",
"namespace", "io.kestra.tests")
);
Map<String, Object> variables = getFlow("io.kestra.tests");
// When
IllegalVariableEvaluationException exception = Assertions.assertThrows(IllegalVariableEvaluationException.class, () -> {
@@ -163,11 +125,7 @@ public class KvFunctionTest {
@Test
void shouldFailGivenNonExistingKeyUsingDefaults() {
// Given
Map<String, Object> variables = Map.of(
"flow", Map.of(
"id", "kv",
"namespace", "io.kestra.tests")
);
Map<String, Object> variables = getFlow("io.kestra.tests");
// When
IllegalVariableEvaluationException exception = Assertions.assertThrows(IllegalVariableEvaluationException.class, () -> {
variableRenderer.render("{{ kv('my-key') }}", variables);
@@ -176,4 +134,13 @@ public class KvFunctionTest {
// Then
assertThat(exception.getMessage()).isEqualTo("io.pebbletemplates.pebble.error.PebbleException: The key 'my-key' does not exist in the namespace 'io.kestra.tests'. ({{ kv('my-key') }}:1)");
}
private static @NotNull Map<String, Object> getFlow(String namespace) {
return Map.of(
"flow", Map.of(
"id", "kv",
"tenantId", MAIN_TENANT,
"namespace", namespace)
);
}
}

View File

@@ -76,7 +76,8 @@ class ReadFileFunctionTest {
@Test
void readUnknownNamespaceFile() {
IllegalVariableEvaluationException illegalVariableEvaluationException = assertThrows(IllegalVariableEvaluationException.class, () -> variableRenderer.render("{{ read('unknown.txt') }}", Map.of("flow", Map.of("namespace", "io.kestra.tests"))));
IllegalVariableEvaluationException illegalVariableEvaluationException = assertThrows(IllegalVariableEvaluationException.class, () ->
variableRenderer.render("{{ read('unknown.txt') }}", Map.of("flow", Map.of("tenantId", MAIN_TENANT, "namespace", "io.kestra.tests"))));
assertThat(illegalVariableEvaluationException.getCause().getCause().getClass()).isEqualTo(FileNotFoundException.class);
}

View File

@@ -20,6 +20,7 @@ import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -33,7 +34,7 @@ class FlowServiceTest {
private FlowRepositoryInterface flowRepository;
private static FlowWithSource create(String flowId, String taskId, Integer revision) {
return create(null, TEST_NAMESPACE, flowId, taskId, revision);
return create(MAIN_TENANT, TEST_NAMESPACE, flowId, taskId, revision);
}
private static FlowWithSource create(String tenantId, String namespace, String flowId, String taskId, Integer revision) {
@@ -321,16 +322,16 @@ class FlowServiceTest {
@Test
void findByNamespacePrefix() {
FlowWithSource flow = create(null, "some.namespace","findByTest", "test", 1);
FlowWithSource flow = create(MAIN_TENANT, "some.namespace","findByTest", "test", 1);
flowRepository.create(GenericFlow.of(flow));
assertThat(flowService.findByNamespacePrefix(null, "some.namespace").size()).isEqualTo(1);
assertThat(flowService.findByNamespacePrefix(MAIN_TENANT, "some.namespace").size()).isEqualTo(1);
}
@Test
void findById() {
FlowWithSource flow = create("findByIdTest", "test", 1);
FlowWithSource saved = flowRepository.create(GenericFlow.of(flow));
assertThat(flowService.findById(null, saved.getNamespace(), saved.getId()).isPresent()).isTrue();
assertThat(flowService.findById(MAIN_TENANT, saved.getNamespace(), saved.getId()).isPresent()).isTrue();
}
@Test
@@ -368,7 +369,7 @@ class FlowServiceTest {
))
.build();
List<String> exceptions = flowService.checkValidSubflows(flow, null);
List<String> exceptions = flowService.checkValidSubflows(flow, MAIN_TENANT);
assertThat(exceptions.size()).isZero();
}

View File

@@ -1,7 +1,8 @@
package io.kestra.core.services;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.storages.kv.*;
import io.micronaut.test.annotation.MockBean;
@@ -26,25 +27,25 @@ class KVStoreServiceTest {
@Test
void shouldGetKVStoreForExistingNamespaceGivenFromNull() {
Assertions.assertNotNull(storeService.get(null, TEST_EXISTING_NAMESPACE, null));
Assertions.assertNotNull(storeService.get(MAIN_TENANT, TEST_EXISTING_NAMESPACE, null));
}
@Test
void shouldThrowExceptionWhenAccessingKVStoreForNonExistingNamespace() {
KVStoreException exception = Assertions.assertThrows(KVStoreException.class, () -> storeService.get(null, "io.kestra.unittest.unknown", null));
KVStoreException exception = Assertions.assertThrows(KVStoreException.class, () -> storeService.get(MAIN_TENANT, "io.kestra.unittest.unknown", null));
Assertions.assertTrue(exception.getMessage().contains("namespace 'io.kestra.unittest.unknown' does not exist"));
}
@Test
void shouldGetKVStoreForAnyNamespaceWhenAccessingFromChildNamespace() {
Assertions.assertNotNull(storeService.get(null, "io.kestra", TEST_EXISTING_NAMESPACE));
Assertions.assertNotNull(storeService.get(MAIN_TENANT, "io.kestra", TEST_EXISTING_NAMESPACE));
}
@Test
void shouldGetKVStoreFromNonExistingNamespaceWithAKV() throws IOException {
KVStore kvStore = new InternalKVStore(null, "system", storageInterface);
KVStore kvStore = new InternalKVStore(MAIN_TENANT, "system", storageInterface);
kvStore.put("key", new KVValueAndMetadata(new KVMetadata("myDescription", Duration.ofHours(1)), "value"));
Assertions.assertNotNull(storeService.get(null, "system", null));
Assertions.assertNotNull(storeService.get(MAIN_TENANT, "system", null));
}
@MockBean(NamespaceService.class)

View File

@@ -27,6 +27,7 @@ import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.within;
@@ -91,7 +92,7 @@ class InternalKVStoreTest {
kv.put(TEST_KV_KEY, new KVValueAndMetadata(new KVMetadata(description, Duration.ofMinutes(5)), complexValue));
// Then
StorageObject withMetadata = storageInterface.getWithMetadata(null, kv.namespace(), URI.create("/" + kv.namespace().replace(".", "/") + "/_kv/my-key.ion"));
StorageObject withMetadata = storageInterface.getWithMetadata(MAIN_TENANT, kv.namespace(), URI.create("/" + kv.namespace().replace(".", "/") + "/_kv/my-key.ion"));
String valueFile = new String(withMetadata.inputStream().readAllBytes());
Instant expirationDate = Instant.parse(withMetadata.metadata().get("expirationDate"));
assertThat(expirationDate.isAfter(before.plus(Duration.ofMinutes(4))) && expirationDate.isBefore(before.plus(Duration.ofMinutes(6)))).isTrue();
@@ -102,7 +103,7 @@ class InternalKVStoreTest {
kv.put(TEST_KV_KEY, new KVValueAndMetadata(new KVMetadata(null, Duration.ofMinutes(10)), "some-value"));
// Then
withMetadata = storageInterface.getWithMetadata(null, kv.namespace(), URI.create("/" + kv.namespace().replace(".", "/") + "/_kv/my-key.ion"));
withMetadata = storageInterface.getWithMetadata(MAIN_TENANT, kv.namespace(), URI.create("/" + kv.namespace().replace(".", "/") + "/_kv/my-key.ion"));
valueFile = new String(withMetadata.inputStream().readAllBytes());
expirationDate = Instant.parse(withMetadata.metadata().get("expirationDate"));
assertThat(expirationDate.isAfter(before.plus(Duration.ofMinutes(9))) && expirationDate.isBefore(before.plus(Duration.ofMinutes(11)))).isTrue();
@@ -176,6 +177,6 @@ class InternalKVStoreTest {
private InternalKVStore kv() {
final String namespaceId = "io.kestra." + IdUtils.create();
return new InternalKVStore(null, namespaceId, storageInterface);
return new InternalKVStore(MAIN_TENANT, namespaceId, storageInterface);
}
}

View File

@@ -17,8 +17,8 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.Matchers.is;
class InternalNamespaceTest {
@@ -38,7 +38,7 @@ class InternalNamespaceTest {
void shouldGetAllNamespaceFiles() throws IOException, URISyntaxException {
// Given
final String namespaceId = "io.kestra." + IdUtils.create();
final InternalNamespace namespace = new InternalNamespace(logger, null, namespaceId, storageInterface);
final InternalNamespace namespace = new InternalNamespace(logger, MAIN_TENANT, namespaceId, storageInterface);
// When
namespace.putFile(Path.of("/sub/dir/file1.txt"), new ByteArrayInputStream("1".getBytes()));
@@ -56,7 +56,7 @@ class InternalNamespaceTest {
void shouldPutFileGivenNoTenant() throws IOException, URISyntaxException {
// Given
final String namespaceId = "io.kestra." + IdUtils.create();
final InternalNamespace namespace = new InternalNamespace(logger, null, namespaceId, storageInterface);
final InternalNamespace namespace = new InternalNamespace(logger, MAIN_TENANT, namespaceId, storageInterface);
// When
NamespaceFile namespaceFile = namespace.putFile(Path.of("/sub/dir/file.txt"), new ByteArrayInputStream("1".getBytes()));
@@ -73,7 +73,7 @@ class InternalNamespaceTest {
void shouldSucceedPutFileGivenExistingFileForConflictOverwrite() throws IOException, URISyntaxException {
// Given
final String namespaceId = "io.kestra." + IdUtils.create();
final InternalNamespace namespace = new InternalNamespace(logger, null, namespaceId, storageInterface);
final InternalNamespace namespace = new InternalNamespace(logger, MAIN_TENANT, namespaceId, storageInterface);
NamespaceFile namespaceFile = namespace.get(Path.of("/sub/dir/file.txt"));
@@ -92,7 +92,7 @@ class InternalNamespaceTest {
void shouldFailPutFileGivenExistingFileForError() throws IOException, URISyntaxException {
// Given
final String namespaceId = "io.kestra." + IdUtils.create();
final InternalNamespace namespace = new InternalNamespace(logger, null, namespaceId, storageInterface);
final InternalNamespace namespace = new InternalNamespace(logger, MAIN_TENANT, namespaceId, storageInterface);
NamespaceFile namespaceFile = namespace.get(Path.of("/sub/dir/file.txt"));
@@ -109,7 +109,7 @@ class InternalNamespaceTest {
void shouldIgnorePutFileGivenExistingFileForSkip() throws IOException, URISyntaxException {
// Given
final String namespaceId = "io.kestra." + IdUtils.create();
final InternalNamespace namespace = new InternalNamespace(logger, null, namespaceId, storageInterface);
final InternalNamespace namespace = new InternalNamespace(logger, MAIN_TENANT, namespaceId, storageInterface);
NamespaceFile namespaceFile = namespace.get(Path.of("/sub/dir/file.txt"));
@@ -128,7 +128,7 @@ class InternalNamespaceTest {
void shouldFindAllMatchingGivenNoTenant() throws IOException, URISyntaxException {
// Given
final String namespaceId = "io.kestra." + IdUtils.create();
final InternalNamespace namespace = new InternalNamespace(logger, null, namespaceId, storageInterface);
final InternalNamespace namespace = new InternalNamespace(logger, MAIN_TENANT, namespaceId, storageInterface);
// When
namespace.putFile(Path.of("/a/b/c/1.sql"), new ByteArrayInputStream("1".getBytes()));
@@ -171,7 +171,7 @@ class InternalNamespaceTest {
void shouldReturnNoNamespaceFileForEmptyNamespace() throws IOException {
// Given
final String namespaceId = "io.kestra." + IdUtils.create();
final InternalNamespace namespace = new InternalNamespace(logger, null, namespaceId, storageInterface);
final InternalNamespace namespace = new InternalNamespace(logger, MAIN_TENANT, namespaceId, storageInterface);
List<NamespaceFile> namespaceFiles = namespace.findAllFilesMatching((unused) -> true);
assertThat(namespaceFiles.size()).isZero();
}

View File

@@ -12,8 +12,8 @@ import org.junit.jupiter.api.Test;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
@KestraTest
@@ -31,6 +31,7 @@ class PurgeExecutionsTest {
String flowId = "run-flow-id";
var execution = Execution.builder()
.id(IdUtils.create())
.tenantId(MAIN_TENANT)
.namespace(namespace)
.flowId(flowId)
.state(new State().withState(State.Type.SUCCESS))
@@ -42,7 +43,7 @@ class PurgeExecutionsTest {
.namespace(Property.ofValue(namespace))
.endDate(Property.ofValue(ZonedDateTime.now().plusMinutes(1).format(DateTimeFormatter.ISO_ZONED_DATE_TIME)))
.build();
var runContext = runContextFactory.of(Map.of("flow", Map.of("namespace", namespace, "id", flowId)));
var runContext = runContextFactory.of(flowId, namespace);
var output = purge.run(runContext);
assertThat(output.getExecutionsCount()).isEqualTo(1);
@@ -57,6 +58,7 @@ class PurgeExecutionsTest {
var execution = Execution.builder()
.namespace(namespace)
.flowId(flowId)
.tenantId(MAIN_TENANT)
.id(IdUtils.create())
.state(new State().withState(State.Type.SUCCESS))
.build();
@@ -68,7 +70,7 @@ class PurgeExecutionsTest {
.flowId(Property.ofValue(flowId))
.endDate(Property.ofValue(ZonedDateTime.now().plusMinutes(1).format(DateTimeFormatter.ISO_ZONED_DATE_TIME)))
.build();
var runContext = runContextFactory.of(Map.of("flow", Map.of("namespace", namespace, "id", flowId)));
var runContext = runContextFactory.of(flowId, namespace);
var output = purge.run(runContext);
assertThat(output.getExecutionsCount()).isEqualTo(1);

View File

@@ -25,6 +25,7 @@ import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeoutException;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
@KestraTest(startRunner = true)
@@ -47,6 +48,7 @@ class TimeoutTest {
Flow flow = Flow.builder()
.id(IdUtils.create())
.namespace("io.kestra.unittest")
.tenantId(MAIN_TENANT)
.revision(1)
.tasks(Collections.singletonList(Sleep.builder()
.id("test")

View File

@@ -32,7 +32,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import org.junit.jupiter.api.Test;
import org.junitpioneer.jupiter.RetryingTest;
@KestraTest(startRunner = true)
public class WorkingDirectoryTest {

View File

@@ -1,9 +1,9 @@
package io.kestra.plugin.core.kv;
import io.kestra.core.context.TestRunContextFactory;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.storages.kv.KVStore;
import io.kestra.core.storages.kv.KVValueAndMetadata;
import io.kestra.core.utils.IdUtils;
@@ -14,6 +14,7 @@ import org.junit.jupiter.api.Test;
import java.util.Map;
import java.util.NoSuchElementException;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
@KestraTest
@@ -21,19 +22,18 @@ class DeleteTest {
static final String TEST_KV_KEY = "test-key";
@Inject
RunContextFactory runContextFactory;
TestRunContextFactory runContextFactory;
@Test
void shouldOutputTrueGivenExistingKey() throws Exception {
// Given
String namespaceId = "io.kestra." + IdUtils.create();
RunContext runContext = this.runContextFactory.of(Map.of(
"flow", Map.of("namespace", namespaceId),
"inputs", Map.of(
RunContext runContext = this.runContextFactory.withInputs(
namespaceId, Map.of(
"key", TEST_KV_KEY,
"namespace", namespaceId
)
));
);
Delete delete = Delete.builder()
.id(Delete.class.getSimpleName())
@@ -56,13 +56,12 @@ class DeleteTest {
void shouldOutputFalseGivenNonExistingKey() throws Exception {
// Given
String namespaceId = "io.kestra." + IdUtils.create();
RunContext runContext = this.runContextFactory.of(Map.of(
"flow", Map.of("namespace", namespaceId),
"inputs", Map.of(
RunContext runContext = this.runContextFactory.withInputs(namespaceId,
Map.of(
"key", TEST_KV_KEY,
"namespace", namespaceId
)
));
);
Delete delete = Delete.builder()
.id(Delete.class.getSimpleName())

View File

@@ -1,9 +1,9 @@
package io.kestra.plugin.core.kv;
import io.kestra.core.context.TestRunContextFactory;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.storages.kv.KVStore;
import io.kestra.core.storages.kv.KVValueAndMetadata;
import io.kestra.core.utils.IdUtils;
@@ -12,6 +12,7 @@ import org.junit.jupiter.api.Test;
import java.util.Map;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
@KestraTest
@@ -19,15 +20,13 @@ class GetKeysTest {
static final String TEST_KEY_PREFIX_TEST = "test";
@Inject
RunContextFactory runContextFactory;
TestRunContextFactory runContextFactory;
@Test
void shouldGetAllKeys() throws Exception {
// Given
String namespace = IdUtils.create();
RunContext runContext = this.runContextFactory.of(Map.of(
"flow", Map.of("namespace", namespace)
));
RunContext runContext = this.runContextFactory.of(namespace);
GetKeys getKeys = GetKeys.builder()
.id(GetKeys.class.getSimpleName())
@@ -50,12 +49,9 @@ class GetKeysTest {
void shouldGetKeysGivenMatchingPrefix() throws Exception {
// Given
String namespace = IdUtils.create();
RunContext runContext = this.runContextFactory.of(Map.of(
"flow", Map.of("namespace", namespace),
"inputs", Map.of(
RunContext runContext = this.runContextFactory.withInputs(namespace, Map.of(
"prefix", TEST_KEY_PREFIX_TEST
)
));
));
GetKeys getKeys = GetKeys.builder()
.id(GetKeys.class.getSimpleName())
@@ -79,11 +75,8 @@ class GetKeysTest {
void shouldGetNoKeysGivenEmptyKeyStore() throws Exception {
// Given
String namespace = IdUtils.create();
RunContext runContext = this.runContextFactory.of(Map.of(
"flow", Map.of("namespace", namespace),
"inputs", Map.of(
"prefix", TEST_KEY_PREFIX_TEST
)
RunContext runContext = this.runContextFactory.withInputs(namespace, Map.of(
"prefix", TEST_KEY_PREFIX_TEST
));
GetKeys getKeys = GetKeys.builder()

View File

@@ -1,9 +1,9 @@
package io.kestra.plugin.core.kv;
import io.kestra.core.context.TestRunContextFactory;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.storages.kv.KVStore;
import io.kestra.core.storages.kv.KVValueAndMetadata;
import io.kestra.core.utils.IdUtils;
@@ -24,19 +24,17 @@ class GetTest {
static final String TEST_KV_KEY = "test-key";
@Inject
RunContextFactory runContextFactory;
TestRunContextFactory runContextFactory;
@Test
void shouldGetGivenExistingKey() throws Exception {
// Given
String namespaceId = "io.kestra." + IdUtils.create();
RunContext runContext = this.runContextFactory.of(Map.of(
"flow", Map.of("namespace", namespaceId),
"inputs", Map.of(
RunContext runContext = this.runContextFactory.withInputs(namespaceId,
Map.of(
"key", TEST_KV_KEY,
"namespace", namespaceId
)
));
));
var value = Map.of("date", Instant.now().truncatedTo(ChronoUnit.MILLIS), "int", 1, "string", "string");
@@ -62,12 +60,10 @@ class GetTest {
void shouldGetGivenExistingKeyWithInheritance() throws Exception {
// Given
String namespaceId = "io.kestra." + IdUtils.create();
RunContext runContext = this.runContextFactory.of(Map.of(
"flow", Map.of("namespace", namespaceId),
"inputs", Map.of(
RunContext runContext = this.runContextFactory.withInputs(namespaceId,
Map.of(
"key", TEST_KV_KEY
)
));
));
var value = Map.of("date", Instant.now().truncatedTo(ChronoUnit.MILLIS), "int", 1, "string", "string");
@@ -92,13 +88,11 @@ class GetTest {
void shouldGetGivenNonExistingKey() throws Exception {
// Given
String namespaceId = "io.kestra." + IdUtils.create();
RunContext runContext = this.runContextFactory.of(Map.of(
"flow", Map.of("namespace", namespaceId),
"inputs", Map.of(
RunContext runContext = this.runContextFactory.withInputs(namespaceId,
Map.of(
"key", TEST_KV_KEY,
"namespace", namespaceId
)
));
));
Get get = Get.builder()
.id(Get.class.getSimpleName())

View File

@@ -1,37 +1,34 @@
package io.kestra.plugin.core.kv;
import static org.assertj.core.api.Assertions.assertThat;
import io.kestra.core.context.TestRunContextFactory;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.kv.KVType;
import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.storages.kv.KVStore;
import io.kestra.core.storages.kv.KVStoreException;
import io.kestra.core.storages.kv.KVValue;
import io.kestra.core.utils.TestsUtils;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import static org.assertj.core.api.Assertions.assertThat;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@KestraTest
class SetTest {
static final String TEST_KEY = "test-key";
public static final String NAMESPACE = "io.kestra.test";
@Inject
StorageInterface storageInterface;
@Inject
RunContextFactory runContextFactory;
TestRunContextFactory runContextFactory;
@Test
void shouldSetKVGivenNoNamespace() throws Exception {
@@ -65,27 +62,25 @@ class SetTest {
@Test
void shouldSetKVGivenSameNamespace() throws Exception {
// Given
RunContext runContext = this.runContextFactory.of(Map.of(
"flow", Map.of("namespace", "io.kestra.test"),
"inputs", Map.of(
RunContext runContext = this.runContextFactory.withInputs(NAMESPACE,
Map.of(
"key", TEST_KEY,
"value", "test-value"
)
));
));
Set set = Set.builder()
.id(Set.class.getSimpleName())
.type(Set.class.getName())
.key(new Property<>("{{ inputs.key }}"))
.value(new Property<>("{{ inputs.value }}"))
.namespace(new Property<>("io.kestra.test"))
.namespace(new Property<>(NAMESPACE))
.build();
// When
set.run(runContext);
// Then
final KVStore kv = runContext.namespaceKv("io.kestra.test");
final KVStore kv = runContext.namespaceKv(NAMESPACE);
assertThat(kv.getValue(TEST_KEY)).isEqualTo(Optional.of(new KVValue("test-value")));
assertThat(kv.list().getFirst().expirationDate()).isNull();
}
@@ -93,13 +88,11 @@ class SetTest {
@Test
void shouldSetKVGivenChildNamespace() throws Exception {
// Given
RunContext runContext = this.runContextFactory.of(Map.of(
"flow", Map.of("namespace", "io.kestra.test"),
"inputs", Map.of(
RunContext runContext = this.runContextFactory.withInputs(NAMESPACE,
Map.of(
"key", TEST_KEY,
"value", "test-value"
)
));
));
Set set = Set.builder()
.id(Set.class.getSimpleName())
@@ -120,13 +113,11 @@ class SetTest {
@Test
void shouldFailGivenNonExistingNamespace() {
// Given
RunContext runContext = this.runContextFactory.of(Map.of(
"flow", Map.of("namespace", "io.kestra.test"),
"inputs", Map.of(
RunContext runContext = this.runContextFactory.withInputs(NAMESPACE,
Map.of(
"key", TEST_KEY,
"value", "test-value"
)
));
));
Set set = Set.builder()
.id(Set.class.getSimpleName())

View File

@@ -3,12 +3,12 @@ package io.kestra.plugin.core.storage;
import io.kestra.core.context.TestRunContextFactory;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.runners.RunContextFactory;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;
import java.util.Map;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
@KestraTest
@@ -22,6 +22,7 @@ class PurgeCurrentExecutionFilesTest {
var flow = Flow.builder()
.namespace("namespace")
.id("flowId")
.tenantId(MAIN_TENANT)
.build();
var runContext = runContextFactory.of(flow, Map.of(
"execution", Map.of("id", "executionId"),

View File

@@ -163,6 +163,6 @@ public abstract class AbstractJdbcFlowTopologyRepository extends AbstractJdbcRep
}
protected Condition buildTenantCondition(String prefix, String tenantId) {
return tenantId == null ? field(prefix + "_tenant_id").isNull() : field(prefix + "_tenant_id").eq(tenantId);
return field(prefix + "_tenant_id").eq(tenantId);
}
}

View File

@@ -70,7 +70,7 @@ public abstract class AbstractJdbcRepository {
}
protected Condition buildTenantCondition(String tenantId) {
return tenantId == null ? field("tenant_id").isNull() : field("tenant_id").eq(tenantId);
return field("tenant_id").eq(tenantId);
}
public static Field<Object> field(String name) {

View File

@@ -11,6 +11,7 @@ import org.junit.jupiter.api.Test;
import java.util.List;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
public abstract class AbstractJdbcFlowTopologyRepositoryTest extends AbstractFlowTopologyRepositoryTest {
@@ -25,6 +26,7 @@ public abstract class AbstractJdbcFlowTopologyRepositoryTest extends AbstractFlo
FlowWithSource flow = FlowWithSource.builder()
.id("flow-a")
.namespace("io.kestra.tests")
.tenantId(MAIN_TENANT)
.revision(1)
.build();

View File

@@ -12,6 +12,7 @@ import java.io.IOException;
import java.net.URISyntaxException;
import java.util.List;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
public abstract class AbstractJdbcTemplateRepositoryTest extends io.kestra.core.repositories.AbstractTemplateRepositoryTest {
@@ -23,13 +24,13 @@ public abstract class AbstractJdbcTemplateRepositoryTest extends io.kestra.core.
templateRepository.create(builder("io.kestra.unitest").build());
templateRepository.create(builder("com.kestra.test").build());
List<Template> save = templateRepository.find(Pageable.from(1, 10, Sort.UNSORTED), null, null, null);
List<Template> save = templateRepository.find(Pageable.from(1, 10, Sort.UNSORTED), null, MAIN_TENANT, null);
assertThat(save.size()).isEqualTo(2);
save = templateRepository.find(Pageable.from(1, 10, Sort.UNSORTED), "kestra", null, "com");
save = templateRepository.find(Pageable.from(1, 10, Sort.UNSORTED), "kestra", MAIN_TENANT, "com");
assertThat(save.size()).isEqualTo(1);
save = templateRepository.find(Pageable.from(1, 10, Sort.of(Sort.Order.asc("id"))), "kestra unit", null, null);
save = templateRepository.find(Pageable.from(1, 10, Sort.of(Sort.Order.asc("id"))), "kestra unit", MAIN_TENANT, null);
assertThat(save.size()).isEqualTo(1);
}

View File

@@ -17,7 +17,7 @@ public class MemoryRepositoryTest {
@Test
void verifyMemoryFallbacksToH2() {
assertThat(flowRepositoryInterface.findAll(null).size()).isZero();
assertThat(flowRepositoryInterface.findAll(MAIN_TENANT).size()).isZero();
String flowSource = """
id: some-flow

View File

@@ -50,8 +50,11 @@ public class LocalStorage implements StorageInterface {
}
protected Path getLocalPath(String tenantId, URI uri) {
Path basePath = tenantId == null ? this.basePath.toAbsolutePath()
: Paths.get(this.basePath.toAbsolutePath().toString(), tenantId);
Path basePath = Paths.get(this.basePath.toAbsolutePath().toString(), tenantId);
return getLocalPath(uri, basePath);
}
private Path getLocalPath(URI uri, Path basePath) {
if(uri == null) {
return basePath;
}
@@ -68,6 +71,14 @@ public class LocalStorage implements StorageInterface {
);
}
@Override
public InputStream getGlobalResource(@Nullable String namespace, URI uri) throws IOException {
return new BufferedInputStream(new FileInputStream(getLocalPath(uri, basePath)
.toAbsolutePath()
.toString())
);
}
@Override
public StorageObject getWithMetadata(String tenantId, @Nullable String namespace, URI uri) throws IOException {
return new StorageObject(LocalFileAttributes.getMetadata(this.getLocalPath(tenantId, uri)), this.get(tenantId, namespace, uri));
@@ -122,9 +133,10 @@ public class LocalStorage implements StorageInterface {
@Override
public List<FileAttributes> list(String tenantId, @Nullable String namespace, URI uri) throws IOException {
try (Stream<Path> stream = Files.list(getLocalPath(tenantId, uri))) {
Path path = getLocalPath(tenantId, uri);
try (Stream<Path> stream = Files.list(path)) {
return stream
.filter(path -> !path.getFileName().toString().endsWith(".metadata"))
.filter(file -> !file.getFileName().toString().endsWith(".metadata"))
.map(throwFunction(file -> {
URI relative = URI.create(
getLocalPath(tenantId, null).relativize(
@@ -139,6 +151,12 @@ public class LocalStorage implements StorageInterface {
}
}
@Override
public List<FileAttributes> listGlobalResource(@Nullable String namespace, URI uri) throws IOException {
Path path = getLocalPath(tenantId, uri);
return listResources(tenantId, namespace, path);
}
@Override
public URI put(String tenantId, @Nullable String namespace, URI uri, StorageObject storageObject) throws IOException {
File file = getLocalPath(tenantId, uri).toFile();
@@ -180,10 +198,26 @@ public class LocalStorage implements StorageInterface {
@Override
public URI createDirectory(String tenantId, @Nullable String namespace, URI uri) {
validateDirectoryUri(uri);
File file = getLocalPath(tenantId, uri).toFile();
return createDirectory(uri, file);
}
@Override
public URI createGlobalDirectory(@Nullable String namespace, URI uri) {
validateDirectoryUri(uri);
File file = getLocalPath(uri, basePath).toFile();
return createDirectory(uri, file);
}
private static void validateDirectoryUri(URI uri) {
if (uri == null || uri.getPath().isEmpty()) {
throw new IllegalArgumentException("Unable to create a directory with empty url.");
}
File file = getLocalPath(tenantId, uri).toFile();
}
private static URI createDirectory(URI uri, File file) {
if (!file.exists() && !file.mkdirs()) {
throw new RuntimeException("Cannot create directory: " + file.getAbsolutePath());
}
@@ -235,9 +269,7 @@ public class LocalStorage implements StorageInterface {
}
private URI getKestraUri(String tenantId, Path path) {
Path prefix = (tenantId == null) ?
basePath.toAbsolutePath():
basePath.toAbsolutePath().resolve(tenantId);
Path prefix = basePath.toAbsolutePath().resolve(tenantId);
subPathParentGuard(path, prefix);
return URI.create("kestra:///" + prefix.relativize(path).toString().replace("\\", "/"));
}

View File

@@ -2,7 +2,6 @@ package io.kestra.core.context;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import com.google.common.annotations.VisibleForTesting;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import jakarta.inject.Singleton;
@@ -11,8 +10,23 @@ import java.util.Map;
@Singleton
public class TestRunContextFactory extends RunContextFactory {
@VisibleForTesting
public RunContext of() {
return of(Map.of("flow", Map.of("id", "id", "namespace", "namespace", "tenantId", MAIN_TENANT)));
return of("id", "namespace");
}
public RunContext of(String namespace) {
return of("id", namespace);
}
public RunContext of(String flowId, String namespace) {
return of(Map.of("flow", Map.of("id", flowId, "namespace", namespace, "tenantId", MAIN_TENANT)));
}
public RunContext withInputs(String namespace, Map<String, String > inputs) {
return of(Map.of(
"flow", Map.of("id", "id", "namespace", namespace, "tenantId", MAIN_TENANT),
"inputs", inputs
));
}
}