diff --git a/airbyte-api/src/main/openapi/config.yaml b/airbyte-api/src/main/openapi/config.yaml index b190076e00a..e7ed4da1910 100644 --- a/airbyte-api/src/main/openapi/config.yaml +++ b/airbyte-api/src/main/openapi/config.yaml @@ -599,6 +599,29 @@ paths: $ref: "#/components/responses/NotFoundResponse" "422": $ref: "#/components/responses/InvalidInputResponse" + /v1/sources/most_recent_source_actor_catalog: + post: + tags: + - source + summary: Get most recent ActorCatalog for source + operationId: getMostRecentSourceActorCatalog + requestBody: + content: + application/json: + schema: + $ref: "#/components/schemas/SourceIdRequestBody" + required: true + responses: + "200": + description: Successful operation + content: + application/json: + schema: + $ref: "#/components/schemas/ActorCatalogWithUpdatedAt" + "404": + $ref: "#/components/responses/NotFoundResponse" + "422": + $ref: "#/components/responses/InvalidInputResponse" /v1/sources/search: post: tags: @@ -3692,6 +3715,16 @@ components: properties: logType: $ref: "#/components/schemas/LogType" + # ACTOR CATALOG + ActorCatalogWithUpdatedAt: + description: A source actor catalog with the timestamp it was mostly recently updated + type: object + properties: + updatedAt: + type: integer + format: int64 + catalog: + type: object # SCHEMA CATALOG AirbyteCatalog: description: describes the available schema (catalog). diff --git a/airbyte-config/config-models/src/main/resources/types/ActorCatalogWithUpdatedAt.yaml b/airbyte-config/config-models/src/main/resources/types/ActorCatalogWithUpdatedAt.yaml new file mode 100644 index 00000000000..812b7c14eef --- /dev/null +++ b/airbyte-config/config-models/src/main/resources/types/ActorCatalogWithUpdatedAt.yaml @@ -0,0 +1,23 @@ +--- +"$schema": http://json-schema.org/draft-07/schema# +title: ActorCatalogWithUpdatedAt +description: Catalog of an actor with its most recent ActorCatalogFetchEvent created_at timestamp. +type: object +additionalProperties: false +required: + - id + - catalog + - catalogHash + - updatedAt +properties: + id: + type: string + format: uuid + catalog: + type: object + existingJavaType: com.fasterxml.jackson.databind.JsonNode + catalogHash: + type: string + updatedAt: + type: integer + format: int64 diff --git a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java index cc896bfdb0b..78c111a6bbf 100644 --- a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java +++ b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java @@ -32,6 +32,7 @@ import io.airbyte.commons.version.AirbyteProtocolVersionRange; import io.airbyte.commons.version.Version; import io.airbyte.config.ActorCatalog; import io.airbyte.config.ActorCatalogFetchEvent; +import io.airbyte.config.ActorCatalogWithUpdatedAt; import io.airbyte.config.ConfigSchema; import io.airbyte.config.DestinationConnection; import io.airbyte.config.DestinationOAuthParameter; @@ -1314,6 +1315,16 @@ public class ConfigRepository { return records.stream().findFirst().map(DbConverter::buildActorCatalog); } + public Optional getMostRecentSourceActorCatalog(final UUID sourceId) throws IOException { + final Result records = database.query(ctx -> ctx.select(ACTOR_CATALOG.asterisk(), ACTOR_CATALOG_FETCH_EVENT.CREATED_AT) + .from(ACTOR_CATALOG) + .join(ACTOR_CATALOG_FETCH_EVENT) + .on(ACTOR_CATALOG_FETCH_EVENT.ACTOR_CATALOG_ID.eq(ACTOR_CATALOG.ID)) + .where(ACTOR_CATALOG_FETCH_EVENT.ACTOR_ID.eq(sourceId)) + .orderBy(ACTOR_CATALOG_FETCH_EVENT.CREATED_AT.desc()).limit(1).fetch()); + return records.stream().findFirst().map(DbConverter::buildActorCatalogWithUpdatedAt); + } + public Optional getMostRecentActorCatalogForSource(final UUID sourceId) throws IOException { final Result records = database.query(ctx -> ctx.select(ACTOR_CATALOG.asterisk()) .from(ACTOR_CATALOG) diff --git a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/DbConverter.java b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/DbConverter.java index e4b3fcb1003..41f1822bbd4 100644 --- a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/DbConverter.java +++ b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/DbConverter.java @@ -17,6 +17,7 @@ import io.airbyte.commons.enums.Enums; import io.airbyte.commons.json.Jsons; import io.airbyte.config.ActorCatalog; import io.airbyte.config.ActorCatalogFetchEvent; +import io.airbyte.config.ActorCatalogWithUpdatedAt; import io.airbyte.config.ActorDefinitionResourceRequirements; import io.airbyte.config.DestinationConnection; import io.airbyte.config.DestinationOAuthParameter; @@ -218,6 +219,14 @@ public class DbConverter { .withCatalogHash(record.get(ACTOR_CATALOG.CATALOG_HASH)); } + public static ActorCatalogWithUpdatedAt buildActorCatalogWithUpdatedAt(final Record record) { + return new ActorCatalogWithUpdatedAt() + .withId(record.get(ACTOR_CATALOG.ID)) + .withCatalog(Jsons.deserialize(record.get(ACTOR_CATALOG.CATALOG).toString())) + .withCatalogHash(record.get(ACTOR_CATALOG.CATALOG_HASH)) + .withUpdatedAt(record.get(ACTOR_CATALOG_FETCH_EVENT.CREATED_AT, LocalDateTime.class).toEpochSecond(ZoneOffset.UTC)); + } + public static ActorCatalogFetchEvent buildActorCatalogFetchEvent(final Record record) { return new ActorCatalogFetchEvent() .withActorId(record.get(ACTOR_CATALOG_FETCH_EVENT.ACTOR_ID)) diff --git a/airbyte-server/src/main/java/io/airbyte/server/apis/SourceApiController.java b/airbyte-server/src/main/java/io/airbyte/server/apis/SourceApiController.java index 595087e0d19..d0bd6152325 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/apis/SourceApiController.java +++ b/airbyte-server/src/main/java/io/airbyte/server/apis/SourceApiController.java @@ -5,6 +5,7 @@ package io.airbyte.server.apis; import io.airbyte.api.generated.SourceApi; +import io.airbyte.api.model.generated.ActorCatalogWithUpdatedAt; import io.airbyte.api.model.generated.CheckConnectionRead; import io.airbyte.api.model.generated.SourceCloneRequestBody; import io.airbyte.api.model.generated.SourceCreate; @@ -66,6 +67,11 @@ public class SourceApiController implements SourceApi { return ApiHelper.execute(() -> sourceHandler.getSource(sourceIdRequestBody)); } + @Override + public ActorCatalogWithUpdatedAt getMostRecentSourceActorCatalog(final SourceIdRequestBody sourceIdRequestBody) { + return ApiHelper.execute(() -> sourceHandler.getMostRecentSourceActorCatalogWithUpdatedAt(sourceIdRequestBody)); + } + @Override public SourceReadList listSourcesForWorkspace(final WorkspaceIdRequestBody workspaceIdRequestBody) { return ApiHelper.execute(() -> sourceHandler.listSourcesForWorkspace(workspaceIdRequestBody)); diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/SourceHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/SourceHandler.java index 6a24b5169c6..fcc9e44e609 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/SourceHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/SourceHandler.java @@ -6,6 +6,7 @@ package io.airbyte.server.handlers; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.Lists; +import io.airbyte.api.model.generated.ActorCatalogWithUpdatedAt; import io.airbyte.api.model.generated.ConnectionRead; import io.airbyte.api.model.generated.SourceCloneConfiguration; import io.airbyte.api.model.generated.SourceCloneRequestBody; @@ -31,6 +32,7 @@ import io.airbyte.validation.json.JsonSchemaValidator; import io.airbyte.validation.json.JsonValidationException; import java.io.IOException; import java.util.List; +import java.util.Optional; import java.util.UUID; import java.util.function.Supplier; @@ -132,6 +134,17 @@ public class SourceHandler { return buildSourceRead(sourceIdRequestBody.getSourceId()); } + public ActorCatalogWithUpdatedAt getMostRecentSourceActorCatalogWithUpdatedAt(final SourceIdRequestBody sourceIdRequestBody) + throws IOException { + Optional actorCatalog = + configRepository.getMostRecentSourceActorCatalog(sourceIdRequestBody.getSourceId()); + if (actorCatalog.isEmpty()) { + return new ActorCatalogWithUpdatedAt(); + } else { + return new ActorCatalogWithUpdatedAt().updatedAt(actorCatalog.get().getUpdatedAt()).catalog(actorCatalog.get().getCatalog()); + } + } + public SourceRead cloneSource(final SourceCloneRequestBody sourceCloneRequestBody) throws JsonValidationException, IOException, ConfigNotFoundException { // read source configuration from db diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java index c81bfb97597..0103d8f0f5f 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java @@ -123,7 +123,6 @@ class WebBackendConnectionsHandlerTest { private SchedulerHandler schedulerHandler; private StateHandler stateHandler; private WebBackendConnectionsHandler wbHandler; - private SourceRead sourceRead; private ConnectionRead connectionRead; private ConnectionRead brokenConnectionRead; @@ -1090,6 +1089,7 @@ class WebBackendConnectionsHandlerTest { new ConnectionRead().connectionId(expected.getConnectionId()).breakingChange(true).sourceId(sourceId)); final CatalogDiff catalogDiff = new CatalogDiff().transforms(List.of()); + when(configRepository.getMostRecentActorCatalogForSource(sourceId)).thenReturn(Optional.of(new ActorCatalog().withCatalog(Jsons.deserialize( "{\"streams\": [{\"name\": \"cat_names\", \"namespace\": \"public\", \"json_schema\": {\"type\": \"object\", \"properties\": {\"id\": {\"type\": \"number\", \"airbyte_type\": \"integer\"}}}}]}")))); when(connectionsHandler.getDiff(any(), any(), any())).thenReturn(catalogDiff, catalogDiff); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/RefreshSchemaActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/RefreshSchemaActivityImpl.java index c30631dcf69..d3ecbb1709b 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/RefreshSchemaActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/RefreshSchemaActivityImpl.java @@ -8,14 +8,13 @@ import static io.airbyte.metrics.lib.ApmTraceConstants.ACTIVITY_TRACE_OPERATION_ import datadog.trace.api.Trace; import io.airbyte.api.client.generated.SourceApi; +import io.airbyte.api.client.invoker.generated.ApiException; +import io.airbyte.api.client.model.generated.ActorCatalogWithUpdatedAt; import io.airbyte.api.client.model.generated.SourceDiscoverSchemaRequestBody; +import io.airbyte.api.client.model.generated.SourceIdRequestBody; import io.airbyte.commons.features.EnvVariableFeatureFlags; -import io.airbyte.config.ActorCatalogFetchEvent; -import io.airbyte.config.persistence.ConfigRepository; import jakarta.inject.Singleton; -import java.io.IOException; import java.time.OffsetDateTime; -import java.util.Optional; import java.util.UUID; import lombok.extern.slf4j.Slf4j; @@ -23,15 +22,11 @@ import lombok.extern.slf4j.Slf4j; @Singleton public class RefreshSchemaActivityImpl implements RefreshSchemaActivity { - private final Optional configRepository; - private final SourceApi sourceApi; private final EnvVariableFeatureFlags envVariableFeatureFlags; - public RefreshSchemaActivityImpl(Optional configRepository, - SourceApi sourceApi, + public RefreshSchemaActivityImpl(SourceApi sourceApi, EnvVariableFeatureFlags envVariableFeatureFlags) { - this.configRepository = configRepository; this.sourceApi = sourceApi; this.envVariableFeatureFlags = envVariableFeatureFlags; } @@ -39,8 +34,7 @@ public class RefreshSchemaActivityImpl implements RefreshSchemaActivity { @Override @Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME) public boolean shouldRefreshSchema(UUID sourceCatalogId) { - // if job persistence is unavailable, default to skipping the schema refresh - if (configRepository.isEmpty() || !envVariableFeatureFlags.autoDetectSchema()) { + if (!envVariableFeatureFlags.autoDetectSchema()) { return false; } @@ -66,12 +60,13 @@ public class RefreshSchemaActivityImpl implements RefreshSchemaActivity { private boolean schemaRefreshRanRecently(UUID sourceCatalogId) { try { - Optional mostRecentFetchEvent = configRepository.get().getMostRecentActorCatalogFetchEventForSource(sourceCatalogId); - if (mostRecentFetchEvent.isEmpty()) { + SourceIdRequestBody sourceIdRequestBody = new SourceIdRequestBody().sourceId(sourceCatalogId); + ActorCatalogWithUpdatedAt mostRecentFetchEvent = sourceApi.getMostRecentSourceActorCatalog(sourceIdRequestBody); + if (mostRecentFetchEvent.getUpdatedAt() == null) { return false; } - return mostRecentFetchEvent.get().getCreatedAt() > OffsetDateTime.now().minusHours(24l).toEpochSecond(); - } catch (IOException e) { + return mostRecentFetchEvent.getUpdatedAt() > OffsetDateTime.now().minusHours(24l).toEpochSecond(); + } catch (ApiException e) { // catching this exception because we don't want to block replication due to a failed schema refresh log.info("Encountered an error fetching most recent actor catalog fetch event: ", e); return true; diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/RefreshSchemaActivityTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/RefreshSchemaActivityTest.java index d120bf8c7f4..e2168db11f1 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/RefreshSchemaActivityTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/RefreshSchemaActivityTest.java @@ -4,6 +4,7 @@ package io.airbyte.workers.temporal.scheduling.activities; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -11,14 +12,11 @@ import static org.mockito.Mockito.when; import io.airbyte.api.client.generated.SourceApi; import io.airbyte.api.client.invoker.generated.ApiException; +import io.airbyte.api.client.model.generated.ActorCatalogWithUpdatedAt; import io.airbyte.api.client.model.generated.SourceDiscoverSchemaRequestBody; import io.airbyte.commons.features.EnvVariableFeatureFlags; -import io.airbyte.config.ActorCatalogFetchEvent; -import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.workers.temporal.sync.RefreshSchemaActivityImpl; -import java.io.IOException; import java.time.OffsetDateTime; -import java.util.Optional; import java.util.UUID; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -29,7 +27,6 @@ import org.mockito.junit.jupiter.MockitoExtension; @ExtendWith(MockitoExtension.class) class RefreshSchemaActivityTest { - static private ConfigRepository mConfigRepository; static private SourceApi mSourceApi; static private EnvVariableFeatureFlags mEnvVariableFeatureFlags; @@ -40,32 +37,31 @@ class RefreshSchemaActivityTest { @BeforeEach void setUp() { mSourceApi = mock(SourceApi.class); - mConfigRepository = mock(ConfigRepository.class); mEnvVariableFeatureFlags = mock(EnvVariableFeatureFlags.class); mSourceApi = mock(SourceApi.class); when(mEnvVariableFeatureFlags.autoDetectSchema()).thenReturn(true); - refreshSchemaActivity = new RefreshSchemaActivityImpl(Optional.of(mConfigRepository), mSourceApi, mEnvVariableFeatureFlags); + refreshSchemaActivity = new RefreshSchemaActivityImpl(mSourceApi, mEnvVariableFeatureFlags); } @Test - void testShouldRefreshSchemaNoRecentRefresh() throws IOException { - when(mConfigRepository.getMostRecentActorCatalogFetchEventForSource(SOURCE_ID)).thenReturn(Optional.empty()); + void testShouldRefreshSchemaNoRecentRefresh() throws ApiException { + when(mSourceApi.getMostRecentSourceActorCatalog(any())).thenReturn(new ActorCatalogWithUpdatedAt()); Assertions.assertThat(true).isEqualTo(refreshSchemaActivity.shouldRefreshSchema(SOURCE_ID)); } @Test - void testShouldRefreshSchemaRecentRefreshOver24HoursAgo() throws IOException { + void testShouldRefreshSchemaRecentRefreshOver24HoursAgo() throws ApiException { Long twoDaysAgo = OffsetDateTime.now().minusHours(48l).toEpochSecond(); - ActorCatalogFetchEvent fetchEvent = new ActorCatalogFetchEvent().withActorCatalogId(UUID.randomUUID()).withCreatedAt(twoDaysAgo); - when(mConfigRepository.getMostRecentActorCatalogFetchEventForSource(SOURCE_ID)).thenReturn(Optional.ofNullable(fetchEvent)); + ActorCatalogWithUpdatedAt actorCatalogWithUpdatedAt = new ActorCatalogWithUpdatedAt().updatedAt(twoDaysAgo); + when(mSourceApi.getMostRecentSourceActorCatalog(any())).thenReturn(actorCatalogWithUpdatedAt); Assertions.assertThat(true).isEqualTo(refreshSchemaActivity.shouldRefreshSchema(SOURCE_ID)); } @Test - void testShouldRefreshSchemaRecentRefreshLessThan24HoursAgo() throws IOException { + void testShouldRefreshSchemaRecentRefreshLessThan24HoursAgo() throws ApiException { Long twelveHoursAgo = OffsetDateTime.now().minusHours(12l).toEpochSecond(); - ActorCatalogFetchEvent fetchEvent = new ActorCatalogFetchEvent().withActorCatalogId(UUID.randomUUID()).withCreatedAt(twelveHoursAgo); - when(mConfigRepository.getMostRecentActorCatalogFetchEventForSource(SOURCE_ID)).thenReturn(Optional.ofNullable(fetchEvent)); + ActorCatalogWithUpdatedAt actorCatalogWithUpdatedAt = new ActorCatalogWithUpdatedAt().updatedAt(twelveHoursAgo); + when(mSourceApi.getMostRecentSourceActorCatalog(any())).thenReturn(actorCatalogWithUpdatedAt); Assertions.assertThat(false).isEqualTo(refreshSchemaActivity.shouldRefreshSchema(SOURCE_ID)); } diff --git a/docs/reference/api/generated-api-html/index.html b/docs/reference/api/generated-api-html/index.html index cef598e12e0..f9bb724fb0e 100644 --- a/docs/reference/api/generated-api-html/index.html +++ b/docs/reference/api/generated-api-html/index.html @@ -319,6 +319,7 @@ font-style: italic;
  • post /v1/sources/create
  • post /v1/sources/delete
  • post /v1/sources/discover_schema
  • +
  • post /v1/sources/most_recent_source_actor_catalog
  • post /v1/sources/get
  • post /v1/sources/list
  • post /v1/sources/search
  • @@ -6498,6 +6499,65 @@ containing the updated stream needs to be sent. InvalidInputExceptionInfo
    +
    +
    + Up +
    post /v1/sources/most_recent_source_actor_catalog
    +
    Get most recent ActorCatalog for source (getMostRecentSourceActorCatalog)
    +
    + + +

    Consumes

    + This API call consumes the following media types via the Content-Type request header: +
      +
    • application/json
    • +
    + +

    Request body

    +
    +
    SourceIdRequestBody SourceIdRequestBody (required)
    + +
    Body Parameter
    + +
    + + + + +

    Return type

    + + + + +

    Example data

    +
    Content-Type: application/json
    +
    {
    +  "catalog" : "{}",
    +  "updatedAt" : 0
    +}
    + +

    Produces

    + This API call produces the following media types according to the Accept request header; + the media type will be conveyed by the Content-Type response header. +
      +
    • application/json
    • +
    + +

    Responses

    +

    200

    + Successful operation + ActorCatalogWithUpdatedAt +

    404

    + Object with given id was not found. + NotFoundKnownExceptionInfo +

    422

    + Input failed validation + InvalidInputExceptionInfo +
    +
    Up @@ -10016,6 +10076,7 @@ containing the updated stream needs to be sent.

    Table of Contents

      +
    1. ActorCatalogWithUpdatedAt -
    2. ActorDefinitionResourceRequirements -
    3. AdvancedAuth -
    4. AirbyteCatalog -
    5. @@ -10196,6 +10257,14 @@ containing the updated stream needs to be sent.
  • WorkspaceUpdateName -
  • +
    +

    ActorCatalogWithUpdatedAt - Up

    +
    A source actor catalog with the timestamp it was mostly recently updated
    +
    +
    updatedAt (optional)
    Long format: int64
    +
    catalog (optional)
    +
    +

    ActorDefinitionResourceRequirements - Up

    actor definition specific resource requirements. if default is set, these are the requirements that should be set for ALL jobs run for this actor definition. it is overriden by the job type specific configurations. if not set, the platform will use defaults. these values will be overriden by configuration at the connection level.