chore(refactor): remove a bunch of compilation warning and remove deprecated StorageInterface method

This commit is contained in:
Ludovic DEHON
2024-01-11 10:42:51 +01:00
parent b0a805e8e0
commit 1e1785db7e
22 changed files with 87 additions and 150 deletions

View File

@@ -311,7 +311,7 @@ subprojects {
}
jacoco {
toolVersion = "0.8.9"
toolVersion = "0.8.11"
}
jacocoTestReport {

View File

@@ -108,6 +108,7 @@ public final class ExecutableUtils {
.build();
}
@SuppressWarnings("unchecked")
public static TaskRun manageIterations(TaskRun taskRun, Execution execution, boolean transmitFailed, boolean allowFailure) throws InternalException {
Integer numberOfBatches = (Integer) taskRun.getOutputs().get("numberOfBatches");
var previousTaskRun = execution.findTaskRunByTaskRunId(taskRun.getId());

View File

@@ -382,6 +382,7 @@ public class RunContext {
return this;
}
@SuppressWarnings("unchecked")
public RunContext forWorker(ApplicationContext applicationContext, WorkerTask workerTask) {
this.initBean(applicationContext);
this.initLogger(workerTask.getTaskRun(), workerTask.getTask());
@@ -652,7 +653,7 @@ public class RunContext {
public Optional<Long> getTaskCacheFileLastModifiedTime(String namespace, String flowId, String taskId, String value) throws IOException {
URI uri = URI.create("/" + this.storageInterface.cachePrefix(namespace, flowId, taskId, value) + "/cache.zip");
return this.storageInterface.exists(getTenantId(), uri) ? Optional.of(this.storageInterface.lastModifiedTime(getTenantId(), uri)) : Optional.empty();
return this.storageInterface.exists(getTenantId(), uri) ? Optional.of(this.storageInterface.getAttributes(getTenantId(), uri).getLastModifiedTime()) : Optional.empty();
}
/**
@@ -845,7 +846,8 @@ public class RunContext {
}
}
private String getTenantId() {
@SuppressWarnings("unchecked")
public String getTenantId() {
Map<String, String> flow = (Map<String, String>) this.getVariables().get("flow");
// normally only tests should not have the flow variable
return flow != null ? flow.get("tenantId") : null;

View File

@@ -589,7 +589,6 @@ public class Worker implements Runnable, AutoCloseable {
));
}
@SuppressWarnings("ResultOfMethodCallIgnored")
@Override
public void close() throws Exception {
closeWorker(Duration.ofMinutes(5));

View File

@@ -44,6 +44,7 @@ public class ReadFileFunction implements Function {
}
}
@SuppressWarnings("unchecked")
private String readFromNamespaceFile(EvaluationContext context, String path) throws IOException {
Map<String, String> flow = (Map<String, String>) context.getVariable("flow");
URI namespaceFile = URI.create(storageInterface.namespaceFilePrefix(flow.get("namespace")) + "/" + path);
@@ -52,6 +53,7 @@ public class ReadFileFunction implements Function {
}
}
@SuppressWarnings("unchecked")
private String readFromInternalStorageUri(EvaluationContext context, String path) throws IOException {
Map<String, String> flow = (Map<String, String>) context.getVariable("flow");
Map<String, String> execution = (Map<String, String>) context.getVariable("execution");

View File

@@ -17,6 +17,7 @@ import java.util.Map;
* this deserializer allows using both types.
*/
public class ListOrMapOfLabelDeserializer extends JsonDeserializer<List<Label>> implements ResolvableDeserializer {
@SuppressWarnings("unchecked")
@Override
public List<Label> deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
if (p.hasToken(JsonToken.VALUE_NULL)) {

View File

@@ -49,6 +49,7 @@ public interface StorageInterface {
* @param tenantId the tenant identifier.
* @return true if the uri points to a file/object that exist in the internal storage.
*/
@SuppressWarnings("try")
default boolean exists(String tenantId, URI uri) {
try (InputStream ignored = get(tenantId, uri)){
return true;
@@ -57,20 +58,6 @@ public interface StorageInterface {
}
}
/**
* @deprecated Use {@link #getAttributes(String, URI)}} instead of individual call for every attribute
*/
@Deprecated
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
Long size(String tenantId, URI uri) throws IOException;
/**
* @deprecated Use {@link #getAttributes(String, URI)} instead of individual call for every attribute
*/
@Deprecated
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
Long lastModifiedTime(String tenantId, URI uri) throws IOException;
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
FileAttributes getAttributes(String tenantId, URI uri) throws IOException;

View File

@@ -112,6 +112,7 @@ public class Counts extends Task implements RunnableTask<Counts.Output> {
@PluginProperty(dynamic = true)
protected String expression;
@SuppressWarnings("unchecked")
@Override
public Output run(RunContext runContext) throws Exception {
Logger logger = runContext.logger();

View File

@@ -18,11 +18,7 @@ import io.kestra.core.runners.WorkerTask;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.validations.WorkingDirectoryTaskValidation;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.AccessLevel;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.*;
import lombok.experimental.SuperBuilder;
import java.io.ByteArrayOutputStream;
@@ -204,6 +200,7 @@ public class WorkingDirectory extends Sequential implements NamespaceFilesInterf
private NamespaceFiles namespaceFiles;
@Getter(AccessLevel.PRIVATE)
@Builder.Default
private transient long cacheDownloadedTime = 0L;
@Override

View File

@@ -1,8 +1,5 @@
package io.kestra.core.tasks.storages;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
@@ -10,9 +7,11 @@ import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.RunContext;
import io.kestra.core.storages.StorageInterface;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
import java.net.URI;
import java.util.Map;
import java.util.NoSuchElementException;
@SuperBuilder
@@ -52,7 +51,7 @@ public class Delete extends Task implements RunnableTask<Delete.Output> {
StorageInterface storageInterface = runContext.getApplicationContext().getBean(StorageInterface.class);
URI render = URI.create(runContext.render(this.uri));
boolean delete = storageInterface.delete(getTenantId(runContext), render);
boolean delete = storageInterface.delete(runContext.getTenantId(), render);
if (errorOnMissing && !delete) {
throw new NoSuchElementException("Unable to find file '" + render + "'");
@@ -64,12 +63,6 @@ public class Delete extends Task implements RunnableTask<Delete.Output> {
.build();
}
private String getTenantId(RunContext runContext) {
Map<String, String> flow = (Map<String, String>) runContext.getVariables().get("flow");
// normally only tests should not have the flow variable
return flow != null ? flow.get("tenantId") : null;
}
@Builder
@Getter
public static class Output implements io.kestra.core.models.tasks.Output {

View File

@@ -30,45 +30,45 @@ import java.util.Map;
full = true,
title = "Output local files created in a Python task and load them to S3",
code = """
id: outputsFromPythonTask
namespace: dev
id: outputsFromPythonTask
namespace: dev
tasks:
- id: wdir
type: io.kestra.core.tasks.flows.WorkingDirectory
tasks:
- id: cloneRepository
type: io.kestra.plugin.git.Clone
url: https://github.com/kestra-io/examples
branch: main
tasks:
- id: wdir
type: io.kestra.core.tasks.flows.WorkingDirectory
tasks:
- id: cloneRepository
type: io.kestra.plugin.git.Clone
url: https://github.com/kestra-io/examples
branch: main
- id: gitPythonScripts
type: io.kestra.plugin.scripts.python.Commands
warningOnStdErr: false
runner: DOCKER
docker:
image: ghcr.io/kestra-io/pydata:latest
beforeCommands:
- pip install faker > /dev/null
commands:
- python scripts/etl_script.py
- python scripts/generate_orders.py
- id: gitPythonScripts
type: io.kestra.plugin.scripts.python.Commands
warningOnStdErr: false
runner: DOCKER
docker:
image: ghcr.io/kestra-io/pydata:latest
beforeCommands:
- pip install faker > /dev/null
commands:
- python scripts/etl_script.py
- python scripts/generate_orders.py
- id: outputFile
type: io.kestra.core.tasks.storages.LocalFiles
outputs:
- orders.csv
- "*.parquet"
- id: outputFile
type: io.kestra.core.tasks.storages.LocalFiles
outputs:
- orders.csv
- "*.parquet"
- id: loadCsvToS3
type: io.kestra.plugin.aws.s3.Upload
accessKeyId: "{{secret('AWS_ACCESS_KEY_ID')}}"
secretKeyId: "{{secret('AWS_SECRET_ACCESS_KEY')}}"
region: eu-central-1
bucket: kestraio
key: stage/orders.csv
from: "{{outputs.outputFile.uris['orders.csv']}}"
disabled: true
- id: loadCsvToS3
type: io.kestra.plugin.aws.s3.Upload
accessKeyId: "{{secret('AWS_ACCESS_KEY_ID')}}"
secretKeyId: "{{secret('AWS_SECRET_ACCESS_KEY')}}"
region: eu-central-1
bucket: kestraio
key: stage/orders.csv
from: "{{outputs.outputFile.uris['orders.csv']}}"
disabled: true
"""
),
@Example(

View File

@@ -96,6 +96,7 @@ public class Purge extends Task implements RunnableTask<Purge.Output> {
@Builder.Default
private boolean purgeStorage = true;
@SuppressWarnings("unchecked")
@Override
public Purge.Output run(RunContext runContext) throws Exception {
ExecutionService executionService = runContext.getApplicationContext().getBean(ExecutionService.class);

View File

@@ -12,7 +12,6 @@ import lombok.*;
import lombok.experimental.SuperBuilder;
import java.net.URI;
import java.util.Map;
@SuperBuilder
@ToString
@@ -44,19 +43,13 @@ public class Size extends Task implements RunnableTask<Size.Output> {
StorageInterface storageInterface = runContext.getApplicationContext().getBean(StorageInterface.class);
URI render = URI.create(runContext.render(this.uri));
Long size = storageInterface.size(getTenantId(runContext), render);
Long size = storageInterface.getAttributes(runContext.getTenantId(), render).getSize();
return Output.builder()
.size(size)
.build();
}
private String getTenantId(RunContext runContext) {
Map<String, String> flow = (Map<String, String>) runContext.getVariables().get("flow");
// normally only tests should not have the flow variable
return flow != null ? flow.get("tenantId") : null;
}
@Builder
@Getter
public static class Output implements io.kestra.core.models.tasks.Output {

View File

@@ -149,7 +149,7 @@ class RunContextTest extends AbstractMemoryRunnerTest {
p.destroy();
URI uri = runContext.putTempFile(path.toFile());
assertThat(storageInterface.size(null, uri), is(size + 1));
assertThat(storageInterface.getAttributes(null, uri).getSize(), is(size + 1));
}
@Test

View File

@@ -296,7 +296,7 @@ public abstract class StorageTestSuite {
}
private void exists(String prefix, String tenantId) throws Exception {
URI put = putFile(tenantId, "/" + prefix + "/storage/put.yml");
putFile(tenantId, "/" + prefix + "/storage/put.yml");
assertThat(storageInterface.exists(tenantId, new URI("/" + prefix + "/storage/put.yml")), is(true));
assertThat(storageInterface.exists(tenantId, new URI("/" + prefix + "/storage/notfound.yml")), is(false));
}
@@ -368,7 +368,7 @@ public abstract class StorageTestSuite {
private void size(String prefix, String tenantId) throws Exception {
URI put = putFile(tenantId, "/" + prefix + "/storage/put.yml");
assertThat(storageInterface.size(tenantId, new URI("/" + prefix + "/storage/put.yml")), is((long) contentString.length()));
assertThat(storageInterface.getAttributes(tenantId, new URI("/" + prefix + "/storage/put.yml")).getSize(), is((long) contentString.length()));
}
@Test
@@ -384,7 +384,7 @@ public abstract class StorageTestSuite {
path.forEach(throwConsumer(s -> putFile(tenantId, s)));
assertThrows(IllegalArgumentException.class, () -> {
storageInterface.size(tenantId, new URI("/" + prefix + "/storage/level2/../1.yml"));
storageInterface.getAttributes(tenantId, new URI("/" + prefix + "/storage/level2/../1.yml")).getSize();
});
}
@@ -393,7 +393,7 @@ public abstract class StorageTestSuite {
String prefix = IdUtils.create();
String tenantId = IdUtils.create();
assertThrows(FileNotFoundException.class, () -> {
storageInterface.size(tenantId, new URI("/" + prefix + "/storage/"));
storageInterface.getAttributes(tenantId, new URI("/" + prefix + "/storage/")).getSize();
});
}
@@ -408,15 +408,15 @@ public abstract class StorageTestSuite {
putFile(null, nullTenant);
URI with = new URI(withTenant);
assertThat(storageInterface.size(tenantId, with), is((long) contentString.length()));
assertThat(storageInterface.getAttributes(tenantId, with).getSize(), is((long) contentString.length()));
assertThrows(FileNotFoundException.class, () -> {
storageInterface.size(null, with);
storageInterface.getAttributes(null, with).getSize();
});
URI without = new URI(nullTenant);
assertThat(storageInterface.size(null, without), is((long) contentString.length()));
assertThat(storageInterface.getAttributes(null, without).getSize(), is((long) contentString.length()));
assertThrows(FileNotFoundException.class, () -> {
storageInterface.size(tenantId, without);
storageInterface.getAttributes(tenantId, without).getSize();
});
}
@@ -427,7 +427,7 @@ public abstract class StorageTestSuite {
String tenantId = IdUtils.create();
putFile(tenantId, "/" + prefix + "/storage/get.yml");
assertThat(storageInterface.size(tenantId, new URI("kestra:///" + prefix + "/storage/get.yml")), is((long) contentString.length()));
assertThat(storageInterface.getAttributes(tenantId, new URI("kestra:///" + prefix + "/storage/get.yml")).getSize(), is((long) contentString.length()));
}
//endregion
@@ -449,8 +449,8 @@ public abstract class StorageTestSuite {
}
private void lastModifiedTime(String prefix, String tenantId) throws Exception {
URI put = putFile(tenantId, "/" + prefix + "/storage/put.yml");
assertThat(storageInterface.lastModifiedTime(tenantId, new URI("/" + prefix + "/storage/put.yml")), notNullValue());
putFile(tenantId, "/" + prefix + "/storage/put.yml");
assertThat(storageInterface.getAttributes(tenantId, new URI("/" + prefix + "/storage/put.yml")).getLastModifiedTime(), notNullValue());
}
@Test
@@ -466,7 +466,7 @@ public abstract class StorageTestSuite {
path.forEach(throwConsumer(s -> putFile(tenantId, s)));
assertThrows(IllegalArgumentException.class, () -> {
storageInterface.lastModifiedTime(tenantId, new URI("/" + prefix + "/storage/level2/../1.yml"));
storageInterface.getAttributes(tenantId, new URI("/" + prefix + "/storage/level2/../1.yml")).getLastModifiedTime();
});
}
@@ -475,7 +475,7 @@ public abstract class StorageTestSuite {
String prefix = IdUtils.create();
String tenantId = IdUtils.create();
assertThrows(FileNotFoundException.class, () -> {
storageInterface.lastModifiedTime(tenantId, new URI("/" + prefix + "/storage/"));
storageInterface.getAttributes(tenantId, new URI("/" + prefix + "/storage/")).getLastModifiedTime();
});
}
@@ -490,15 +490,15 @@ public abstract class StorageTestSuite {
putFile(null, nullTenant);
URI with = new URI(withTenant);
assertThat(storageInterface.lastModifiedTime(tenantId, with), notNullValue());
assertThat(storageInterface.getAttributes(tenantId, with).getLastModifiedTime(), notNullValue());
assertThrows(FileNotFoundException.class, () -> {
storageInterface.lastModifiedTime(null, with);
storageInterface.getAttributes(null, with).getLastModifiedTime();
});
URI without = new URI(nullTenant);
assertThat(storageInterface.lastModifiedTime(null, without), notNullValue());
assertThat(storageInterface.getAttributes(null, without).getLastModifiedTime(), notNullValue());
assertThrows(FileNotFoundException.class, () -> {
storageInterface.lastModifiedTime(tenantId, without);
storageInterface.getAttributes(tenantId, without).getLastModifiedTime();
});
}
@@ -509,7 +509,7 @@ public abstract class StorageTestSuite {
String tenantId = IdUtils.create();
putFile(tenantId, "/" + prefix + "/storage/get.yml");
assertThat(storageInterface.lastModifiedTime(tenantId, new URI("kestra:///" + prefix + "/storage/get.yml")), notNullValue());
assertThat(storageInterface.getAttributes(tenantId, new URI("kestra:///" + prefix + "/storage/get.yml")).getLastModifiedTime(), notNullValue());
}
//endregion

View File

@@ -3,41 +3,20 @@ package io.kestra.jdbc.repository;
import io.kestra.core.events.CrudEvent;
import io.kestra.core.events.CrudEventType;
import io.kestra.core.models.Setting;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.executions.statistics.DailyExecutionStatistics;
import io.kestra.core.models.executions.statistics.ExecutionCount;
import io.kestra.core.models.executions.statistics.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.repositories.SettingRepositoryInterface;
import io.kestra.core.runners.Executor;
import io.kestra.core.runners.ExecutorState;
import io.kestra.jdbc.runner.AbstractJdbcExecutorStateStorage;
import io.kestra.jdbc.runner.JdbcIndexerInterface;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.event.ApplicationEventPublisher;
import io.micronaut.data.model.Pageable;
import io.micronaut.inject.qualifiers.Qualifiers;
import jakarta.inject.Singleton;
import lombok.SneakyThrows;
import org.apache.commons.lang3.tuple.Pair;
import org.jooq.*;
import org.jooq.Field;
import org.jooq.Record1;
import org.jooq.Select;
import org.jooq.SelectJoinStep;
import org.jooq.impl.DSL;
import java.time.Duration;
import java.time.LocalDate;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.Comparator;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@Singleton
public abstract class AbstractJdbcSettingRepository extends AbstractJdbcRepository implements SettingRepositoryInterface {

View File

@@ -15,6 +15,7 @@ import java.util.Optional;
public abstract class AbstractJdbcSubflowExecutionStorage extends AbstractJdbcRepository {
protected io.kestra.jdbc.AbstractJdbcRepository<SubflowExecution<?>> jdbcRepository;
@SuppressWarnings({"unchecked", "rawtypes"})
public AbstractJdbcSubflowExecutionStorage(io.kestra.jdbc.AbstractJdbcRepository jdbcRepository) {
this.jdbcRepository = jdbcRepository;
}

View File

@@ -10,7 +10,6 @@ import java.io.*;
import java.net.URI;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;
import java.nio.file.attribute.FileTime;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
@@ -96,28 +95,6 @@ public class LocalStorage implements StorageInterface {
}
}
@Override
public Long size(String tenantId, URI uri) throws IOException {
try {
return Files.size(getPath(tenantId, uri));
} catch (NoSuchFileException e) {
throw new FileNotFoundException("Unable to find file at '" + uri + "'");
} catch (IOException e) {
throw new IOException("Unable to find file at '" + uri + "' with message '" + e.getMessage() + "'");
}
}
@Override
public Long lastModifiedTime(String tenantId, URI uri) throws IOException {
FileTime lastModifiedTime;
try {
lastModifiedTime = Files.getLastModifiedTime(getPath(tenantId, uri));
} catch (NoSuchFileException e) {
throw new FileNotFoundException(e.getMessage());
}
return lastModifiedTime.toMillis();
}
@Override
public URI put(String tenantId, URI uri, InputStream data) throws IOException {
File file = getPath(tenantId, uri).toFile();

View File

@@ -601,7 +601,7 @@ public class ExecutionController {
}
return HttpResponse.ok(FileMetas.builder()
.size(storageInterface.size(tenantService.resolveTenant(), path))
.size(storageInterface.getAttributes(tenantService.resolveTenant(), path).getSize())
.build()
);
}

View File

@@ -71,7 +71,7 @@ public class MiscController {
@ExecuteOn(TaskExecutors.IO)
@Operation(tags = {"Misc"}, summary = "Get current configurations")
public Configuration configuration() throws JsonProcessingException {
Configuration.ConfigurationBuilder builder = Configuration
Configuration.ConfigurationBuilder<?, ?> builder = Configuration
.builder()
.uuid(instanceService.fetch())
.version(versionProvider.getVersion())

View File

@@ -65,6 +65,7 @@ public class AuthenticationFilter implements HttpServerFilter {
return Flowable.fromPublisher(chain.proceed(request));
}
@SuppressWarnings("rawtypes")
private boolean isManagementEndpoint(HttpRequest<?> request) {
Optional<RouteMatch> routeMatch = RouteMatchUtils.findRouteMatch(request);
if (routeMatch.isPresent() && routeMatch.get() instanceof MethodBasedRouteMatch<?,?> method) {

View File

@@ -34,6 +34,7 @@ public class MarketplaceFilter implements HttpServerFilter {
return ServerFilterPhase.RENDERING.order();
}
@SuppressWarnings({"unchecked", "rawtypes"})
@Override
public Publisher<MutableHttpResponse<?>> doFilter(HttpRequest<?> request, ServerFilterChain chain) {
MutableHttpRequest<?> httpRequest = request.mutate();
@@ -46,9 +47,10 @@ public class MarketplaceFilter implements HttpServerFilter {
headers.remove("Accept-Encoding");
});
Map<String, Object> matchValues = request.getAttribute(HttpAttributes.ROUTE_MATCH, RouteMatch.class)
Map<String, Object> matchValues = (Map<String, Object>) request.getAttribute(HttpAttributes.ROUTE_MATCH, RouteMatch.class)
.map(RouteMatch::getVariableValues)
.orElse(Collections.emptyMap());
MarketplaceRequestType type = Optional.ofNullable(matchValues.get("type"))
.map(String.class::cast)
.map(MarketplaceRequestType::fromString)