diff --git a/airbyte-bootloader/src/main/java/io/airbyte/bootloader/BootloaderApp.java b/airbyte-bootloader/src/main/java/io/airbyte/bootloader/BootloaderApp.java index ac74111ca3f..a0211cd3677 100644 --- a/airbyte-bootloader/src/main/java/io/airbyte/bootloader/BootloaderApp.java +++ b/airbyte-bootloader/src/main/java/io/airbyte/bootloader/BootloaderApp.java @@ -53,6 +53,7 @@ import org.slf4j.LoggerFactory; *

* - setting all required Airbyte metadata information. */ +@SuppressWarnings("PMD.UnusedPrivateField") public class BootloaderApp { private static final Logger LOGGER = LoggerFactory.getLogger(BootloaderApp.class); diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/io/Archives.java b/airbyte-commons/src/main/java/io/airbyte/commons/io/Archives.java index 5cac32cb5f6..0e051214efb 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/io/Archives.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/io/Archives.java @@ -18,13 +18,9 @@ import org.apache.commons.compress.archivers.tar.TarArchiveEntry; import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream; import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class Archives { - private static final Logger LOGGER = LoggerFactory.getLogger(Archives.class); - /** * Compress a @param sourceFolder into a Gzip Tarball @param archiveFile */ diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/json/JsonSchemas.java b/airbyte-commons/src/main/java/io/airbyte/commons/json/JsonSchemas.java index d951fb40107..30b1fbb8669 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/json/JsonSchemas.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/json/JsonSchemas.java @@ -43,15 +43,10 @@ public class JsonSchemas { private static final String ARRAY_TYPE = "array"; private static final String OBJECT_TYPE = "object"; private static final String STRING_TYPE = "string"; - private static final String NUMBER_TYPE = "number"; - private static final String BOOLEAN_TYPE = "boolean"; - private static final String NULL_TYPE = "null"; private static final String ONE_OF_TYPE = "oneOf"; private static final String ALL_OF_TYPE = "allOf"; private static final String ANY_OF_TYPE = "anyOf"; - private static final String ARRAY_JSON_PATH = "[]"; - private static final Set COMPOSITE_KEYWORDS = Set.of(ONE_OF_TYPE, ALL_OF_TYPE, ANY_OF_TYPE); /** diff --git a/airbyte-db/db-lib/src/main/java/io/airbyte/db/DataTypeUtils.java b/airbyte-db/db-lib/src/main/java/io/airbyte/db/DataTypeUtils.java index c1f921eb829..692d4ea050f 100644 --- a/airbyte-db/db-lib/src/main/java/io/airbyte/db/DataTypeUtils.java +++ b/airbyte-db/db-lib/src/main/java/io/airbyte/db/DataTypeUtils.java @@ -15,13 +15,9 @@ import java.time.LocalDateTime; import java.time.OffsetDateTime; import java.time.format.DateTimeFormatter; import java.util.function.Function; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class DataTypeUtils { - private static final Logger LOGGER = LoggerFactory.getLogger(DataTypeUtils.class); - public static final String DATE_FORMAT_PATTERN = "yyyy-MM-dd'T'HH:mm:ss'Z'"; public static final String DATE_FORMAT_WITH_MILLISECONDS_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; diff --git a/airbyte-db/db-lib/src/main/java/io/airbyte/db/instance/configs/migrations/V0_30_22_001__Store_last_sync_state.java b/airbyte-db/db-lib/src/main/java/io/airbyte/db/instance/configs/migrations/V0_30_22_001__Store_last_sync_state.java index 1064744e0c1..412c7d974ee 100644 --- a/airbyte-db/db-lib/src/main/java/io/airbyte/db/instance/configs/migrations/V0_30_22_001__Store_last_sync_state.java +++ b/airbyte-db/db-lib/src/main/java/io/airbyte/db/instance/configs/migrations/V0_30_22_001__Store_last_sync_state.java @@ -4,9 +4,7 @@ package io.airbyte.db.instance.configs.migrations; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; -import io.airbyte.commons.jackson.MoreMappers; import io.airbyte.commons.json.Jsons; import io.airbyte.config.ConfigSchema; import io.airbyte.config.EnvConfigs; @@ -38,7 +36,6 @@ import org.slf4j.LoggerFactory; */ public class V0_30_22_001__Store_last_sync_state extends BaseJavaMigration { - private static final ObjectMapper MAPPER = MoreMappers.initMapper(); private static final String MIGRATION_NAME = "Configs db migration 0.30.22.001"; private static final Logger LOGGER = LoggerFactory.getLogger(V0_30_22_001__Store_last_sync_state.class); @@ -139,7 +136,6 @@ public class V0_30_22_001__Store_last_sync_state extends BaseJavaMigration { final Table attemptsTable = DSL.table("attempts"); final Field attemptJobId = DSL.field("attempts.job_id", SQLDataType.BIGINT); - final Field attemptNumber = DSL.field("attempts.attempt_number", SQLDataType.INTEGER); final Field attemptCreatedAt = DSL.field("attempts.created_at", SQLDataType.TIMESTAMPWITHTIMEZONE); // output schema: JobOutput.yaml diff --git a/airbyte-db/db-lib/src/test/java/io/airbyte/db/instance/configs/migrations/V0_30_22_001__Store_last_sync_state_test.java b/airbyte-db/db-lib/src/test/java/io/airbyte/db/instance/configs/migrations/V0_30_22_001__Store_last_sync_state_test.java index 78960b19f7b..431c7e95dfd 100644 --- a/airbyte-db/db-lib/src/test/java/io/airbyte/db/instance/configs/migrations/V0_30_22_001__Store_last_sync_state_test.java +++ b/airbyte-db/db-lib/src/test/java/io/airbyte/db/instance/configs/migrations/V0_30_22_001__Store_last_sync_state_test.java @@ -19,10 +19,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import com.fasterxml.jackson.databind.ObjectMapper; -import io.airbyte.commons.jackson.MoreMappers; import io.airbyte.commons.json.Jsons; -import io.airbyte.commons.resources.MoreResources; import io.airbyte.config.ConfigSchema; import io.airbyte.config.Configs; import io.airbyte.config.JobOutput; @@ -31,9 +28,7 @@ import io.airbyte.config.StandardSyncOutput; import io.airbyte.config.StandardSyncState; import io.airbyte.config.State; import io.airbyte.db.Database; -import io.airbyte.db.factory.DatabaseCheckFactory; import io.airbyte.db.init.DatabaseInitializationException; -import io.airbyte.db.instance.DatabaseConstants; import io.airbyte.db.instance.configs.AbstractConfigsDatabaseTest; import io.airbyte.db.instance.jobs.JobsDatabaseTestProvider; import java.io.IOException; @@ -62,7 +57,6 @@ import org.junit.jupiter.api.Timeout; @TestMethodOrder(MethodOrderer.OrderAnnotation.class) class V0_30_22_001__Store_last_sync_state_test extends AbstractConfigsDatabaseTest { - private static final ObjectMapper OBJECT_MAPPER = MoreMappers.initMapper(); private static final OffsetDateTime TIMESTAMP = OffsetDateTime.now(); private static final Table JOBS_TABLE = table("jobs"); @@ -290,9 +284,4 @@ class V0_30_22_001__Store_last_sync_state_test extends AbstractConfigsDatabaseTe } } - private void initializeJobsDatabase(final DSLContext dslContext) throws DatabaseInitializationException, IOException { - final String initialSchema = MoreResources.readResource(DatabaseConstants.JOBS_SCHEMA_PATH); - DatabaseCheckFactory.createJobsDatabaseInitializer(dslContext, DatabaseConstants.DEFAULT_CONNECTION_TIMEOUT_MS, initialSchema).initialize(); - } - } diff --git a/airbyte-metrics/metrics-lib/src/test/java/io/airbyte/metrics/lib/MetricsQueriesTest.java b/airbyte-metrics/metrics-lib/src/test/java/io/airbyte/metrics/lib/MetricsQueriesTest.java index f5793e41bdc..c5259616864 100644 --- a/airbyte-metrics/metrics-lib/src/test/java/io/airbyte/metrics/lib/MetricsQueriesTest.java +++ b/airbyte-metrics/metrics-lib/src/test/java/io/airbyte/metrics/lib/MetricsQueriesTest.java @@ -206,7 +206,6 @@ public class MetricsQueriesTest { configDb.transaction( ctx -> ctx.insertInto(JOBS, JOBS.ID, JOBS.SCOPE, JOBS.STATUS).values(5L, inactiveConnectionId.toString(), JobStatus.running).execute()); - final var res = configDb.query(MetricQueries::numberOfRunningJobs); assertEquals(2, configDb.query(MetricQueries::numberOfRunningJobs)); assertEquals(1, configDb.query(MetricQueries::numberOfOrphanRunningJobs)); } diff --git a/airbyte-oauth/src/main/java/io/airbyte/oauth/OAuthImplementationFactory.java b/airbyte-oauth/src/main/java/io/airbyte/oauth/OAuthImplementationFactory.java index cf0d37ff7c8..b9484c77fd9 100644 --- a/airbyte-oauth/src/main/java/io/airbyte/oauth/OAuthImplementationFactory.java +++ b/airbyte-oauth/src/main/java/io/airbyte/oauth/OAuthImplementationFactory.java @@ -49,7 +49,7 @@ public class OAuthImplementationFactory { .put("airbyte/source-square", new SquareOAuthFlow(configRepository, httpClient)) .put("airbyte/source-strava", new StravaOAuthFlow(configRepository, httpClient)) .put("airbyte/source-surveymonkey", new SurveymonkeyOAuthFlow(configRepository, httpClient)) - .put("airbyte/source-trello", new TrelloOAuthFlow(configRepository, httpClient)) + .put("airbyte/source-trello", new TrelloOAuthFlow(configRepository)) .put("airbyte/source-youtube-analytics", new YouTubeAnalyticsOAuthFlow(configRepository, httpClient)) .put("airbyte/source-drift", new DriftOAuthFlow(configRepository, httpClient)) .put("airbyte/source-zendesk-chat", new ZendeskChatOAuthFlow(configRepository, httpClient)) diff --git a/airbyte-oauth/src/main/java/io/airbyte/oauth/flows/TrelloOAuthFlow.java b/airbyte-oauth/src/main/java/io/airbyte/oauth/flows/TrelloOAuthFlow.java index 40e891e74c9..0f607e9b023 100644 --- a/airbyte-oauth/src/main/java/io/airbyte/oauth/flows/TrelloOAuthFlow.java +++ b/airbyte-oauth/src/main/java/io/airbyte/oauth/flows/TrelloOAuthFlow.java @@ -19,7 +19,6 @@ import io.airbyte.oauth.BaseOAuthFlow; import io.airbyte.protocol.models.OAuthConfigSpecification; import io.airbyte.validation.json.JsonValidationException; import java.io.IOException; -import java.net.http.HttpClient; import java.util.List; import java.util.Map; import java.util.UUID; @@ -42,7 +41,7 @@ public class TrelloOAuthFlow extends BaseOAuthFlow { private static final OAuthHmacSigner signer = new OAuthHmacSigner(); private final HttpTransport transport; - public TrelloOAuthFlow(final ConfigRepository configRepository, final HttpClient httpClient) { + public TrelloOAuthFlow(final ConfigRepository configRepository) { super(configRepository); transport = new NetHttpTransport(); } diff --git a/airbyte-oauth/src/test-integration/java/io.airbyte.oauth.flows/TrelloOAuthFlowIntegrationTest.java b/airbyte-oauth/src/test-integration/java/io.airbyte.oauth.flows/TrelloOAuthFlowIntegrationTest.java index 3d2a43ff037..41e7d58d25e 100644 --- a/airbyte-oauth/src/test-integration/java/io.airbyte.oauth.flows/TrelloOAuthFlowIntegrationTest.java +++ b/airbyte-oauth/src/test-integration/java/io.airbyte.oauth.flows/TrelloOAuthFlowIntegrationTest.java @@ -21,7 +21,6 @@ import io.airbyte.validation.json.JsonValidationException; import java.io.IOException; import java.io.OutputStream; import java.net.InetSocketAddress; -import java.net.http.HttpClient; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; @@ -45,7 +44,6 @@ public class TrelloOAuthFlowIntegrationTest { private TrelloOAuthFlow trelloOAuthFlow; private HttpServer server; private ServerHandler serverHandler; - private HttpClient httpClient; @BeforeEach public void setup() throws IOException { @@ -54,8 +52,7 @@ public class TrelloOAuthFlowIntegrationTest { "Must provide path to a oauth credentials file."); } configRepository = mock(ConfigRepository.class); - httpClient = HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1).build(); - trelloOAuthFlow = new TrelloOAuthFlow(configRepository, httpClient); + trelloOAuthFlow = new TrelloOAuthFlow(configRepository); server = HttpServer.create(new InetSocketAddress(8000), 0); server.setExecutor(null); // creates a default executor diff --git a/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/CatalogHelpers.java b/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/CatalogHelpers.java index f1d6aea57d4..fe787abfd61 100644 --- a/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/CatalogHelpers.java +++ b/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/CatalogHelpers.java @@ -306,15 +306,14 @@ public class CatalogHelpers { final AirbyteStream streamOld = descriptorToStreamOld.get(descriptor); final AirbyteStream streamNew = descriptorToStreamNew.get(descriptor); if (!streamOld.equals(streamNew)) { - streamTransforms.add(StreamTransform.createUpdateStreamTransform(descriptor, getStreamDiff(descriptor, streamOld, streamNew))); + streamTransforms.add(StreamTransform.createUpdateStreamTransform(descriptor, getStreamDiff(streamOld, streamNew))); } }); return streamTransforms; } - private static UpdateStreamTransform getStreamDiff(final StreamDescriptor descriptor, - final AirbyteStream streamOld, + private static UpdateStreamTransform getStreamDiff(final AirbyteStream streamOld, final AirbyteStream streamNew) { final Set fieldTransforms = new HashSet<>(); final Map, JsonNode> fieldNameToTypeOld = getFullyQualifiedFieldNamesWithTypes(streamOld.getJsonSchema()) diff --git a/airbyte-queue/src/test/java/io/airbyte/queue/OnDiskQueueTest.java b/airbyte-queue/src/test/java/io/airbyte/queue/OnDiskQueueTest.java index 4a9f53550a9..ad8317afffa 100644 --- a/airbyte-queue/src/test/java/io/airbyte/queue/OnDiskQueueTest.java +++ b/airbyte-queue/src/test/java/io/airbyte/queue/OnDiskQueueTest.java @@ -19,12 +19,9 @@ import java.util.Objects; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; class OnDiskQueueTest { - private static final Logger LOGGER = LoggerFactory.getLogger(OnDiskQueueTest.class); private static final Path TEST_ROOT = Path.of("/tmp/airbyte_tests"); private CloseableQueue queue; private Path queueRoot; diff --git a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java index 742ba343b34..0855bb0ca4a 100644 --- a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java +++ b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java @@ -709,7 +709,7 @@ public class DefaultJobPersistence implements JobPersistence { final String JOB_HISTORY_PURGE_SQL = MoreResources.readResource("job_history_purge.sql"); // interval '?' days cannot use a ? bind, so we're using %d instead. final String sql = String.format(JOB_HISTORY_PURGE_SQL, (JOB_HISTORY_MINIMUM_AGE_IN_DAYS - 1)); - final Integer rows = jobDatabase.query(ctx -> ctx.execute(sql, + jobDatabase.query(ctx -> ctx.execute(sql, asOfDate.format(DateTimeFormatter.ofPattern("YYYY-MM-dd")), JOB_HISTORY_EXCESSIVE_NUMBER_OF_JOBS, JOB_HISTORY_MINIMUM_RECENCY)); diff --git a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/JobNotifier.java b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/JobNotifier.java index c190c3ecc98..8dd3795a962 100644 --- a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/JobNotifier.java +++ b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/JobNotifier.java @@ -182,14 +182,4 @@ public class JobNotifier { return NotificationClient.createNotificationClient(notification); } - private static String formatDurationPart(final long durationPart, final String timeUnit) { - if (durationPart == 1) { - return String.format(" %s %s", durationPart, timeUnit); - } else if (durationPart > 1) { - // Use plural timeUnit - return String.format(" %s %ss", durationPart, timeUnit); - } - return ""; - } - } diff --git a/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java b/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java index c2a47c52a7b..cbee507bf04 100644 --- a/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java +++ b/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java @@ -852,7 +852,7 @@ class DefaultJobPersistenceTest { final Instant afterNow = NOW.plusSeconds(1000); when(timeSupplier.get()).thenReturn(afterNow); - final long jobId2 = jobPersistence.enqueueJob(SCOPE, SYNC_JOB_CONFIG).orElseThrow(); + jobPersistence.enqueueJob(SCOPE, SYNC_JOB_CONFIG).orElseThrow(); final Optional actual = jobPersistence.getFirstReplicationJob(CONNECTION_ID); final Job expected = createJob(jobId1, SYNC_JOB_CONFIG, JobStatus.SUCCEEDED, attempts, NOW.getEpochSecond()); @@ -1190,9 +1190,9 @@ class DefaultJobPersistenceTest { final long desiredJobId4 = jobPersistence.enqueueJob(desiredConnectionId.toString(), CHECK_JOB_CONFIG).orElseThrow(); // right connection id and status, wrong config type - final long otherJobId1 = jobPersistence.enqueueJob(desiredConnectionId.toString(), SPEC_JOB_CONFIG).orElseThrow(); + jobPersistence.enqueueJob(desiredConnectionId.toString(), SPEC_JOB_CONFIG).orElseThrow(); // right config type and status, wrong connection id - final long otherJobId2 = jobPersistence.enqueueJob(otherConnectionId.toString(), SYNC_JOB_CONFIG).orElseThrow(); + jobPersistence.enqueueJob(otherConnectionId.toString(), SYNC_JOB_CONFIG).orElseThrow(); // right connection id and config type, wrong status final long otherJobId3 = jobPersistence.enqueueJob(desiredConnectionId.toString(), CHECK_JOB_CONFIG).orElseThrow(); jobPersistence.failAttempt(otherJobId3, jobPersistence.createAttempt(otherJobId3, LOG_PATH)); @@ -1256,7 +1256,6 @@ class DefaultJobPersistenceTest { private Job persistJobForJobHistoryTesting(final String scope, final JobConfig jobConfig, final JobStatus status, final LocalDateTime runDate) throws IOException, SQLException { - final String when = runDate.toString(); final Optional id = jobDatabase.query( ctx -> ctx.fetch( "INSERT INTO jobs(config_type, scope, created_at, updated_at, status, config) " + @@ -1286,7 +1285,7 @@ class DefaultJobPersistenceTest { + " \"sync\": {\n" + " \"output_catalog\": {" + "}}}"; - final Integer attemptNumber = jobDatabase.query(ctx -> ctx.fetch( + jobDatabase.query(ctx -> ctx.fetch( "INSERT INTO attempts(job_id, attempt_number, log_path, status, created_at, updated_at, output) " + "VALUES(?, ?, ?, CAST(? AS ATTEMPT_STATUS), ?, ?, CAST(? as JSONB)) RETURNING attempt_number", job.getId(), @@ -1393,7 +1392,7 @@ class DefaultJobPersistenceTest { addStateToJob(decoyJobs.get(lastStatePosition + 1)); // An older job with state should also exist, so we ensure we picked the most-recent with queries. - final Job olderJobWithState = addStateToJob(allJobs.get(lastStatePosition + 1)); + addStateToJob(allJobs.get(lastStatePosition + 1)); // sanity check that the attempt does have saved state so the purge history sql detects it correctly assertTrue(lastJobWithState.getAttempts().get(0).getOutput() != null, diff --git a/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_tracker/JobTrackerTest.java b/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_tracker/JobTrackerTest.java index e0cb105e4e9..f9edbf365ab 100644 --- a/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_tracker/JobTrackerTest.java +++ b/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_tracker/JobTrackerTest.java @@ -241,7 +241,7 @@ class JobTrackerTest { when(configRepository.getStandardWorkspace(WORKSPACE_ID, true)) .thenReturn(new StandardWorkspace().withWorkspaceId(WORKSPACE_ID).withName(WORKSPACE_NAME)); assertCorrectMessageForEachState((jobState) -> jobTracker.trackDiscover(JOB_ID, UUID1, WORKSPACE_ID, jobState), metadata); - assertCorrectMessageForEachState((jobState) -> jobTracker.trackDiscover(JOB_ID, UUID1, null, jobState), metadata, false); + assertCorrectMessageForEachState((jobState) -> jobTracker.trackDiscover(JOB_ID, UUID1, null, jobState), metadata); } @Test @@ -600,10 +600,6 @@ class JobTrackerTest { } } - private void assertCorrectMessageForEachState(final Consumer jobStateConsumer, final Map expectedMetadata) { - assertCorrectMessageForEachState(jobStateConsumer, expectedMetadata, true); - } - /** * Tests that the tracker emits the correct message for when the job starts, succeeds, and fails. * @@ -612,8 +608,7 @@ class JobTrackerTest { * @param expectedMetadata - expected metadata (except job state). */ private void assertCorrectMessageForEachState(final Consumer jobStateConsumer, - final Map expectedMetadata, - final boolean workspaceSet) { + final Map expectedMetadata) { jobStateConsumer.accept(JobState.STARTED); assertCorrectMessageForStartedState(expectedMetadata); jobStateConsumer.accept(JobState.SUCCEEDED); diff --git a/airbyte-server/src/main/java/io/airbyte/server/ConfigDumpImporter.java b/airbyte-server/src/main/java/io/airbyte/server/ConfigDumpImporter.java index ee2ec3d4209..7535a019b2a 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/ConfigDumpImporter.java +++ b/airbyte-server/src/main/java/io/airbyte/server/ConfigDumpImporter.java @@ -62,7 +62,6 @@ public class ConfigDumpImporter { private static final Logger LOGGER = LoggerFactory.getLogger(ConfigDumpImporter.class); private static final String CONFIG_FOLDER_NAME = "airbyte_config"; - private static final String DB_FOLDER_NAME = "airbyte_db"; private static final String VERSION_FILE_NAME = "VERSION"; private static final Path TMP_AIRBYTE_STAGED_RESOURCES = Path.of("/tmp/airbyte_staged_resources"); diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/DestinationDefinitionsHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/DestinationDefinitionsHandler.java index e8c58c0a55b..455101f1785 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/DestinationDefinitionsHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/DestinationDefinitionsHandler.java @@ -45,13 +45,9 @@ import java.util.Map.Entry; import java.util.UUID; import java.util.function.Supplier; import java.util.stream.Collectors; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class DestinationDefinitionsHandler { - private static final Logger LOGGER = LoggerFactory.getLogger(DestinationDefinitionsHandler.class); - private final ConfigRepository configRepository; private final Supplier uuidSupplier; private final SynchronousSchedulerClient schedulerSynchronousClient; diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/DestinationHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/DestinationHandler.java index 807b4c9d3fe..d6a2111eded 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/DestinationHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/DestinationHandler.java @@ -34,13 +34,9 @@ import java.io.IOException; import java.util.List; import java.util.UUID; import java.util.function.Supplier; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class DestinationHandler { - private static final Logger LOGGER = LoggerFactory.getLogger(DestinationHandler.class); - private final ConnectionsHandler connectionsHandler; private final Supplier uuidGenerator; private final ConfigRepository configRepository; diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java index 9d995521fa7..cbdb54bd451 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java @@ -69,12 +69,9 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Optional; import java.util.UUID; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class SchedulerHandler { - private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerHandler.class); private static final HashFunction HASH_FUNCTION = Hashing.md5(); private static final ImmutableSet VALUE_CONFLICT_EXCEPTION_ERROR_CODE_SET = diff --git a/airbyte-server/src/test/java/io/airbyte/server/converters/ConfigurationUpdateTest.java b/airbyte-server/src/test/java/io/airbyte/server/converters/ConfigurationUpdateTest.java index ca40aa4511e..eaf0b7f2133 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/converters/ConfigurationUpdateTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/converters/ConfigurationUpdateTest.java @@ -33,7 +33,6 @@ class ConfigurationUpdateTest { private static final String IMAGE_REPOSITORY = "foo"; private static final String IMAGE_TAG = "bar"; - private static final String IMAGE_NAME = IMAGE_REPOSITORY + ":" + IMAGE_TAG; private static final UUID UUID1 = UUID.randomUUID(); private static final UUID UUID2 = UUID.randomUUID(); private static final JsonNode SPEC = CatalogHelpers.fieldsToJsonSchema( diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/ConnectionsHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/ConnectionsHandlerTest.java index c375462e28b..af054de1209 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/ConnectionsHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/ConnectionsHandlerTest.java @@ -44,7 +44,6 @@ import io.airbyte.config.SourceConnection; import io.airbyte.config.StandardDestinationDefinition; import io.airbyte.config.StandardSourceDefinition; import io.airbyte.config.StandardSync; -import io.airbyte.config.StandardSyncOperation; import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; @@ -73,10 +72,7 @@ class ConnectionsHandlerTest { private ConnectionsHandler connectionsHandler; private UUID workspaceId; - private UUID sourceDefinitionId; private UUID sourceId; - private UUID deletedSourceId; - private UUID destinationDefinitionId; private UUID destinationId; private SourceConnection source; @@ -85,7 +81,6 @@ class ConnectionsHandlerTest { private StandardSync standardSyncDeleted; private UUID connectionId; private UUID operationId; - private StandardSyncOperation standardSyncOperation; private WorkspaceHelper workspaceHelper; private TrackingClient trackingClient; private EventRunner eventRunner; @@ -95,9 +90,7 @@ class ConnectionsHandlerTest { void setUp() throws IOException, JsonValidationException, ConfigNotFoundException { workspaceId = UUID.randomUUID(); - sourceDefinitionId = UUID.randomUUID(); sourceId = UUID.randomUUID(); - destinationDefinitionId = UUID.randomUUID(); destinationId = UUID.randomUUID(); connectionId = UUID.randomUUID(); operationId = UUID.randomUUID(); @@ -140,10 +133,6 @@ class ConnectionsHandlerTest { .withSchedule(ConnectionHelpers.generateBasicSchedule()) .withResourceRequirements(ConnectionHelpers.TESTING_RESOURCE_REQUIREMENTS); - standardSyncOperation = new StandardSyncOperation() - .withOperationId(operationId) - .withWorkspaceId(workspaceId); - configRepository = mock(ConfigRepository.class); uuidGenerator = mock(Supplier.class); workspaceHelper = mock(WorkspaceHelper.class); @@ -151,7 +140,6 @@ class ConnectionsHandlerTest { eventRunner = mock(EventRunner.class); when(workspaceHelper.getWorkspaceForSourceIdIgnoreExceptions(sourceId)).thenReturn(workspaceId); - when(workspaceHelper.getWorkspaceForSourceIdIgnoreExceptions(deletedSourceId)).thenReturn(workspaceId); when(workspaceHelper.getWorkspaceForDestinationIdIgnoreExceptions(destinationId)).thenReturn(workspaceId); when(workspaceHelper.getWorkspaceForOperationIdIgnoreExceptions(operationId)).thenReturn(workspaceId); } diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/DestinationDefinitionsHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/DestinationDefinitionsHandlerTest.java index 46d5290e740..ae54963864a 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/DestinationDefinitionsHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/DestinationDefinitionsHandlerTest.java @@ -411,7 +411,6 @@ class DestinationDefinitionsHandlerTest { .getDestinationDefinition( new DestinationDefinitionIdRequestBody().destinationDefinitionId(destinationDefinition.getDestinationDefinitionId())); final String currentTag = currentDestination.getDockerImageTag(); - final String dockerRepository = currentDestination.getDockerRepository(); final String newDockerImageTag = "averydifferenttag"; assertNotEquals(newDockerImageTag, currentTag); diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/DestinationHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/DestinationHandlerTest.java index d161f4e7a9c..41f9c4561bb 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/DestinationHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/DestinationHandlerTest.java @@ -15,7 +15,6 @@ import com.google.common.collect.Lists; import io.airbyte.api.model.generated.DestinationCloneConfiguration; import io.airbyte.api.model.generated.DestinationCloneRequestBody; import io.airbyte.api.model.generated.DestinationCreate; -import io.airbyte.api.model.generated.DestinationDefinitionIdRequestBody; import io.airbyte.api.model.generated.DestinationDefinitionSpecificationRead; import io.airbyte.api.model.generated.DestinationIdRequestBody; import io.airbyte.api.model.generated.DestinationRead; @@ -23,7 +22,6 @@ import io.airbyte.api.model.generated.DestinationReadList; import io.airbyte.api.model.generated.DestinationSearch; import io.airbyte.api.model.generated.DestinationUpdate; import io.airbyte.api.model.generated.WorkspaceIdRequestBody; -import io.airbyte.commons.docker.DockerUtils; import io.airbyte.commons.json.Jsons; import io.airbyte.config.DestinationConnection; import io.airbyte.config.StandardDestinationDefinition; @@ -59,7 +57,6 @@ class DestinationHandlerTest { private Supplier uuidGenerator; private JsonSecretsProcessor secretsProcessor; private ConnectorSpecification connectorSpecification; - private String imageName; @SuppressWarnings("unchecked") @BeforeEach @@ -83,12 +80,6 @@ class DestinationHandlerTest { .withDocumentationUrl("https://wikipedia.org") .withSpec(connectorSpecification); - imageName = - DockerUtils.getTaggedImageName(standardDestinationDefinition.getDockerRepository(), standardDestinationDefinition.getDockerImageTag()); - - final DestinationDefinitionIdRequestBody destinationDefinitionIdRequestBody = new DestinationDefinitionIdRequestBody().destinationDefinitionId( - standardDestinationDefinition.getDestinationDefinitionId()); - destinationDefinitionSpecificationRead = new DestinationDefinitionSpecificationRead() .connectionSpecification(connectorSpecification.getConnectionSpecification()) .destinationDefinitionId(standardDestinationDefinition.getDestinationDefinitionId()) diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java index 1f214fdacee..b9c831caad6 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java @@ -52,7 +52,6 @@ import io.airbyte.config.helpers.LogConfigs; import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.config.persistence.SecretsRepositoryWriter; -import io.airbyte.config.persistence.StatePersistence; import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.protocol.models.CatalogHelpers; import io.airbyte.protocol.models.ConnectorSpecification; @@ -95,8 +94,6 @@ class SchedulerHandlerTest { private static final String DESTINATION_DOCKER_TAG = "tag"; private static final String DESTINATION_DOCKER_IMAGE = DockerUtils.getTaggedImageName(DESTINATION_DOCKER_REPO, DESTINATION_DOCKER_TAG); - private static final String OPERATION_NAME = "transfo"; - private static final SourceConnection SOURCE = new SourceConnection() .withName("my postgres db") .withWorkspaceId(UUID.randomUUID()) @@ -131,7 +128,6 @@ class SchedulerHandlerTest { private JobPersistence jobPersistence; private EventRunner eventRunner; private JobConverter jobConverter; - private StatePersistence statePersistence; @BeforeEach void setup() { @@ -147,7 +143,6 @@ class SchedulerHandlerTest { configRepository = mock(ConfigRepository.class); secretsRepositoryWriter = mock(SecretsRepositoryWriter.class); jobPersistence = mock(JobPersistence.class); - statePersistence = mock(StatePersistence.class); eventRunner = mock(EventRunner.class); jobConverter = spy(new JobConverter(WorkerEnvironment.DOCKER, LogConfigs.EMPTY)); @@ -244,7 +239,6 @@ class SchedulerHandlerTest { final SourceDefinitionIdWithWorkspaceId sourceDefinitionIdWithWorkspaceId = new SourceDefinitionIdWithWorkspaceId().sourceDefinitionId(UUID.randomUUID()).workspaceId(UUID.randomUUID()); - final SynchronousResponse specResponse = (SynchronousResponse) jobResponse; final StandardSourceDefinition sourceDefinition = new StandardSourceDefinition() .withName("name") .withDockerRepository(SOURCE_DOCKER_REPO) diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/SourceDefinitionsHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/SourceDefinitionsHandlerTest.java index 2e52859be61..848ec26bf31 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/SourceDefinitionsHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/SourceDefinitionsHandlerTest.java @@ -405,7 +405,6 @@ class SourceDefinitionsHandlerTest { final String newDockerImageTag = "averydifferenttag"; final SourceDefinitionRead sourceDefinition = sourceDefinitionsHandler .getSourceDefinition(new SourceDefinitionIdRequestBody().sourceDefinitionId(this.sourceDefinition.getSourceDefinitionId())); - final String dockerRepository = sourceDefinition.getDockerRepository(); final String currentTag = sourceDefinition.getDockerImageTag(); assertNotEquals(newDockerImageTag, currentTag); diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/SourceHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/SourceHandlerTest.java index 4b5ac5752c1..664e81bbb10 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/SourceHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/SourceHandlerTest.java @@ -12,7 +12,6 @@ import static org.mockito.Mockito.when; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.Lists; -import io.airbyte.api.model.generated.ConnectionIdRequestBody; import io.airbyte.api.model.generated.ConnectionRead; import io.airbyte.api.model.generated.ConnectionReadList; import io.airbyte.api.model.generated.SourceCloneConfiguration; @@ -26,7 +25,6 @@ import io.airbyte.api.model.generated.SourceReadList; import io.airbyte.api.model.generated.SourceSearch; import io.airbyte.api.model.generated.SourceUpdate; import io.airbyte.api.model.generated.WorkspaceIdRequestBody; -import io.airbyte.commons.docker.DockerUtils; import io.airbyte.commons.json.Jsons; import io.airbyte.config.SourceConnection; import io.airbyte.config.StandardSourceDefinition; @@ -65,7 +63,6 @@ class SourceHandlerTest { private Supplier uuidGenerator; private JsonSecretsProcessor secretsProcessor; private ConnectorSpecification connectorSpecification; - private String imageName; @SuppressWarnings("unchecked") @BeforeEach @@ -89,8 +86,6 @@ class SourceHandlerTest { .withDocumentationUrl("https://wikipedia.org") .withSpec(connectorSpecification); - imageName = DockerUtils.getTaggedImageName(standardSourceDefinition.getDockerRepository(), standardSourceDefinition.getDockerImageTag()); - sourceDefinitionSpecificationRead = new SourceDefinitionSpecificationRead() .sourceDefinitionId(standardSourceDefinition.getSourceDefinitionId()) .connectionSpecification(connectorSpecification.getConnectionSpecification()) @@ -348,8 +343,6 @@ class SourceHandlerTest { verify(secretsRepositoryWriter).writeSourceConnection(expectedSourceConnection, connectorSpecification); verify(connectionsHandler).listConnectionsForWorkspace(workspaceIdRequestBody); - final ConnectionIdRequestBody connectionIdRequestBody = new ConnectionIdRequestBody() - .connectionId(connectionRead.getConnectionId()); verify(connectionsHandler).deleteConnection(connectionRead.getConnectionId()); } diff --git a/airbyte-server/src/test/java/io/airbyte/server/helpers/ConnectionHelpers.java b/airbyte-server/src/test/java/io/airbyte/server/helpers/ConnectionHelpers.java index 0e5aa62b3e1..9122b5debac 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/helpers/ConnectionHelpers.java +++ b/airbyte-server/src/test/java/io/airbyte/server/helpers/ConnectionHelpers.java @@ -228,10 +228,6 @@ public class ConnectionHelpers { .selected(true); } - private static AirbyteStream generateBasicApiStream() { - return generateBasicApiStream(null); - } - private static AirbyteStream generateBasicApiStream(final String nameSuffix) { return new AirbyteStream() .name(nameSuffix == null ? STREAM_NAME : STREAM_NAME_BASE + nameSuffix) diff --git a/airbyte-test-utils/src/main/java/io/airbyte/test/airbyte_test_container/AirbyteTestContainer.java b/airbyte-test-utils/src/main/java/io/airbyte/test/airbyte_test_container/AirbyteTestContainer.java index 084be98bdc7..cefe6280e54 100644 --- a/airbyte-test-utils/src/main/java/io/airbyte/test/airbyte_test_container/AirbyteTestContainer.java +++ b/airbyte-test-utils/src/main/java/io/airbyte/test/airbyte_test_container/AirbyteTestContainer.java @@ -4,7 +4,6 @@ package io.airbyte.test.airbyte_test_container; -import com.google.api.client.util.Preconditions; import com.google.common.collect.Maps; import io.airbyte.api.client.AirbyteApiClient; import io.airbyte.api.client.generated.HealthApi; @@ -12,7 +11,6 @@ import io.airbyte.api.client.invoker.generated.ApiClient; import io.airbyte.api.client.invoker.generated.ApiException; import io.airbyte.commons.concurrency.WaitingUtils; import java.io.File; -import java.io.FileInputStream; import java.io.FileWriter; import java.io.IOException; import java.lang.reflect.Field; @@ -83,15 +81,6 @@ public class AirbyteTestContainer { dockerComposeContainer.start(); } - private static Map prepareDockerComposeEnvVariables(final File envFile) throws IOException { - LOGGER.info("Searching for environment in {}", envFile); - Preconditions.checkArgument(envFile.exists(), "could not find docker compose environment"); - - final Properties prop = new Properties(); - prop.load(new FileInputStream(envFile)); - return Maps.fromProperties(prop); - } - /** * TestContainers docker compose files cannot have container_names, so we filter them. */ diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AdvancedAcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AdvancedAcceptanceTests.java index 1f7fa4586a3..85d91c6aaf7 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AdvancedAcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AdvancedAcceptanceTests.java @@ -42,7 +42,6 @@ import io.airbyte.api.client.model.generated.SyncMode; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.lang.MoreBooleans; import io.airbyte.test.utils.AirbyteAcceptanceTestHarness; -import io.fabric8.kubernetes.client.KubernetesClient; import java.io.IOException; import java.net.URISyntaxException; import java.nio.charset.Charset; @@ -88,7 +87,6 @@ public class AdvancedAcceptanceTests { private static AirbyteAcceptanceTestHarness testHarness; private static AirbyteApiClient apiClient; private static UUID workspaceId; - private static KubernetesClient kubernetesClient; @SuppressWarnings("UnstableApiUsage") @BeforeAll @@ -113,7 +111,6 @@ public class AdvancedAcceptanceTests { LOGGER.info("pg destination definition: {}", destinationDef.getDockerImageTag()); testHarness = new AirbyteAcceptanceTestHarness(apiClient, workspaceId); - kubernetesClient = testHarness.getKubernetesClient(); } @AfterAll diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java index 313a1c15920..977045e619a 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java @@ -965,7 +965,7 @@ public class BasicAcceptanceTests { .status(connection.getStatus()) .resourceRequirements(connection.getResourceRequirements()) .withRefreshedCatalog(true); - final WebBackendConnectionRead connectionUpdateRead = webBackendApi.webBackendUpdateConnection(update); + webBackendApi.webBackendUpdateConnection(update); LOGGER.info("Inspecting Destination DB after the update request, tables should be empty"); destDb.query(ctx -> { @@ -1020,7 +1020,7 @@ public class BasicAcceptanceTests { final UUID operationId = testHarness.createOperation().getOperationId(); final AirbyteCatalog catalog = testHarness.discoverSourceSchema(sourceId); - for (AirbyteStreamAndConfiguration streamAndConfig : catalog.getStreams()) { + for (final AirbyteStreamAndConfiguration streamAndConfig : catalog.getStreams()) { final AirbyteStream stream = streamAndConfig.getStream(); assertEquals(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL), stream.getSupportedSyncModes()); // instead of assertFalse to avoid NPE from unboxed. diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/RecordSchemaValidator.java b/airbyte-workers/src/main/java/io/airbyte/workers/RecordSchemaValidator.java index 981742fdbad..41cbe6a1dc0 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/RecordSchemaValidator.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/RecordSchemaValidator.java @@ -14,8 +14,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Validates that AirbyteRecordMessage data conforms to the JSON schema defined by the source's @@ -25,7 +23,6 @@ import org.slf4j.LoggerFactory; public class RecordSchemaValidator { private final Map streams; - private static final Logger LOGGER = LoggerFactory.getLogger(RecordSchemaValidator.class); public RecordSchemaValidator(final Map streamNamesToSchemas) { // streams is Map of a stream source namespace + name mapped to the stream schema diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/helper/EntrypointEnvChecker.java b/airbyte-workers/src/main/java/io/airbyte/workers/helper/EntrypointEnvChecker.java index 38d4b9ce21f..5c1315d2294 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/helper/EntrypointEnvChecker.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/helper/EntrypointEnvChecker.java @@ -51,7 +51,7 @@ public class EntrypointEnvChecker { String outputLine = null; - String line = null; + String line; while (((line = stdout.readLine()) != null) && outputLine == null) { if (line.contains("AIRBYTE_ENTRYPOINT")) { outputLine = line; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/run/TemporalWorkerRunFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/run/TemporalWorkerRunFactory.java index d8568118728..f7eea5a9465 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/run/TemporalWorkerRunFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/run/TemporalWorkerRunFactory.java @@ -97,11 +97,4 @@ public class TemporalWorkerRunFactory { return new OutputAndStatus<>(status, new JobOutput().withSync(response.getOutput().orElse(null))); } - private OutputAndStatus toOutputAndStatusConnector() { - // Since we are async we technically can't fail - final JobStatus status = JobStatus.SUCCEEDED; - - return new OutputAndStatus<>(status, new JobOutput().withSync(null)); - } - } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalAttemptExecution.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalAttemptExecution.java index 48c07981a82..2a68780ed57 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalAttemptExecution.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalAttemptExecution.java @@ -34,13 +34,12 @@ import org.slf4j.MDC; * outputs are passed to the selected worker. It also makes sures that the outputs of the worker are * persisted to the db. */ +@SuppressWarnings("PMD.UnusedFormalParameter") public class TemporalAttemptExecution implements Supplier { private static final Logger LOGGER = LoggerFactory.getLogger(TemporalAttemptExecution.class); private final JobRunConfig jobRunConfig; - private final WorkerEnvironment workerEnvironment; - private final LogConfigs logConfigs; private final Path jobRoot; private final CheckedSupplier, Exception> workerSupplier; private final Supplier inputSupplier; @@ -85,8 +84,6 @@ public class TemporalAttemptExecution implements Supplier final Supplier workflowIdProvider, final String airbyteVersion) { this.jobRunConfig = jobRunConfig; - this.workerEnvironment = workerEnvironment; - this.logConfigs = logConfigs; this.jobRoot = WorkerUtils.getJobRoot(workspaceRoot, jobRunConfig.getJobId(), jobRunConfig.getAttemptId()); this.workerSupplier = workerSupplier; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java index 8d98cc2f316..6332eb639fb 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java @@ -450,10 +450,6 @@ public class TemporalClient { return client.newWorkflowStub(workflowClass, TemporalUtils.getWorkflowOptions(jobType)); } - private T getWorkflowOptionsWithWorkflowId(final Class workflowClass, final TemporalJobType jobType, final String name) { - return client.newWorkflowStub(workflowClass, TemporalUtils.getWorkflowOptionsWithWorkflowId(jobType, name)); - } - @VisibleForTesting TemporalResponse execute(final JobRunConfig jobRunConfig, final Supplier executor) { final Path jobRoot = WorkerUtils.getJobRoot(workspaceRoot, jobRunConfig); diff --git a/airbyte-workers/src/test-integration/java/io/airbyte/workers/process/KubePodProcessIntegrationTest.java b/airbyte-workers/src/test-integration/java/io/airbyte/workers/process/KubePodProcessIntegrationTest.java index 12ec5651100..21f9957408d 100644 --- a/airbyte-workers/src/test-integration/java/io/airbyte/workers/process/KubePodProcessIntegrationTest.java +++ b/airbyte-workers/src/test-integration/java/io/airbyte/workers/process/KubePodProcessIntegrationTest.java @@ -82,9 +82,6 @@ public class KubePodProcessIntegrationTest { private static KubernetesClient fabricClient; private static KubeProcessFactory processFactory; private static final ResourceRequirements DEFAULT_RESOURCE_REQUIREMENTS = new WorkerConfigs(new EnvConfigs()).getResourceRequirements(); - private static final String ENV_KEY = "ENV_VAR_1"; - private static final String ENV_VALUE = "ENV_VALUE_1"; - private static final Map ENV_MAP = ImmutableMap.of(ENV_KEY, ENV_VALUE); private WorkerHeartbeatServer server; diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java index fceedd21453..c483575e580 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java @@ -16,7 +16,6 @@ import static io.airbyte.workers.process.AirbyteIntegrationLauncher.WRITE_STEP; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import io.airbyte.commons.features.EnvVariableFeatureFlags; -import io.airbyte.commons.features.FeatureFlags; import io.airbyte.config.EnvConfigs; import io.airbyte.config.WorkerEnvConstants; import io.airbyte.workers.WorkerConfigs; @@ -57,8 +56,6 @@ class AirbyteIntegrationLauncherTest { @Mock private ProcessFactory processFactory; private AirbyteIntegrationLauncher launcher; - @Mock - private FeatureFlags featureFlags; @BeforeEach void setUp() { diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/process/KubePodProcessTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/process/KubePodProcessTest.java index 971eb642309..f49e6b5acc5 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/process/KubePodProcessTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/process/KubePodProcessTest.java @@ -28,8 +28,6 @@ public class KubePodProcessTest { private static final KubernetesClient K8s = new DefaultKubernetesClient(); - private static final String ENTRYPOINT = "sh"; - private static final String TEST_IMAGE_WITH_VAR_PATH = "Dockerfile.with_var"; private static final String TEST_IMAGE_WITH_VAR_NAME = "worker-test:with-var"; diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/HeartbeatWorkflow.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/HeartbeatWorkflow.java index cae0737ae35..3fc2c9feaf7 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/HeartbeatWorkflow.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/HeartbeatWorkflow.java @@ -12,8 +12,6 @@ import io.temporal.workflow.Workflow; import io.temporal.workflow.WorkflowInterface; import io.temporal.workflow.WorkflowMethod; import java.time.Duration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; @WorkflowInterface public interface HeartbeatWorkflow { @@ -48,8 +46,6 @@ public interface HeartbeatWorkflow { class HeartbeatActivityImpl implements HeartbeatActivity { - private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatActivityImpl.class); - private final Runnable runnable; public HeartbeatActivityImpl(Runnable runnable) { diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/SyncWorkflowTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/SyncWorkflowTest.java index a5d18de45f5..2258e03ecc9 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/SyncWorkflowTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/SyncWorkflowTest.java @@ -42,7 +42,6 @@ import io.temporal.client.WorkflowFailedException; import io.temporal.client.WorkflowOptions; import io.temporal.testing.TestWorkflowEnvironment; import io.temporal.worker.Worker; -import java.util.UUID; import org.apache.commons.lang3.tuple.ImmutablePair; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -138,7 +137,7 @@ class SyncWorkflowTest { final StandardSyncOutput actualOutput = execute(); - verifyReplication(replicationActivity, syncInput, sync.getConnectionId()); + verifyReplication(replicationActivity, syncInput); verifyPersistState(persistStateActivity, sync, replicationSuccessOutput, syncInput.getCatalog()); verifyNormalize(normalizationActivity, normalizationInput); verifyDbtTransform(dbtTransformationActivity, syncInput.getResourceRequirements(), operatorDbtInput); @@ -155,7 +154,7 @@ class SyncWorkflowTest { assertThrows(WorkflowFailedException.class, this::execute); - verifyReplication(replicationActivity, syncInput, sync.getConnectionId()); + verifyReplication(replicationActivity, syncInput); verifyNoInteractions(persistStateActivity); verifyNoInteractions(normalizationActivity); verifyNoInteractions(dbtTransformationActivity); @@ -176,7 +175,7 @@ class SyncWorkflowTest { assertThrows(WorkflowFailedException.class, this::execute); - verifyReplication(replicationActivity, syncInput, sync.getConnectionId()); + verifyReplication(replicationActivity, syncInput); verifyPersistState(persistStateActivity, sync, replicationSuccessOutput, syncInput.getCatalog()); verifyNormalize(normalizationActivity, normalizationInput); verifyNoInteractions(dbtTransformationActivity); @@ -195,7 +194,7 @@ class SyncWorkflowTest { assertThrows(WorkflowFailedException.class, this::execute); - verifyReplication(replicationActivity, syncInput, sync.getConnectionId()); + verifyReplication(replicationActivity, syncInput); verifyNoInteractions(persistStateActivity); verifyNoInteractions(normalizationActivity); verifyNoInteractions(dbtTransformationActivity); @@ -219,7 +218,7 @@ class SyncWorkflowTest { assertThrows(WorkflowFailedException.class, this::execute); - verifyReplication(replicationActivity, syncInput, sync.getConnectionId()); + verifyReplication(replicationActivity, syncInput); verifyPersistState(persistStateActivity, sync, replicationSuccessOutput, syncInput.getCatalog()); verifyNormalize(normalizationActivity, normalizationInput); verifyNoInteractions(dbtTransformationActivity); @@ -242,7 +241,7 @@ class SyncWorkflowTest { testEnv.getWorkflowService().blockingStub().requestCancelWorkflowExecution(cancelRequest); } - private static void verifyReplication(final ReplicationActivity replicationActivity, final StandardSyncInput syncInput, final UUID connectionId) { + private static void verifyReplication(final ReplicationActivity replicationActivity, final StandardSyncInput syncInput) { verify(replicationActivity).replicate( JOB_RUN_CONFIG, SOURCE_LAUNCHER_CONFIG, diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/testsyncworkflow/SyncWorkflowFailingOutputWorkflow.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/testsyncworkflow/SyncWorkflowFailingOutputWorkflow.java index d100d742b37..2a83acef0a3 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/testsyncworkflow/SyncWorkflowFailingOutputWorkflow.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/testsyncworkflow/SyncWorkflowFailingOutputWorkflow.java @@ -6,12 +6,9 @@ package io.airbyte.workers.temporal.scheduling.testsyncworkflow; import io.airbyte.config.StandardSyncInput; import io.airbyte.config.StandardSyncOutput; -import io.airbyte.config.StandardSyncSummary; -import io.airbyte.config.StandardSyncSummary.ReplicationStatus; import io.airbyte.scheduler.models.IntegrationLauncherConfig; import io.airbyte.scheduler.models.JobRunConfig; import io.airbyte.workers.temporal.sync.SyncWorkflow; -import java.util.ArrayList; import java.util.UUID; public class SyncWorkflowFailingOutputWorkflow implements SyncWorkflow { @@ -26,14 +23,6 @@ public class SyncWorkflowFailingOutputWorkflow implements SyncWorkflow { final IntegrationLauncherConfig destinationLauncherConfig, final StandardSyncInput syncInput, final UUID connectionId) { - final StandardSyncSummary standardSyncSummary = new StandardSyncSummary() - .withStatus(ReplicationStatus.FAILED) - .withRecordsSynced(0L); - - final StandardSyncOutput standardSyncOutput = new StandardSyncOutput() - .withStandardSyncSummary(standardSyncSummary) - .withFailures(new ArrayList<>()); - return null; } diff --git a/tools/gradle/pmd/rules.xml b/tools/gradle/pmd/rules.xml index 8099de69309..c1561d3f8a5 100644 --- a/tools/gradle/pmd/rules.xml +++ b/tools/gradle/pmd/rules.xml @@ -34,11 +34,6 @@ - - - - -