mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-25 20:00:14 -05:00
Compare commits
6 Commits
dependabot
...
feat/remov
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0dc2cfa5b6 | ||
|
|
3432a77f28 | ||
|
|
7088652bd5 | ||
|
|
f16318c601 | ||
|
|
3731aaf5ab | ||
|
|
dc65ca0ae9 |
@@ -37,7 +37,7 @@ public class ExecutionKilledExecution extends ExecutionKilled implements TenantI
|
||||
public boolean isEqual(WorkerTask workerTask) {
|
||||
String taskTenantId = workerTask.getTaskRun().getTenantId();
|
||||
String taskExecutionId = workerTask.getTaskRun().getExecutionId();
|
||||
return (taskTenantId == null || taskTenantId.equals(this.tenantId)) && taskExecutionId.equals(this.executionId);
|
||||
return taskTenantId.equals(this.tenantId) && taskExecutionId.equals(this.executionId);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -34,7 +34,7 @@ public class DefaultFlowMetaStore implements FlowMetaStoreInterface {
|
||||
public Optional<FlowInterface> findById(String tenantId, String namespace, String id, Optional<Integer> revision) {
|
||||
Optional<FlowInterface> find = this.allFlows
|
||||
.stream()
|
||||
.filter(flow -> ((flow.getTenantId() == null && tenantId == null) || Objects.equals(flow.getTenantId(), tenantId)) &&
|
||||
.filter(flow -> Objects.equals(flow.getTenantId(), tenantId) &&
|
||||
flow.getNamespace().equals(namespace) &&
|
||||
flow.getId().equals(id) &&
|
||||
(revision.isEmpty() || revision.get().equals(flow.getRevision()))
|
||||
|
||||
@@ -42,6 +42,9 @@ public interface StorageInterface extends AutoCloseable, Plugin {
|
||||
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
|
||||
InputStream get(String tenantId, @Nullable String namespace, URI uri) throws IOException;
|
||||
|
||||
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
|
||||
InputStream getGlobalResource(@Nullable String namespace, URI uri) throws IOException;
|
||||
|
||||
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
|
||||
StorageObject getWithMetadata(String tenantId, @Nullable String namespace, URI uri) throws IOException;
|
||||
|
||||
@@ -57,6 +60,9 @@ public interface StorageInterface extends AutoCloseable, Plugin {
|
||||
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
|
||||
List<FileAttributes> list(String tenantId, @Nullable String namespace, URI uri) throws IOException;
|
||||
|
||||
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
|
||||
List<FileAttributes> listGlobalResources(@Nullable String namespace, URI uri) throws IOException;
|
||||
|
||||
/**
|
||||
* Whether the uri points to a file/object that exist in the internal storage.
|
||||
*
|
||||
@@ -73,6 +79,21 @@ public interface StorageInterface extends AutoCloseable, Plugin {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Whether the uri points to a file/object that exist in the global internal storage.
|
||||
*
|
||||
* @param uri the URI of the file/object in the internal storage.
|
||||
* @return true if the uri points to a file/object that exist in the internal storage.
|
||||
*/
|
||||
@SuppressWarnings("try")
|
||||
default boolean existsGlobalResource(@Nullable String namespace, URI uri) {
|
||||
try (InputStream ignored = getGlobalResource(namespace, uri)) {
|
||||
return true;
|
||||
} catch (IOException ieo) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
|
||||
FileAttributes getAttributes(String tenantId, @Nullable String namespace, URI uri) throws IOException;
|
||||
|
||||
@@ -90,6 +111,9 @@ public interface StorageInterface extends AutoCloseable, Plugin {
|
||||
@Retryable(includes = {IOException.class})
|
||||
URI createDirectory(String tenantId, @Nullable String namespace, URI uri) throws IOException;
|
||||
|
||||
@Retryable(includes = {IOException.class})
|
||||
URI createGlobalDirectory(@Nullable String namespace, URI uri) throws IOException;
|
||||
|
||||
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
|
||||
URI move(String tenantId, @Nullable String namespace, URI from, URI to) throws IOException;
|
||||
|
||||
|
||||
@@ -10,6 +10,7 @@ import org.junit.jupiter.api.Test;
|
||||
import java.net.URI;
|
||||
import java.util.Map;
|
||||
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest
|
||||
@@ -37,9 +38,9 @@ class VariablesTest {
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
void inStorage() {
|
||||
var storageContext = StorageContext.forTask(null, "namespace", "flow", "execution", "task", "taskRun", null);
|
||||
var storageContext = StorageContext.forTask(MAIN_TENANT, "namespace", "flow", "execution", "task", "taskRun", null);
|
||||
var internalStorage = new InternalStorage(storageContext, storageInterface);
|
||||
Variables.StorageContext variablesContext = new Variables.StorageContext(null, "namespace");
|
||||
Variables.StorageContext variablesContext = new Variables.StorageContext(MAIN_TENANT, "namespace");
|
||||
|
||||
// simple
|
||||
Map<String, Object> outputs = Map.of("key", "value");
|
||||
|
||||
@@ -20,6 +20,7 @@ import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest
|
||||
@@ -146,12 +147,12 @@ public abstract class AbstractMultipleConditionStorageTest {
|
||||
|
||||
assertThat(window.getResults().get("a")).isTrue();
|
||||
|
||||
List<MultipleConditionWindow> expired = multipleConditionStorage.expired(null);
|
||||
List<MultipleConditionWindow> expired = multipleConditionStorage.expired(MAIN_TENANT);
|
||||
assertThat(expired.size()).isZero();
|
||||
|
||||
Thread.sleep(2005);
|
||||
|
||||
expired = multipleConditionStorage.expired(null);
|
||||
expired = multipleConditionStorage.expired(MAIN_TENANT);
|
||||
assertThat(expired.size()).isEqualTo(1);
|
||||
}
|
||||
|
||||
@@ -168,12 +169,12 @@ public abstract class AbstractMultipleConditionStorageTest {
|
||||
|
||||
assertThat(window.getResults().get("a")).isTrue();
|
||||
|
||||
List<MultipleConditionWindow> expired = multipleConditionStorage.expired(null);
|
||||
List<MultipleConditionWindow> expired = multipleConditionStorage.expired(MAIN_TENANT);
|
||||
assertThat(expired.size()).isZero();
|
||||
|
||||
Thread.sleep(2005);
|
||||
|
||||
expired = multipleConditionStorage.expired(null);
|
||||
expired = multipleConditionStorage.expired(MAIN_TENANT);
|
||||
assertThat(expired.size()).isEqualTo(1);
|
||||
}
|
||||
|
||||
@@ -190,7 +191,7 @@ public abstract class AbstractMultipleConditionStorageTest {
|
||||
|
||||
assertThat(window.getResults()).isEmpty();
|
||||
|
||||
List<MultipleConditionWindow> expired = multipleConditionStorage.expired(null);
|
||||
List<MultipleConditionWindow> expired = multipleConditionStorage.expired(MAIN_TENANT);
|
||||
assertThat(expired.size()).isEqualTo(1);
|
||||
}
|
||||
|
||||
@@ -208,7 +209,7 @@ public abstract class AbstractMultipleConditionStorageTest {
|
||||
|
||||
assertThat(window.getResults().get("a")).isTrue();
|
||||
|
||||
List<MultipleConditionWindow> expired = multipleConditionStorage.expired(null);
|
||||
List<MultipleConditionWindow> expired = multipleConditionStorage.expired(MAIN_TENANT);
|
||||
assertThat(expired.size()).isZero();
|
||||
}
|
||||
|
||||
@@ -225,7 +226,7 @@ public abstract class AbstractMultipleConditionStorageTest {
|
||||
|
||||
assertThat(window.getResults().get("a")).isTrue();
|
||||
|
||||
List<MultipleConditionWindow> expired = multipleConditionStorage.expired(null);
|
||||
List<MultipleConditionWindow> expired = multipleConditionStorage.expired(MAIN_TENANT);
|
||||
assertThat(expired.size()).isZero();
|
||||
}
|
||||
|
||||
@@ -247,6 +248,7 @@ public abstract class AbstractMultipleConditionStorageTest {
|
||||
|
||||
Flow flow = Flow.builder()
|
||||
.namespace(NAMESPACE)
|
||||
.tenantId(MAIN_TENANT)
|
||||
.id("multiple-flow")
|
||||
.revision(1)
|
||||
.triggers(Collections.singletonList(io.kestra.plugin.core.trigger.Flow.builder()
|
||||
|
||||
@@ -371,7 +371,7 @@ public abstract class AbstractExecutionRepositoryTest {
|
||||
protected void shouldFindByIdTestExecution() {
|
||||
executionRepository.save(ExecutionFixture.EXECUTION_TEST);
|
||||
|
||||
Optional<Execution> full = executionRepository.findById(null, ExecutionFixture.EXECUTION_TEST.getId());
|
||||
Optional<Execution> full = executionRepository.findById(MAIN_TENANT, ExecutionFixture.EXECUTION_TEST.getId());
|
||||
assertThat(full.isPresent()).isTrue();
|
||||
|
||||
full.ifPresent(current -> {
|
||||
@@ -388,7 +388,7 @@ public abstract class AbstractExecutionRepositoryTest {
|
||||
|
||||
executionRepository.purge(ExecutionFixture.EXECUTION_1);
|
||||
|
||||
full = executionRepository.findById(null, ExecutionFixture.EXECUTION_1.getId());
|
||||
full = executionRepository.findById(MAIN_TENANT, ExecutionFixture.EXECUTION_1.getId());
|
||||
assertThat(full.isPresent()).isFalse();
|
||||
}
|
||||
|
||||
|
||||
@@ -8,7 +8,6 @@ import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.runners.RunContextFactory;
|
||||
import io.kestra.core.services.ExecutionService;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
import io.kestra.plugin.core.debug.Return;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
@@ -26,6 +25,7 @@ import java.time.ZonedDateTime;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest
|
||||
@@ -39,9 +39,6 @@ public abstract class AbstractExecutionServiceTest {
|
||||
@Inject
|
||||
LogRepositoryInterface logRepository;
|
||||
|
||||
@Inject
|
||||
StorageInterface storageInterface;
|
||||
|
||||
@Inject
|
||||
RunContextFactory runContextFactory;
|
||||
|
||||
@@ -56,12 +53,14 @@ public abstract class AbstractExecutionServiceTest {
|
||||
Flow flow = Flow.builder()
|
||||
.namespace("io.kestra.test")
|
||||
.id("abc")
|
||||
.tenantId(MAIN_TENANT)
|
||||
.revision(1)
|
||||
.build();
|
||||
|
||||
Execution execution = Execution
|
||||
.builder()
|
||||
.id(IdUtils.create())
|
||||
.tenantId(MAIN_TENANT)
|
||||
.state(state)
|
||||
.flowId(flow.getId())
|
||||
.namespace(flow.getNamespace())
|
||||
@@ -72,6 +71,7 @@ public abstract class AbstractExecutionServiceTest {
|
||||
|
||||
TaskRun taskRun = TaskRun
|
||||
.builder()
|
||||
.tenantId(MAIN_TENANT)
|
||||
.namespace(flow.getNamespace())
|
||||
.id(IdUtils.create())
|
||||
.executionId(execution.getId())
|
||||
@@ -108,7 +108,7 @@ public abstract class AbstractExecutionServiceTest {
|
||||
true,
|
||||
true,
|
||||
true,
|
||||
null,
|
||||
MAIN_TENANT,
|
||||
flow.getNamespace(),
|
||||
flow.getId(),
|
||||
null,
|
||||
@@ -126,7 +126,7 @@ public abstract class AbstractExecutionServiceTest {
|
||||
true,
|
||||
true,
|
||||
true,
|
||||
null,
|
||||
MAIN_TENANT,
|
||||
flow.getNamespace(),
|
||||
flow.getId(),
|
||||
null,
|
||||
|
||||
@@ -815,12 +815,14 @@ public abstract class AbstractFlowRepositoryTest {
|
||||
@Test
|
||||
void findByExecutionNoRevision() {
|
||||
Flow flow = builder()
|
||||
.tenantId(MAIN_TENANT)
|
||||
.revision(3)
|
||||
.build();
|
||||
flowRepository.create(GenericFlow.of(flow));
|
||||
Execution execution = Execution.builder()
|
||||
.id(IdUtils.create())
|
||||
.namespace(flow.getNamespace())
|
||||
.tenantId(MAIN_TENANT)
|
||||
.flowId(flow.getId())
|
||||
.state(new State())
|
||||
.build();
|
||||
|
||||
@@ -10,6 +10,7 @@ import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest
|
||||
@@ -23,14 +24,14 @@ public abstract class AbstractFlowTopologyRepositoryTest {
|
||||
.source(FlowNode.builder()
|
||||
.id(flowA)
|
||||
.namespace(namespace)
|
||||
.tenantId(TenantService.MAIN_TENANT)
|
||||
.tenantId(MAIN_TENANT)
|
||||
.uid(flowA)
|
||||
.build()
|
||||
)
|
||||
.destination(FlowNode.builder()
|
||||
.id(flowB)
|
||||
.namespace(namespace)
|
||||
.tenantId(TenantService.MAIN_TENANT)
|
||||
.tenantId(MAIN_TENANT)
|
||||
.uid(flowB)
|
||||
.build()
|
||||
)
|
||||
@@ -43,7 +44,7 @@ public abstract class AbstractFlowTopologyRepositoryTest {
|
||||
createSimpleFlowTopology("flow-a", "flow-b", "io.kestra.tests")
|
||||
);
|
||||
|
||||
List<FlowTopology> list = flowTopologyRepository.findByFlow(TenantService.MAIN_TENANT, "io.kestra.tests", "flow-a", false);
|
||||
List<FlowTopology> list = flowTopologyRepository.findByFlow(MAIN_TENANT, "io.kestra.tests", "flow-a", false);
|
||||
|
||||
assertThat(list.size()).isEqualTo(1);
|
||||
}
|
||||
|
||||
@@ -16,6 +16,7 @@ import java.time.Duration;
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.List;
|
||||
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest
|
||||
@@ -35,17 +36,17 @@ public abstract class AbstractMetricRepositoryTest {
|
||||
metricRepository.save(testCounter); // should only be retrieved by execution id
|
||||
metricRepository.save(timer);
|
||||
|
||||
List<MetricEntry> results = metricRepository.findByExecutionId(null, executionId, Pageable.from(1, 10));
|
||||
List<MetricEntry> results = metricRepository.findByExecutionId(MAIN_TENANT, executionId, Pageable.from(1, 10));
|
||||
assertThat(results.size()).isEqualTo(3);
|
||||
|
||||
results = metricRepository.findByExecutionIdAndTaskId(null, executionId, taskRun1.getTaskId(), Pageable.from(1, 10));
|
||||
results = metricRepository.findByExecutionIdAndTaskId(MAIN_TENANT, executionId, taskRun1.getTaskId(), Pageable.from(1, 10));
|
||||
assertThat(results.size()).isEqualTo(3);
|
||||
|
||||
results = metricRepository.findByExecutionIdAndTaskRunId(null, executionId, taskRun1.getId(), Pageable.from(1, 10));
|
||||
results = metricRepository.findByExecutionIdAndTaskRunId(MAIN_TENANT, executionId, taskRun1.getId(), Pageable.from(1, 10));
|
||||
assertThat(results.size()).isEqualTo(2);
|
||||
|
||||
MetricAggregations aggregationResults = metricRepository.aggregateByFlowId(
|
||||
null,
|
||||
MAIN_TENANT,
|
||||
"namespace",
|
||||
"flow",
|
||||
null,
|
||||
@@ -59,7 +60,7 @@ public abstract class AbstractMetricRepositoryTest {
|
||||
assertThat(aggregationResults.getGroupBy()).isEqualTo("day");
|
||||
|
||||
aggregationResults = metricRepository.aggregateByFlowId(
|
||||
null,
|
||||
MAIN_TENANT,
|
||||
"namespace",
|
||||
"flow",
|
||||
null,
|
||||
@@ -90,9 +91,9 @@ public abstract class AbstractMetricRepositoryTest {
|
||||
metricRepository.save(test); // should only be retrieved by execution id
|
||||
|
||||
|
||||
List<String> flowMetricsNames = metricRepository.flowMetrics(null, "namespace", "flow");
|
||||
List<String> taskMetricsNames = metricRepository.taskMetrics(null, "namespace", "flow", "task");
|
||||
List<String> tasksWithMetrics = metricRepository.tasksWithMetrics(null, "namespace", "flow");
|
||||
List<String> flowMetricsNames = metricRepository.flowMetrics(MAIN_TENANT, "namespace", "flow");
|
||||
List<String> taskMetricsNames = metricRepository.taskMetrics(MAIN_TENANT, "namespace", "flow", "task");
|
||||
List<String> tasksWithMetrics = metricRepository.tasksWithMetrics(MAIN_TENANT, "namespace", "flow");
|
||||
|
||||
assertThat(flowMetricsNames.size()).isEqualTo(2);
|
||||
assertThat(taskMetricsNames.size()).isEqualTo(1);
|
||||
@@ -111,7 +112,7 @@ public abstract class AbstractMetricRepositoryTest {
|
||||
metricRepository.save(timer);
|
||||
metricRepository.save(test); // should be retrieved as findAllAsync is used for backup
|
||||
|
||||
List<MetricEntry> results = metricRepository.findAllAsync(null).collectList().block();
|
||||
List<MetricEntry> results = metricRepository.findAllAsync(MAIN_TENANT).collectList().block();
|
||||
assertThat(results).hasSize(3);
|
||||
}
|
||||
|
||||
@@ -127,6 +128,7 @@ public abstract class AbstractMetricRepositoryTest {
|
||||
return TaskRun.builder()
|
||||
.flowId("flow")
|
||||
.namespace("namespace")
|
||||
.tenantId(MAIN_TENANT)
|
||||
.executionId(executionId)
|
||||
.taskId(taskId)
|
||||
.id(FriendlyId.createFriendlyId())
|
||||
|
||||
@@ -21,6 +21,7 @@ import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest
|
||||
@@ -46,37 +47,38 @@ public abstract class AbstractTemplateRepositoryTest {
|
||||
|
||||
@Test
|
||||
void findById() {
|
||||
Template template = builder().build();
|
||||
Template template = buildTemplate();
|
||||
templateRepository.create(template);
|
||||
|
||||
Optional<Template> full = templateRepository.findById(null, template.getNamespace(), template.getId());
|
||||
Optional<Template> full = templateRepository.findById(MAIN_TENANT, template.getNamespace(), template.getId());
|
||||
assertThat(full.isPresent()).isTrue();
|
||||
assertThat(full.get().getId()).isEqualTo(template.getId());
|
||||
|
||||
full = templateRepository.findById(null, template.getNamespace(), template.getId());
|
||||
full = templateRepository.findById(MAIN_TENANT, template.getNamespace(), template.getId());
|
||||
assertThat(full.isPresent()).isTrue();
|
||||
assertThat(full.get().getId()).isEqualTo(template.getId());
|
||||
}
|
||||
|
||||
@Test
|
||||
void findByNamespace() {
|
||||
Template template1 = builder().build();
|
||||
Template template1 = buildTemplate();
|
||||
Template template2 = Template.builder()
|
||||
.id(IdUtils.create())
|
||||
.tenantId(MAIN_TENANT)
|
||||
.namespace("kestra.test.template").build();
|
||||
|
||||
templateRepository.create(template1);
|
||||
templateRepository.create(template2);
|
||||
|
||||
List<Template> templates = templateRepository.findByNamespace(null, template1.getNamespace());
|
||||
List<Template> templates = templateRepository.findByNamespace(MAIN_TENANT, template1.getNamespace());
|
||||
assertThat(templates.size()).isGreaterThanOrEqualTo(1);
|
||||
templates = templateRepository.findByNamespace(null, template2.getNamespace());
|
||||
templates = templateRepository.findByNamespace(MAIN_TENANT, template2.getNamespace());
|
||||
assertThat(templates.size()).isEqualTo(1);
|
||||
}
|
||||
|
||||
@Test
|
||||
void save() {
|
||||
Template template = builder().build();
|
||||
Template template = buildTemplate();
|
||||
Template save = templateRepository.create(template);
|
||||
|
||||
assertThat(save.getId()).isEqualTo(template.getId());
|
||||
@@ -84,19 +86,19 @@ public abstract class AbstractTemplateRepositoryTest {
|
||||
|
||||
@Test
|
||||
void findAll() {
|
||||
long saveCount = templateRepository.findAll(null).size();
|
||||
Template template = builder().build();
|
||||
long saveCount = templateRepository.findAll(MAIN_TENANT).size();
|
||||
Template template = buildTemplate();
|
||||
templateRepository.create(template);
|
||||
long size = templateRepository.findAll(null).size();
|
||||
long size = templateRepository.findAll(MAIN_TENANT).size();
|
||||
assertThat(size).isGreaterThan(saveCount);
|
||||
templateRepository.delete(template);
|
||||
assertThat((long) templateRepository.findAll(null).size()).isEqualTo(saveCount);
|
||||
assertThat((long) templateRepository.findAll(MAIN_TENANT).size()).isEqualTo(saveCount);
|
||||
}
|
||||
|
||||
@Test
|
||||
void findAllForAllTenants() {
|
||||
long saveCount = templateRepository.findAllForAllTenants().size();
|
||||
Template template = builder().build();
|
||||
Template template = buildTemplate();
|
||||
templateRepository.create(template);
|
||||
long size = templateRepository.findAllForAllTenants().size();
|
||||
assertThat(size).isGreaterThan(saveCount);
|
||||
@@ -106,19 +108,19 @@ public abstract class AbstractTemplateRepositoryTest {
|
||||
|
||||
@Test
|
||||
void find() {
|
||||
Template template1 = builder().build();
|
||||
Template template1 = buildTemplate();
|
||||
templateRepository.create(template1);
|
||||
Template template2 = builder().build();
|
||||
Template template2 = buildTemplate();
|
||||
templateRepository.create(template2);
|
||||
Template template3 = builder().build();
|
||||
Template template3 = buildTemplate();
|
||||
templateRepository.create(template3);
|
||||
|
||||
// with pageable
|
||||
List<Template> save = templateRepository.find(Pageable.from(1, 10),null, null, "kestra.test");
|
||||
List<Template> save = templateRepository.find(Pageable.from(1, 10),null, MAIN_TENANT, "kestra.test");
|
||||
assertThat((long) save.size()).isGreaterThanOrEqualTo(3L);
|
||||
|
||||
// without pageable
|
||||
save = templateRepository.find(null, null, "kestra.test");
|
||||
save = templateRepository.find(null, MAIN_TENANT, "kestra.test");
|
||||
assertThat((long) save.size()).isGreaterThanOrEqualTo(3L);
|
||||
|
||||
templateRepository.delete(template1);
|
||||
@@ -128,12 +130,12 @@ public abstract class AbstractTemplateRepositoryTest {
|
||||
|
||||
@Test
|
||||
void delete() {
|
||||
Template template = builder().build();
|
||||
Template template = buildTemplate();
|
||||
|
||||
Template save = templateRepository.create(template);
|
||||
templateRepository.delete(save);
|
||||
|
||||
assertThat(templateRepository.findById(null, template.getNamespace(), template.getId()).isPresent()).isFalse();
|
||||
assertThat(templateRepository.findById(MAIN_TENANT, template.getNamespace(), template.getId()).isPresent()).isFalse();
|
||||
|
||||
assertThat(TemplateListener.getEmits().size()).isEqualTo(2);
|
||||
assertThat(TemplateListener.getEmits().stream().filter(r -> r.getType() == CrudEventType.CREATE).count()).isEqualTo(1L);
|
||||
@@ -157,4 +159,8 @@ public abstract class AbstractTemplateRepositoryTest {
|
||||
emits = new ArrayList<>();
|
||||
}
|
||||
}
|
||||
|
||||
private static Template buildTemplate() {
|
||||
return builder().tenantId(MAIN_TENANT).build();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -38,6 +38,7 @@ public abstract class AbstractTriggerRepositoryTest {
|
||||
private static Trigger.TriggerBuilder<?, ?> trigger() {
|
||||
return Trigger.builder()
|
||||
.flowId(IdUtils.create())
|
||||
.tenantId(MAIN_TENANT)
|
||||
.namespace(TEST_NAMESPACE)
|
||||
.triggerId(IdUtils.create())
|
||||
.executionId(IdUtils.create())
|
||||
@@ -138,7 +139,7 @@ public abstract class AbstractTriggerRepositoryTest {
|
||||
|
||||
assertThat(all.size()).isEqualTo(4);
|
||||
|
||||
all = triggerRepository.findAll(null);
|
||||
all = triggerRepository.findAll(MAIN_TENANT);
|
||||
|
||||
assertThat(all.size()).isEqualTo(4);
|
||||
|
||||
@@ -147,45 +148,46 @@ public abstract class AbstractTriggerRepositoryTest {
|
||||
Trigger trigger = trigger().namespace(namespace).build();
|
||||
triggerRepository.save(trigger);
|
||||
|
||||
List<Trigger> find = triggerRepository.find(Pageable.from(1, 4, Sort.of(Sort.Order.asc("namespace"))), null, null, null, null, null);
|
||||
List<Trigger> find = triggerRepository.find(Pageable.from(1, 4, Sort.of(Sort.Order.asc("namespace"))), null, MAIN_TENANT, null, null, null);
|
||||
assertThat(find.size()).isEqualTo(4);
|
||||
assertThat(find.getFirst().getNamespace()).isEqualTo(namespace);
|
||||
|
||||
find = triggerRepository.find(Pageable.from(1, 4, Sort.of(Sort.Order.asc("namespace"))), null, null, null, searchedTrigger.getFlowId(), null);
|
||||
find = triggerRepository.find(Pageable.from(1, 4, Sort.of(Sort.Order.asc("namespace"))), null, MAIN_TENANT, null, searchedTrigger.getFlowId(), null);
|
||||
assertThat(find.size()).isEqualTo(1);
|
||||
assertThat(find.getFirst().getFlowId()).isEqualTo(searchedTrigger.getFlowId());
|
||||
|
||||
find = triggerRepository.find(Pageable.from(1, 100, Sort.of(Sort.Order.asc(triggerRepository.sortMapping().apply("triggerId")))), null, null, namespacePrefix, null, null);
|
||||
find = triggerRepository.find(Pageable.from(1, 100, Sort.of(Sort.Order.asc(triggerRepository.sortMapping().apply("triggerId")))), null, MAIN_TENANT, namespacePrefix, null, null);
|
||||
assertThat(find.size()).isEqualTo(1);
|
||||
assertThat(find.getFirst().getTriggerId()).isEqualTo(trigger.getTriggerId());
|
||||
|
||||
// Full text search is on namespace, flowId, triggerId, executionId
|
||||
find = triggerRepository.find(Pageable.from(1, 100, Sort.UNSORTED), trigger.getNamespace(), null, null, null, null);
|
||||
find = triggerRepository.find(Pageable.from(1, 100, Sort.UNSORTED), trigger.getNamespace(), MAIN_TENANT, null, null, null);
|
||||
assertThat(find.size()).isEqualTo(1);
|
||||
assertThat(find.getFirst().getTriggerId()).isEqualTo(trigger.getTriggerId());
|
||||
find = triggerRepository.find(Pageable.from(1, 100, Sort.UNSORTED), searchedTrigger.getFlowId(), null, null, null, null);
|
||||
find = triggerRepository.find(Pageable.from(1, 100, Sort.UNSORTED), searchedTrigger.getFlowId(), MAIN_TENANT, null, null, null);
|
||||
assertThat(find.size()).isEqualTo(1);
|
||||
assertThat(find.getFirst().getTriggerId()).isEqualTo(searchedTrigger.getTriggerId());
|
||||
find = triggerRepository.find(Pageable.from(1, 100, Sort.UNSORTED), searchedTrigger.getTriggerId(), null, null, null, null);
|
||||
find = triggerRepository.find(Pageable.from(1, 100, Sort.UNSORTED), searchedTrigger.getTriggerId(), MAIN_TENANT, null, null, null);
|
||||
assertThat(find.size()).isEqualTo(1);
|
||||
assertThat(find.getFirst().getTriggerId()).isEqualTo(searchedTrigger.getTriggerId());
|
||||
find = triggerRepository.find(Pageable.from(1, 100, Sort.UNSORTED), searchedTrigger.getExecutionId(), null, null, null, null);
|
||||
find = triggerRepository.find(Pageable.from(1, 100, Sort.UNSORTED), searchedTrigger.getExecutionId(), MAIN_TENANT, null, null, null);
|
||||
assertThat(find.size()).isEqualTo(1);
|
||||
assertThat(find.getFirst().getTriggerId()).isEqualTo(searchedTrigger.getTriggerId());
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldCountForNullTenant() {
|
||||
void shouldCountForMainTenant() {
|
||||
// Given
|
||||
triggerRepository.save(Trigger
|
||||
.builder()
|
||||
.tenantId(MAIN_TENANT)
|
||||
.triggerId(IdUtils.create())
|
||||
.flowId(IdUtils.create())
|
||||
.namespace("io.kestra.unittest")
|
||||
.build()
|
||||
);
|
||||
// When
|
||||
int count = triggerRepository.count(null);
|
||||
int count = triggerRepository.count(MAIN_TENANT);
|
||||
// Then
|
||||
assertThat(count).isEqualTo(1);
|
||||
}
|
||||
|
||||
@@ -64,6 +64,7 @@ class ExecutionFixture {
|
||||
public static final Execution EXECUTION_TEST = Execution.builder()
|
||||
.id(IdUtils.create())
|
||||
.namespace("io.kestra.unittest")
|
||||
.tenantId(MAIN_TENANT)
|
||||
.flowId("full")
|
||||
.flowRevision(1)
|
||||
.state(new State())
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.flows.DependsOn;
|
||||
@@ -21,7 +23,6 @@ import org.reactivestreams.Publisher;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.URI;
|
||||
import java.nio.ByteBuffer;
|
||||
@@ -200,7 +201,7 @@ class FlowInputOutputTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldNotUploadFileInputAfterValidation() throws IOException {
|
||||
void shouldNotUploadFileInputAfterValidation() {
|
||||
// Given
|
||||
FileInput input = FileInput
|
||||
.builder()
|
||||
@@ -215,7 +216,7 @@ class FlowInputOutputTest {
|
||||
|
||||
// Then
|
||||
Assertions.assertNull(values.getFirst().exception());
|
||||
Assertions.assertFalse(storageInterface.exists(null, null, URI.create(values.getFirst().value().toString())));
|
||||
Assertions.assertFalse(storageInterface.exists(MAIN_TENANT, null, URI.create(values.getFirst().value().toString())));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
@@ -23,7 +23,6 @@ import jakarta.inject.Inject;
|
||||
import jakarta.inject.Named;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.TestInstance;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
import java.time.Duration;
|
||||
@@ -33,6 +32,7 @@ import java.util.List;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
import static io.kestra.core.utils.Rethrow.throwSupplier;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@@ -109,6 +109,7 @@ class WorkerTest {
|
||||
|
||||
Flow flow = Flow.builder()
|
||||
.id(IdUtils.create())
|
||||
.tenantId(MAIN_TENANT)
|
||||
.namespace("io.kestra.unit-test")
|
||||
.tasks(Collections.singletonList(theWorkerTask))
|
||||
.build();
|
||||
@@ -132,7 +133,7 @@ class WorkerTest {
|
||||
}),
|
||||
() -> workerTaskResult.get() != null && workerTaskResult.get().getTaskRun().getState().isFailed(),
|
||||
Duration.ofMillis(100),
|
||||
Duration.ofMinutes(1)
|
||||
Duration.ofSeconds(10L)
|
||||
);
|
||||
receive.blockLast();
|
||||
worker.shutdown();
|
||||
@@ -162,7 +163,7 @@ class WorkerTest {
|
||||
|
||||
Thread.sleep(500);
|
||||
|
||||
executionKilledQueue.emit(ExecutionKilledExecution.builder().executionId(workerTask.getTaskRun().getExecutionId()).build());
|
||||
executionKilledQueue.emit(ExecutionKilledExecution.builder().tenantId(MAIN_TENANT).executionId(workerTask.getTaskRun().getExecutionId()).build());
|
||||
|
||||
Await.until(
|
||||
() -> {
|
||||
@@ -171,7 +172,7 @@ class WorkerTest {
|
||||
return copy.stream().filter(r -> r.getTaskRun().getState().isTerminated()).count() == 5;
|
||||
},
|
||||
Duration.ofMillis(100),
|
||||
Duration.ofMinutes(1)
|
||||
Duration.ofSeconds(10L)
|
||||
);
|
||||
receiveWorkerTaskResults.blockLast();
|
||||
|
||||
@@ -211,6 +212,7 @@ class WorkerTest {
|
||||
Flow flow = Flow.builder()
|
||||
.id(IdUtils.create())
|
||||
.namespace("io.kestra.unit-test")
|
||||
.tenantId(MAIN_TENANT)
|
||||
.tasks(Collections.singletonList(bash))
|
||||
.build();
|
||||
|
||||
|
||||
@@ -1,39 +1,25 @@
|
||||
package io.kestra.core.runners.pebble.functions;
|
||||
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.junit.annotations.LoadFlows;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.executions.LogEntry;
|
||||
import io.kestra.core.models.executions.TaskRun;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.queues.QueueException;
|
||||
import io.kestra.core.queues.QueueInterface;
|
||||
import io.kestra.core.runners.RunnerUtils;
|
||||
import io.kestra.core.runners.VariableRenderer;
|
||||
import io.kestra.core.storages.StorageContext;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
import io.kestra.core.storages.kv.InternalKVStore;
|
||||
import io.kestra.core.storages.kv.KVStore;
|
||||
import io.kestra.core.storages.kv.KVValueAndMetadata;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import io.kestra.plugin.core.debug.Return;
|
||||
import io.pebbletemplates.pebble.error.PebbleException;
|
||||
import jakarta.inject.Inject;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
@KestraTest(startRunner = true)
|
||||
public class KvFunctionTest {
|
||||
@@ -46,20 +32,16 @@ public class KvFunctionTest {
|
||||
|
||||
@BeforeEach
|
||||
void reset() throws IOException {
|
||||
storageInterface.deleteByPrefix(null, null, URI.create(StorageContext.kvPrefix("io.kestra.tests")));
|
||||
storageInterface.deleteByPrefix(MAIN_TENANT, null, URI.create(StorageContext.kvPrefix("io.kestra.tests")));
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldGetValueFromKVGivenExistingKey() throws IllegalVariableEvaluationException, IOException {
|
||||
// Given
|
||||
KVStore kv = new InternalKVStore(null, "io.kestra.tests", storageInterface);
|
||||
KVStore kv = new InternalKVStore(MAIN_TENANT, "io.kestra.tests", storageInterface);
|
||||
kv.put("my-key", new KVValueAndMetadata(null, Map.of("field", "value")));
|
||||
|
||||
Map<String, Object> variables = Map.of(
|
||||
"flow", Map.of(
|
||||
"id", "kv",
|
||||
"namespace", "io.kestra.tests")
|
||||
);
|
||||
Map<String, Object> variables = getFlow("io.kestra.tests");
|
||||
|
||||
// When
|
||||
String rendered = variableRenderer.render("{{ kv('my-key') }}", variables);
|
||||
@@ -71,17 +53,13 @@ public class KvFunctionTest {
|
||||
@Test
|
||||
void shouldGetValueFromKVGivenExistingKeyWithInheritance() throws IllegalVariableEvaluationException, IOException {
|
||||
// Given
|
||||
KVStore kv = new InternalKVStore(null, "my.company", storageInterface);
|
||||
KVStore kv = new InternalKVStore(MAIN_TENANT, "my.company", storageInterface);
|
||||
kv.put("my-key", new KVValueAndMetadata(null, Map.of("field", "value")));
|
||||
|
||||
KVStore firstKv = new InternalKVStore(null, "my", storageInterface);
|
||||
KVStore firstKv = new InternalKVStore(MAIN_TENANT, "my", storageInterface);
|
||||
firstKv.put("my-key", new KVValueAndMetadata(null, Map.of("field", "firstValue")));
|
||||
|
||||
Map<String, Object> variables = Map.of(
|
||||
"flow", Map.of(
|
||||
"id", "kv",
|
||||
"namespace", "my.company.team")
|
||||
);
|
||||
Map<String, Object> variables = getFlow("my.company.team");
|
||||
|
||||
// When
|
||||
String rendered = variableRenderer.render("{{ kv('my-key') }}", variables);
|
||||
@@ -93,14 +71,10 @@ public class KvFunctionTest {
|
||||
@Test
|
||||
void shouldNotGetValueFromKVWithGivenNamespaceAndInheritance() throws IOException {
|
||||
// Given
|
||||
KVStore kv = new InternalKVStore(null, "kv", storageInterface);
|
||||
KVStore kv = new InternalKVStore(MAIN_TENANT, "kv", storageInterface);
|
||||
kv.put("my-key", new KVValueAndMetadata(null, Map.of("field", "value")));
|
||||
|
||||
Map<String, Object> variables = Map.of(
|
||||
"flow", Map.of(
|
||||
"id", "kv",
|
||||
"namespace", "my.company.team")
|
||||
);
|
||||
Map<String, Object> variables = getFlow("my.company.team");
|
||||
|
||||
// When
|
||||
Assertions.assertThrows(IllegalVariableEvaluationException.class, () ->
|
||||
@@ -110,14 +84,10 @@ public class KvFunctionTest {
|
||||
@Test
|
||||
void shouldGetValueFromKVGivenExistingAndNamespace() throws IllegalVariableEvaluationException, IOException {
|
||||
// Given
|
||||
KVStore kv = new InternalKVStore(null, "kv", storageInterface);
|
||||
KVStore kv = new InternalKVStore(MAIN_TENANT, "kv", storageInterface);
|
||||
kv.put("my-key", new KVValueAndMetadata(null, Map.of("field", "value")));
|
||||
|
||||
Map<String, Object> variables = Map.of(
|
||||
"flow", Map.of(
|
||||
"id", "kv",
|
||||
"namespace", "io.kestra.tests")
|
||||
);
|
||||
Map<String, Object> variables = getFlow("io.kestra.tests");
|
||||
|
||||
// When
|
||||
String rendered = variableRenderer.render("{{ kv('my-key', namespace='kv') }}", variables);
|
||||
@@ -129,11 +99,7 @@ public class KvFunctionTest {
|
||||
@Test
|
||||
void shouldGetEmptyGivenNonExistingKeyAndErrorOnMissingFalse() throws IllegalVariableEvaluationException {
|
||||
// Given
|
||||
Map<String, Object> variables = Map.of(
|
||||
"flow", Map.of(
|
||||
"id", "kv",
|
||||
"namespace", "io.kestra.tests")
|
||||
);
|
||||
Map<String, Object> variables = getFlow("io.kestra.tests");
|
||||
|
||||
// When
|
||||
String rendered = variableRenderer.render("{{ kv('my-key', errorOnMissing=false) }}", variables);
|
||||
@@ -145,11 +111,7 @@ public class KvFunctionTest {
|
||||
@Test
|
||||
void shouldFailGivenNonExistingKeyAndErrorOnMissingTrue() {
|
||||
// Given
|
||||
Map<String, Object> variables = Map.of(
|
||||
"flow", Map.of(
|
||||
"id", "kv",
|
||||
"namespace", "io.kestra.tests")
|
||||
);
|
||||
Map<String, Object> variables = getFlow("io.kestra.tests");
|
||||
|
||||
// When
|
||||
IllegalVariableEvaluationException exception = Assertions.assertThrows(IllegalVariableEvaluationException.class, () -> {
|
||||
@@ -163,11 +125,7 @@ public class KvFunctionTest {
|
||||
@Test
|
||||
void shouldFailGivenNonExistingKeyUsingDefaults() {
|
||||
// Given
|
||||
Map<String, Object> variables = Map.of(
|
||||
"flow", Map.of(
|
||||
"id", "kv",
|
||||
"namespace", "io.kestra.tests")
|
||||
);
|
||||
Map<String, Object> variables = getFlow("io.kestra.tests");
|
||||
// When
|
||||
IllegalVariableEvaluationException exception = Assertions.assertThrows(IllegalVariableEvaluationException.class, () -> {
|
||||
variableRenderer.render("{{ kv('my-key') }}", variables);
|
||||
@@ -176,4 +134,13 @@ public class KvFunctionTest {
|
||||
// Then
|
||||
assertThat(exception.getMessage()).isEqualTo("io.pebbletemplates.pebble.error.PebbleException: The key 'my-key' does not exist in the namespace 'io.kestra.tests'. ({{ kv('my-key') }}:1)");
|
||||
}
|
||||
|
||||
private static @NotNull Map<String, Object> getFlow(String namespace) {
|
||||
return Map.of(
|
||||
"flow", Map.of(
|
||||
"id", "kv",
|
||||
"tenantId", MAIN_TENANT,
|
||||
"namespace", namespace)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -76,7 +76,8 @@ class ReadFileFunctionTest {
|
||||
|
||||
@Test
|
||||
void readUnknownNamespaceFile() {
|
||||
IllegalVariableEvaluationException illegalVariableEvaluationException = assertThrows(IllegalVariableEvaluationException.class, () -> variableRenderer.render("{{ read('unknown.txt') }}", Map.of("flow", Map.of("namespace", "io.kestra.tests"))));
|
||||
IllegalVariableEvaluationException illegalVariableEvaluationException = assertThrows(IllegalVariableEvaluationException.class, () ->
|
||||
variableRenderer.render("{{ read('unknown.txt') }}", Map.of("flow", Map.of("tenantId", MAIN_TENANT, "namespace", "io.kestra.tests"))));
|
||||
assertThat(illegalVariableEvaluationException.getCause().getCause().getClass()).isEqualTo(FileNotFoundException.class);
|
||||
}
|
||||
|
||||
|
||||
@@ -20,6 +20,7 @@ import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
@@ -33,7 +34,7 @@ class FlowServiceTest {
|
||||
private FlowRepositoryInterface flowRepository;
|
||||
|
||||
private static FlowWithSource create(String flowId, String taskId, Integer revision) {
|
||||
return create(null, TEST_NAMESPACE, flowId, taskId, revision);
|
||||
return create(MAIN_TENANT, TEST_NAMESPACE, flowId, taskId, revision);
|
||||
}
|
||||
|
||||
private static FlowWithSource create(String tenantId, String namespace, String flowId, String taskId, Integer revision) {
|
||||
@@ -321,16 +322,16 @@ class FlowServiceTest {
|
||||
|
||||
@Test
|
||||
void findByNamespacePrefix() {
|
||||
FlowWithSource flow = create(null, "some.namespace","findByTest", "test", 1);
|
||||
FlowWithSource flow = create(MAIN_TENANT, "some.namespace","findByTest", "test", 1);
|
||||
flowRepository.create(GenericFlow.of(flow));
|
||||
assertThat(flowService.findByNamespacePrefix(null, "some.namespace").size()).isEqualTo(1);
|
||||
assertThat(flowService.findByNamespacePrefix(MAIN_TENANT, "some.namespace").size()).isEqualTo(1);
|
||||
}
|
||||
|
||||
@Test
|
||||
void findById() {
|
||||
FlowWithSource flow = create("findByIdTest", "test", 1);
|
||||
FlowWithSource saved = flowRepository.create(GenericFlow.of(flow));
|
||||
assertThat(flowService.findById(null, saved.getNamespace(), saved.getId()).isPresent()).isTrue();
|
||||
assertThat(flowService.findById(MAIN_TENANT, saved.getNamespace(), saved.getId()).isPresent()).isTrue();
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -368,7 +369,7 @@ class FlowServiceTest {
|
||||
))
|
||||
.build();
|
||||
|
||||
List<String> exceptions = flowService.checkValidSubflows(flow, null);
|
||||
List<String> exceptions = flowService.checkValidSubflows(flow, MAIN_TENANT);
|
||||
|
||||
assertThat(exceptions.size()).isZero();
|
||||
}
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
package io.kestra.core.services;
|
||||
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
import io.kestra.core.storages.kv.*;
|
||||
import io.micronaut.test.annotation.MockBean;
|
||||
@@ -26,25 +27,25 @@ class KVStoreServiceTest {
|
||||
|
||||
@Test
|
||||
void shouldGetKVStoreForExistingNamespaceGivenFromNull() {
|
||||
Assertions.assertNotNull(storeService.get(null, TEST_EXISTING_NAMESPACE, null));
|
||||
Assertions.assertNotNull(storeService.get(MAIN_TENANT, TEST_EXISTING_NAMESPACE, null));
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldThrowExceptionWhenAccessingKVStoreForNonExistingNamespace() {
|
||||
KVStoreException exception = Assertions.assertThrows(KVStoreException.class, () -> storeService.get(null, "io.kestra.unittest.unknown", null));
|
||||
KVStoreException exception = Assertions.assertThrows(KVStoreException.class, () -> storeService.get(MAIN_TENANT, "io.kestra.unittest.unknown", null));
|
||||
Assertions.assertTrue(exception.getMessage().contains("namespace 'io.kestra.unittest.unknown' does not exist"));
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldGetKVStoreForAnyNamespaceWhenAccessingFromChildNamespace() {
|
||||
Assertions.assertNotNull(storeService.get(null, "io.kestra", TEST_EXISTING_NAMESPACE));
|
||||
Assertions.assertNotNull(storeService.get(MAIN_TENANT, "io.kestra", TEST_EXISTING_NAMESPACE));
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldGetKVStoreFromNonExistingNamespaceWithAKV() throws IOException {
|
||||
KVStore kvStore = new InternalKVStore(null, "system", storageInterface);
|
||||
KVStore kvStore = new InternalKVStore(MAIN_TENANT, "system", storageInterface);
|
||||
kvStore.put("key", new KVValueAndMetadata(new KVMetadata("myDescription", Duration.ofHours(1)), "value"));
|
||||
Assertions.assertNotNull(storeService.get(null, "system", null));
|
||||
Assertions.assertNotNull(storeService.get(MAIN_TENANT, "system", null));
|
||||
}
|
||||
|
||||
@MockBean(NamespaceService.class)
|
||||
|
||||
@@ -27,6 +27,7 @@ import java.util.Optional;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.assertj.core.api.Assertions.within;
|
||||
|
||||
@@ -91,7 +92,7 @@ class InternalKVStoreTest {
|
||||
kv.put(TEST_KV_KEY, new KVValueAndMetadata(new KVMetadata(description, Duration.ofMinutes(5)), complexValue));
|
||||
|
||||
// Then
|
||||
StorageObject withMetadata = storageInterface.getWithMetadata(null, kv.namespace(), URI.create("/" + kv.namespace().replace(".", "/") + "/_kv/my-key.ion"));
|
||||
StorageObject withMetadata = storageInterface.getWithMetadata(MAIN_TENANT, kv.namespace(), URI.create("/" + kv.namespace().replace(".", "/") + "/_kv/my-key.ion"));
|
||||
String valueFile = new String(withMetadata.inputStream().readAllBytes());
|
||||
Instant expirationDate = Instant.parse(withMetadata.metadata().get("expirationDate"));
|
||||
assertThat(expirationDate.isAfter(before.plus(Duration.ofMinutes(4))) && expirationDate.isBefore(before.plus(Duration.ofMinutes(6)))).isTrue();
|
||||
@@ -102,7 +103,7 @@ class InternalKVStoreTest {
|
||||
kv.put(TEST_KV_KEY, new KVValueAndMetadata(new KVMetadata(null, Duration.ofMinutes(10)), "some-value"));
|
||||
|
||||
// Then
|
||||
withMetadata = storageInterface.getWithMetadata(null, kv.namespace(), URI.create("/" + kv.namespace().replace(".", "/") + "/_kv/my-key.ion"));
|
||||
withMetadata = storageInterface.getWithMetadata(MAIN_TENANT, kv.namespace(), URI.create("/" + kv.namespace().replace(".", "/") + "/_kv/my-key.ion"));
|
||||
valueFile = new String(withMetadata.inputStream().readAllBytes());
|
||||
expirationDate = Instant.parse(withMetadata.metadata().get("expirationDate"));
|
||||
assertThat(expirationDate.isAfter(before.plus(Duration.ofMinutes(9))) && expirationDate.isBefore(before.plus(Duration.ofMinutes(11)))).isTrue();
|
||||
@@ -176,6 +177,6 @@ class InternalKVStoreTest {
|
||||
|
||||
private InternalKVStore kv() {
|
||||
final String namespaceId = "io.kestra." + IdUtils.create();
|
||||
return new InternalKVStore(null, namespaceId, storageInterface);
|
||||
return new InternalKVStore(MAIN_TENANT, namespaceId, storageInterface);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,8 +17,8 @@ import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.List;
|
||||
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
|
||||
class InternalNamespaceTest {
|
||||
|
||||
@@ -38,7 +38,7 @@ class InternalNamespaceTest {
|
||||
void shouldGetAllNamespaceFiles() throws IOException, URISyntaxException {
|
||||
// Given
|
||||
final String namespaceId = "io.kestra." + IdUtils.create();
|
||||
final InternalNamespace namespace = new InternalNamespace(logger, null, namespaceId, storageInterface);
|
||||
final InternalNamespace namespace = new InternalNamespace(logger, MAIN_TENANT, namespaceId, storageInterface);
|
||||
|
||||
// When
|
||||
namespace.putFile(Path.of("/sub/dir/file1.txt"), new ByteArrayInputStream("1".getBytes()));
|
||||
@@ -56,7 +56,7 @@ class InternalNamespaceTest {
|
||||
void shouldPutFileGivenNoTenant() throws IOException, URISyntaxException {
|
||||
// Given
|
||||
final String namespaceId = "io.kestra." + IdUtils.create();
|
||||
final InternalNamespace namespace = new InternalNamespace(logger, null, namespaceId, storageInterface);
|
||||
final InternalNamespace namespace = new InternalNamespace(logger, MAIN_TENANT, namespaceId, storageInterface);
|
||||
|
||||
// When
|
||||
NamespaceFile namespaceFile = namespace.putFile(Path.of("/sub/dir/file.txt"), new ByteArrayInputStream("1".getBytes()));
|
||||
@@ -73,7 +73,7 @@ class InternalNamespaceTest {
|
||||
void shouldSucceedPutFileGivenExistingFileForConflictOverwrite() throws IOException, URISyntaxException {
|
||||
// Given
|
||||
final String namespaceId = "io.kestra." + IdUtils.create();
|
||||
final InternalNamespace namespace = new InternalNamespace(logger, null, namespaceId, storageInterface);
|
||||
final InternalNamespace namespace = new InternalNamespace(logger, MAIN_TENANT, namespaceId, storageInterface);
|
||||
|
||||
NamespaceFile namespaceFile = namespace.get(Path.of("/sub/dir/file.txt"));
|
||||
|
||||
@@ -92,7 +92,7 @@ class InternalNamespaceTest {
|
||||
void shouldFailPutFileGivenExistingFileForError() throws IOException, URISyntaxException {
|
||||
// Given
|
||||
final String namespaceId = "io.kestra." + IdUtils.create();
|
||||
final InternalNamespace namespace = new InternalNamespace(logger, null, namespaceId, storageInterface);
|
||||
final InternalNamespace namespace = new InternalNamespace(logger, MAIN_TENANT, namespaceId, storageInterface);
|
||||
|
||||
NamespaceFile namespaceFile = namespace.get(Path.of("/sub/dir/file.txt"));
|
||||
|
||||
@@ -109,7 +109,7 @@ class InternalNamespaceTest {
|
||||
void shouldIgnorePutFileGivenExistingFileForSkip() throws IOException, URISyntaxException {
|
||||
// Given
|
||||
final String namespaceId = "io.kestra." + IdUtils.create();
|
||||
final InternalNamespace namespace = new InternalNamespace(logger, null, namespaceId, storageInterface);
|
||||
final InternalNamespace namespace = new InternalNamespace(logger, MAIN_TENANT, namespaceId, storageInterface);
|
||||
|
||||
NamespaceFile namespaceFile = namespace.get(Path.of("/sub/dir/file.txt"));
|
||||
|
||||
@@ -128,7 +128,7 @@ class InternalNamespaceTest {
|
||||
void shouldFindAllMatchingGivenNoTenant() throws IOException, URISyntaxException {
|
||||
// Given
|
||||
final String namespaceId = "io.kestra." + IdUtils.create();
|
||||
final InternalNamespace namespace = new InternalNamespace(logger, null, namespaceId, storageInterface);
|
||||
final InternalNamespace namespace = new InternalNamespace(logger, MAIN_TENANT, namespaceId, storageInterface);
|
||||
|
||||
// When
|
||||
namespace.putFile(Path.of("/a/b/c/1.sql"), new ByteArrayInputStream("1".getBytes()));
|
||||
@@ -171,7 +171,7 @@ class InternalNamespaceTest {
|
||||
void shouldReturnNoNamespaceFileForEmptyNamespace() throws IOException {
|
||||
// Given
|
||||
final String namespaceId = "io.kestra." + IdUtils.create();
|
||||
final InternalNamespace namespace = new InternalNamespace(logger, null, namespaceId, storageInterface);
|
||||
final InternalNamespace namespace = new InternalNamespace(logger, MAIN_TENANT, namespaceId, storageInterface);
|
||||
List<NamespaceFile> namespaceFiles = namespace.findAllFilesMatching((unused) -> true);
|
||||
assertThat(namespaceFiles.size()).isZero();
|
||||
}
|
||||
|
||||
@@ -12,8 +12,8 @@ import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.time.ZonedDateTime;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.Map;
|
||||
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest
|
||||
@@ -31,6 +31,7 @@ class PurgeExecutionsTest {
|
||||
String flowId = "run-flow-id";
|
||||
var execution = Execution.builder()
|
||||
.id(IdUtils.create())
|
||||
.tenantId(MAIN_TENANT)
|
||||
.namespace(namespace)
|
||||
.flowId(flowId)
|
||||
.state(new State().withState(State.Type.SUCCESS))
|
||||
@@ -42,7 +43,7 @@ class PurgeExecutionsTest {
|
||||
.namespace(Property.ofValue(namespace))
|
||||
.endDate(Property.ofValue(ZonedDateTime.now().plusMinutes(1).format(DateTimeFormatter.ISO_ZONED_DATE_TIME)))
|
||||
.build();
|
||||
var runContext = runContextFactory.of(Map.of("flow", Map.of("namespace", namespace, "id", flowId)));
|
||||
var runContext = runContextFactory.of(flowId, namespace);
|
||||
var output = purge.run(runContext);
|
||||
|
||||
assertThat(output.getExecutionsCount()).isEqualTo(1);
|
||||
@@ -57,6 +58,7 @@ class PurgeExecutionsTest {
|
||||
var execution = Execution.builder()
|
||||
.namespace(namespace)
|
||||
.flowId(flowId)
|
||||
.tenantId(MAIN_TENANT)
|
||||
.id(IdUtils.create())
|
||||
.state(new State().withState(State.Type.SUCCESS))
|
||||
.build();
|
||||
@@ -68,7 +70,7 @@ class PurgeExecutionsTest {
|
||||
.flowId(Property.ofValue(flowId))
|
||||
.endDate(Property.ofValue(ZonedDateTime.now().plusMinutes(1).format(DateTimeFormatter.ISO_ZONED_DATE_TIME)))
|
||||
.build();
|
||||
var runContext = runContextFactory.of(Map.of("flow", Map.of("namespace", namespace, "id", flowId)));
|
||||
var runContext = runContextFactory.of(flowId, namespace);
|
||||
var output = purge.run(runContext);
|
||||
|
||||
assertThat(output.getExecutionsCount()).isEqualTo(1);
|
||||
|
||||
@@ -25,6 +25,7 @@ import java.util.List;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest(startRunner = true)
|
||||
@@ -47,6 +48,7 @@ class TimeoutTest {
|
||||
Flow flow = Flow.builder()
|
||||
.id(IdUtils.create())
|
||||
.namespace("io.kestra.unittest")
|
||||
.tenantId(MAIN_TENANT)
|
||||
.revision(1)
|
||||
.tasks(Collections.singletonList(Sleep.builder()
|
||||
.id("test")
|
||||
|
||||
@@ -32,7 +32,6 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junitpioneer.jupiter.RetryingTest;
|
||||
|
||||
@KestraTest(startRunner = true)
|
||||
public class WorkingDirectoryTest {
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
package io.kestra.plugin.core.kv;
|
||||
|
||||
import io.kestra.core.context.TestRunContextFactory;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.runners.RunContextFactory;
|
||||
import io.kestra.core.storages.kv.KVStore;
|
||||
import io.kestra.core.storages.kv.KVValueAndMetadata;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
@@ -14,6 +14,7 @@ import org.junit.jupiter.api.Test;
|
||||
import java.util.Map;
|
||||
import java.util.NoSuchElementException;
|
||||
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest
|
||||
@@ -21,19 +22,18 @@ class DeleteTest {
|
||||
static final String TEST_KV_KEY = "test-key";
|
||||
|
||||
@Inject
|
||||
RunContextFactory runContextFactory;
|
||||
TestRunContextFactory runContextFactory;
|
||||
|
||||
@Test
|
||||
void shouldOutputTrueGivenExistingKey() throws Exception {
|
||||
// Given
|
||||
String namespaceId = "io.kestra." + IdUtils.create();
|
||||
RunContext runContext = this.runContextFactory.of(Map.of(
|
||||
"flow", Map.of("namespace", namespaceId),
|
||||
"inputs", Map.of(
|
||||
RunContext runContext = this.runContextFactory.withInputs(
|
||||
namespaceId, Map.of(
|
||||
"key", TEST_KV_KEY,
|
||||
"namespace", namespaceId
|
||||
)
|
||||
));
|
||||
);
|
||||
|
||||
Delete delete = Delete.builder()
|
||||
.id(Delete.class.getSimpleName())
|
||||
@@ -56,13 +56,12 @@ class DeleteTest {
|
||||
void shouldOutputFalseGivenNonExistingKey() throws Exception {
|
||||
// Given
|
||||
String namespaceId = "io.kestra." + IdUtils.create();
|
||||
RunContext runContext = this.runContextFactory.of(Map.of(
|
||||
"flow", Map.of("namespace", namespaceId),
|
||||
"inputs", Map.of(
|
||||
RunContext runContext = this.runContextFactory.withInputs(namespaceId,
|
||||
Map.of(
|
||||
"key", TEST_KV_KEY,
|
||||
"namespace", namespaceId
|
||||
)
|
||||
));
|
||||
);
|
||||
|
||||
Delete delete = Delete.builder()
|
||||
.id(Delete.class.getSimpleName())
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
package io.kestra.plugin.core.kv;
|
||||
|
||||
import io.kestra.core.context.TestRunContextFactory;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.runners.RunContextFactory;
|
||||
import io.kestra.core.storages.kv.KVStore;
|
||||
import io.kestra.core.storages.kv.KVValueAndMetadata;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
@@ -12,6 +12,7 @@ import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest
|
||||
@@ -19,15 +20,13 @@ class GetKeysTest {
|
||||
static final String TEST_KEY_PREFIX_TEST = "test";
|
||||
|
||||
@Inject
|
||||
RunContextFactory runContextFactory;
|
||||
TestRunContextFactory runContextFactory;
|
||||
|
||||
@Test
|
||||
void shouldGetAllKeys() throws Exception {
|
||||
// Given
|
||||
String namespace = IdUtils.create();
|
||||
RunContext runContext = this.runContextFactory.of(Map.of(
|
||||
"flow", Map.of("namespace", namespace)
|
||||
));
|
||||
RunContext runContext = this.runContextFactory.of(namespace);
|
||||
|
||||
GetKeys getKeys = GetKeys.builder()
|
||||
.id(GetKeys.class.getSimpleName())
|
||||
@@ -50,12 +49,9 @@ class GetKeysTest {
|
||||
void shouldGetKeysGivenMatchingPrefix() throws Exception {
|
||||
// Given
|
||||
String namespace = IdUtils.create();
|
||||
RunContext runContext = this.runContextFactory.of(Map.of(
|
||||
"flow", Map.of("namespace", namespace),
|
||||
"inputs", Map.of(
|
||||
RunContext runContext = this.runContextFactory.withInputs(namespace, Map.of(
|
||||
"prefix", TEST_KEY_PREFIX_TEST
|
||||
)
|
||||
));
|
||||
));
|
||||
|
||||
GetKeys getKeys = GetKeys.builder()
|
||||
.id(GetKeys.class.getSimpleName())
|
||||
@@ -79,11 +75,8 @@ class GetKeysTest {
|
||||
void shouldGetNoKeysGivenEmptyKeyStore() throws Exception {
|
||||
// Given
|
||||
String namespace = IdUtils.create();
|
||||
RunContext runContext = this.runContextFactory.of(Map.of(
|
||||
"flow", Map.of("namespace", namespace),
|
||||
"inputs", Map.of(
|
||||
"prefix", TEST_KEY_PREFIX_TEST
|
||||
)
|
||||
RunContext runContext = this.runContextFactory.withInputs(namespace, Map.of(
|
||||
"prefix", TEST_KEY_PREFIX_TEST
|
||||
));
|
||||
|
||||
GetKeys getKeys = GetKeys.builder()
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
package io.kestra.plugin.core.kv;
|
||||
|
||||
import io.kestra.core.context.TestRunContextFactory;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.runners.RunContextFactory;
|
||||
import io.kestra.core.storages.kv.KVStore;
|
||||
import io.kestra.core.storages.kv.KVValueAndMetadata;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
@@ -24,19 +24,17 @@ class GetTest {
|
||||
static final String TEST_KV_KEY = "test-key";
|
||||
|
||||
@Inject
|
||||
RunContextFactory runContextFactory;
|
||||
TestRunContextFactory runContextFactory;
|
||||
|
||||
@Test
|
||||
void shouldGetGivenExistingKey() throws Exception {
|
||||
// Given
|
||||
String namespaceId = "io.kestra." + IdUtils.create();
|
||||
RunContext runContext = this.runContextFactory.of(Map.of(
|
||||
"flow", Map.of("namespace", namespaceId),
|
||||
"inputs", Map.of(
|
||||
RunContext runContext = this.runContextFactory.withInputs(namespaceId,
|
||||
Map.of(
|
||||
"key", TEST_KV_KEY,
|
||||
"namespace", namespaceId
|
||||
)
|
||||
));
|
||||
));
|
||||
|
||||
var value = Map.of("date", Instant.now().truncatedTo(ChronoUnit.MILLIS), "int", 1, "string", "string");
|
||||
|
||||
@@ -62,12 +60,10 @@ class GetTest {
|
||||
void shouldGetGivenExistingKeyWithInheritance() throws Exception {
|
||||
// Given
|
||||
String namespaceId = "io.kestra." + IdUtils.create();
|
||||
RunContext runContext = this.runContextFactory.of(Map.of(
|
||||
"flow", Map.of("namespace", namespaceId),
|
||||
"inputs", Map.of(
|
||||
RunContext runContext = this.runContextFactory.withInputs(namespaceId,
|
||||
Map.of(
|
||||
"key", TEST_KV_KEY
|
||||
)
|
||||
));
|
||||
));
|
||||
|
||||
var value = Map.of("date", Instant.now().truncatedTo(ChronoUnit.MILLIS), "int", 1, "string", "string");
|
||||
|
||||
@@ -92,13 +88,11 @@ class GetTest {
|
||||
void shouldGetGivenNonExistingKey() throws Exception {
|
||||
// Given
|
||||
String namespaceId = "io.kestra." + IdUtils.create();
|
||||
RunContext runContext = this.runContextFactory.of(Map.of(
|
||||
"flow", Map.of("namespace", namespaceId),
|
||||
"inputs", Map.of(
|
||||
RunContext runContext = this.runContextFactory.withInputs(namespaceId,
|
||||
Map.of(
|
||||
"key", TEST_KV_KEY,
|
||||
"namespace", namespaceId
|
||||
)
|
||||
));
|
||||
));
|
||||
|
||||
Get get = Get.builder()
|
||||
.id(Get.class.getSimpleName())
|
||||
|
||||
@@ -1,37 +1,34 @@
|
||||
package io.kestra.plugin.core.kv;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
import io.kestra.core.context.TestRunContextFactory;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.models.kv.KVType;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.runners.RunContextFactory;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
import io.kestra.core.storages.kv.KVStore;
|
||||
import io.kestra.core.storages.kv.KVStoreException;
|
||||
import io.kestra.core.storages.kv.KVValue;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
@KestraTest
|
||||
class SetTest {
|
||||
|
||||
static final String TEST_KEY = "test-key";
|
||||
public static final String NAMESPACE = "io.kestra.test";
|
||||
|
||||
@Inject
|
||||
StorageInterface storageInterface;
|
||||
|
||||
@Inject
|
||||
RunContextFactory runContextFactory;
|
||||
TestRunContextFactory runContextFactory;
|
||||
|
||||
@Test
|
||||
void shouldSetKVGivenNoNamespace() throws Exception {
|
||||
@@ -65,27 +62,25 @@ class SetTest {
|
||||
@Test
|
||||
void shouldSetKVGivenSameNamespace() throws Exception {
|
||||
// Given
|
||||
RunContext runContext = this.runContextFactory.of(Map.of(
|
||||
"flow", Map.of("namespace", "io.kestra.test"),
|
||||
"inputs", Map.of(
|
||||
RunContext runContext = this.runContextFactory.withInputs(NAMESPACE,
|
||||
Map.of(
|
||||
"key", TEST_KEY,
|
||||
"value", "test-value"
|
||||
)
|
||||
));
|
||||
));
|
||||
|
||||
Set set = Set.builder()
|
||||
.id(Set.class.getSimpleName())
|
||||
.type(Set.class.getName())
|
||||
.key(new Property<>("{{ inputs.key }}"))
|
||||
.value(new Property<>("{{ inputs.value }}"))
|
||||
.namespace(new Property<>("io.kestra.test"))
|
||||
.namespace(new Property<>(NAMESPACE))
|
||||
.build();
|
||||
|
||||
// When
|
||||
set.run(runContext);
|
||||
|
||||
// Then
|
||||
final KVStore kv = runContext.namespaceKv("io.kestra.test");
|
||||
final KVStore kv = runContext.namespaceKv(NAMESPACE);
|
||||
assertThat(kv.getValue(TEST_KEY)).isEqualTo(Optional.of(new KVValue("test-value")));
|
||||
assertThat(kv.list().getFirst().expirationDate()).isNull();
|
||||
}
|
||||
@@ -93,13 +88,11 @@ class SetTest {
|
||||
@Test
|
||||
void shouldSetKVGivenChildNamespace() throws Exception {
|
||||
// Given
|
||||
RunContext runContext = this.runContextFactory.of(Map.of(
|
||||
"flow", Map.of("namespace", "io.kestra.test"),
|
||||
"inputs", Map.of(
|
||||
RunContext runContext = this.runContextFactory.withInputs(NAMESPACE,
|
||||
Map.of(
|
||||
"key", TEST_KEY,
|
||||
"value", "test-value"
|
||||
)
|
||||
));
|
||||
));
|
||||
|
||||
Set set = Set.builder()
|
||||
.id(Set.class.getSimpleName())
|
||||
@@ -120,13 +113,11 @@ class SetTest {
|
||||
@Test
|
||||
void shouldFailGivenNonExistingNamespace() {
|
||||
// Given
|
||||
RunContext runContext = this.runContextFactory.of(Map.of(
|
||||
"flow", Map.of("namespace", "io.kestra.test"),
|
||||
"inputs", Map.of(
|
||||
RunContext runContext = this.runContextFactory.withInputs(NAMESPACE,
|
||||
Map.of(
|
||||
"key", TEST_KEY,
|
||||
"value", "test-value"
|
||||
)
|
||||
));
|
||||
));
|
||||
|
||||
Set set = Set.builder()
|
||||
.id(Set.class.getSimpleName())
|
||||
|
||||
@@ -3,12 +3,12 @@ package io.kestra.plugin.core.storage;
|
||||
import io.kestra.core.context.TestRunContextFactory;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.runners.RunContextFactory;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest
|
||||
@@ -22,6 +22,7 @@ class PurgeCurrentExecutionFilesTest {
|
||||
var flow = Flow.builder()
|
||||
.namespace("namespace")
|
||||
.id("flowId")
|
||||
.tenantId(MAIN_TENANT)
|
||||
.build();
|
||||
var runContext = runContextFactory.of(flow, Map.of(
|
||||
"execution", Map.of("id", "executionId"),
|
||||
|
||||
@@ -163,6 +163,6 @@ public abstract class AbstractJdbcFlowTopologyRepository extends AbstractJdbcRep
|
||||
}
|
||||
|
||||
protected Condition buildTenantCondition(String prefix, String tenantId) {
|
||||
return tenantId == null ? field(prefix + "_tenant_id").isNull() : field(prefix + "_tenant_id").eq(tenantId);
|
||||
return field(prefix + "_tenant_id").eq(tenantId);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -70,7 +70,7 @@ public abstract class AbstractJdbcRepository {
|
||||
}
|
||||
|
||||
protected Condition buildTenantCondition(String tenantId) {
|
||||
return tenantId == null ? field("tenant_id").isNull() : field("tenant_id").eq(tenantId);
|
||||
return field("tenant_id").eq(tenantId);
|
||||
}
|
||||
|
||||
public static Field<Object> field(String name) {
|
||||
|
||||
@@ -11,6 +11,7 @@ import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
public abstract class AbstractJdbcFlowTopologyRepositoryTest extends AbstractFlowTopologyRepositoryTest {
|
||||
@@ -25,6 +26,7 @@ public abstract class AbstractJdbcFlowTopologyRepositoryTest extends AbstractFlo
|
||||
FlowWithSource flow = FlowWithSource.builder()
|
||||
.id("flow-a")
|
||||
.namespace("io.kestra.tests")
|
||||
.tenantId(MAIN_TENANT)
|
||||
.revision(1)
|
||||
.build();
|
||||
|
||||
|
||||
@@ -12,6 +12,7 @@ import java.io.IOException;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.List;
|
||||
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
public abstract class AbstractJdbcTemplateRepositoryTest extends io.kestra.core.repositories.AbstractTemplateRepositoryTest {
|
||||
@@ -23,13 +24,13 @@ public abstract class AbstractJdbcTemplateRepositoryTest extends io.kestra.core.
|
||||
templateRepository.create(builder("io.kestra.unitest").build());
|
||||
templateRepository.create(builder("com.kestra.test").build());
|
||||
|
||||
List<Template> save = templateRepository.find(Pageable.from(1, 10, Sort.UNSORTED), null, null, null);
|
||||
List<Template> save = templateRepository.find(Pageable.from(1, 10, Sort.UNSORTED), null, MAIN_TENANT, null);
|
||||
assertThat(save.size()).isEqualTo(2);
|
||||
|
||||
save = templateRepository.find(Pageable.from(1, 10, Sort.UNSORTED), "kestra", null, "com");
|
||||
save = templateRepository.find(Pageable.from(1, 10, Sort.UNSORTED), "kestra", MAIN_TENANT, "com");
|
||||
assertThat(save.size()).isEqualTo(1);
|
||||
|
||||
save = templateRepository.find(Pageable.from(1, 10, Sort.of(Sort.Order.asc("id"))), "kestra unit", null, null);
|
||||
save = templateRepository.find(Pageable.from(1, 10, Sort.of(Sort.Order.asc("id"))), "kestra unit", MAIN_TENANT, null);
|
||||
assertThat(save.size()).isEqualTo(1);
|
||||
}
|
||||
|
||||
|
||||
@@ -17,7 +17,7 @@ public class MemoryRepositoryTest {
|
||||
|
||||
@Test
|
||||
void verifyMemoryFallbacksToH2() {
|
||||
assertThat(flowRepositoryInterface.findAll(null).size()).isZero();
|
||||
assertThat(flowRepositoryInterface.findAll(MAIN_TENANT).size()).isZero();
|
||||
|
||||
String flowSource = """
|
||||
id: some-flow
|
||||
|
||||
@@ -50,8 +50,11 @@ public class LocalStorage implements StorageInterface {
|
||||
}
|
||||
|
||||
protected Path getLocalPath(String tenantId, URI uri) {
|
||||
Path basePath = tenantId == null ? this.basePath.toAbsolutePath()
|
||||
: Paths.get(this.basePath.toAbsolutePath().toString(), tenantId);
|
||||
Path basePath = Paths.get(this.basePath.toAbsolutePath().toString(), tenantId);
|
||||
return getLocalPath(uri, basePath);
|
||||
}
|
||||
|
||||
private Path getLocalPath(URI uri, Path basePath) {
|
||||
if(uri == null) {
|
||||
return basePath;
|
||||
}
|
||||
@@ -68,6 +71,14 @@ public class LocalStorage implements StorageInterface {
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public InputStream getGlobalResource(@Nullable String namespace, URI uri) throws IOException {
|
||||
return new BufferedInputStream(new FileInputStream(getLocalPath(uri, basePath)
|
||||
.toAbsolutePath()
|
||||
.toString())
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public StorageObject getWithMetadata(String tenantId, @Nullable String namespace, URI uri) throws IOException {
|
||||
return new StorageObject(LocalFileAttributes.getMetadata(this.getLocalPath(tenantId, uri)), this.get(tenantId, namespace, uri));
|
||||
@@ -122,9 +133,10 @@ public class LocalStorage implements StorageInterface {
|
||||
|
||||
@Override
|
||||
public List<FileAttributes> list(String tenantId, @Nullable String namespace, URI uri) throws IOException {
|
||||
try (Stream<Path> stream = Files.list(getLocalPath(tenantId, uri))) {
|
||||
Path path = getLocalPath(tenantId, uri);
|
||||
try (Stream<Path> stream = Files.list(path)) {
|
||||
return stream
|
||||
.filter(path -> !path.getFileName().toString().endsWith(".metadata"))
|
||||
.filter(file -> !file.getFileName().toString().endsWith(".metadata"))
|
||||
.map(throwFunction(file -> {
|
||||
URI relative = URI.create(
|
||||
getLocalPath(tenantId, null).relativize(
|
||||
@@ -139,6 +151,12 @@ public class LocalStorage implements StorageInterface {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<FileAttributes> listGlobalResource(@Nullable String namespace, URI uri) throws IOException {
|
||||
Path path = getLocalPath(tenantId, uri);
|
||||
return listResources(tenantId, namespace, path);
|
||||
}
|
||||
|
||||
@Override
|
||||
public URI put(String tenantId, @Nullable String namespace, URI uri, StorageObject storageObject) throws IOException {
|
||||
File file = getLocalPath(tenantId, uri).toFile();
|
||||
@@ -180,10 +198,26 @@ public class LocalStorage implements StorageInterface {
|
||||
|
||||
@Override
|
||||
public URI createDirectory(String tenantId, @Nullable String namespace, URI uri) {
|
||||
validateDirectoryUri(uri);
|
||||
File file = getLocalPath(tenantId, uri).toFile();
|
||||
return createDirectory(uri, file);
|
||||
}
|
||||
|
||||
@Override
|
||||
public URI createGlobalDirectory(@Nullable String namespace, URI uri) {
|
||||
validateDirectoryUri(uri);
|
||||
File file = getLocalPath(uri, basePath).toFile();
|
||||
return createDirectory(uri, file);
|
||||
}
|
||||
|
||||
private static void validateDirectoryUri(URI uri) {
|
||||
if (uri == null || uri.getPath().isEmpty()) {
|
||||
throw new IllegalArgumentException("Unable to create a directory with empty url.");
|
||||
}
|
||||
File file = getLocalPath(tenantId, uri).toFile();
|
||||
}
|
||||
|
||||
|
||||
private static URI createDirectory(URI uri, File file) {
|
||||
if (!file.exists() && !file.mkdirs()) {
|
||||
throw new RuntimeException("Cannot create directory: " + file.getAbsolutePath());
|
||||
}
|
||||
@@ -235,9 +269,7 @@ public class LocalStorage implements StorageInterface {
|
||||
}
|
||||
|
||||
private URI getKestraUri(String tenantId, Path path) {
|
||||
Path prefix = (tenantId == null) ?
|
||||
basePath.toAbsolutePath():
|
||||
basePath.toAbsolutePath().resolve(tenantId);
|
||||
Path prefix = basePath.toAbsolutePath().resolve(tenantId);
|
||||
subPathParentGuard(path, prefix);
|
||||
return URI.create("kestra:///" + prefix.relativize(path).toString().replace("\\", "/"));
|
||||
}
|
||||
|
||||
@@ -2,7 +2,6 @@ package io.kestra.core.context;
|
||||
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.runners.RunContextFactory;
|
||||
import jakarta.inject.Singleton;
|
||||
@@ -11,8 +10,23 @@ import java.util.Map;
|
||||
@Singleton
|
||||
public class TestRunContextFactory extends RunContextFactory {
|
||||
|
||||
@VisibleForTesting
|
||||
public RunContext of() {
|
||||
return of(Map.of("flow", Map.of("id", "id", "namespace", "namespace", "tenantId", MAIN_TENANT)));
|
||||
return of("id", "namespace");
|
||||
}
|
||||
|
||||
public RunContext of(String namespace) {
|
||||
return of("id", namespace);
|
||||
}
|
||||
|
||||
public RunContext of(String flowId, String namespace) {
|
||||
return of(Map.of("flow", Map.of("id", flowId, "namespace", namespace, "tenantId", MAIN_TENANT)));
|
||||
}
|
||||
|
||||
public RunContext withInputs(String namespace, Map<String, String > inputs) {
|
||||
return of(Map.of(
|
||||
"flow", Map.of("id", "id", "namespace", namespace, "tenantId", MAIN_TENANT),
|
||||
"inputs", inputs
|
||||
));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user