1
0
mirror of synced 2025-12-22 03:21:25 -05:00
Files
airbyte/airbyte-integrations/connectors/source-greenhouse/components.py

37 lines
1.8 KiB
Python

#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
from typing import Any, Mapping
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.migrations.legacy_to_per_partition_state_migration import LegacyToPerPartitionStateMigration
from airbyte_cdk.sources.declarative.models.declarative_component_schema import DeclarativeStream as DeclarativeStreamModel
from airbyte_cdk.sources.declarative.types import Config
class GreenhouseStateMigration(LegacyToPerPartitionStateMigration):
    """
    State migration for Greenhouse that rewrites a legacy state mapping into the
    per-partition layout, casting every partition key to an integer.
    """

    declarative_stream: DeclarativeStreamModel
    config: Config

    def __init__(self, declarative_stream: DeclarativeStreamModel, config: Config):
        """
        Collect the partition router, cursor definition and parameters from the stream model
        and resolve the interpolated partition key and cursor field names against the config.

        :param declarative_stream: stream model providing `retriever.partition_router`,
            `incremental_sync` and `parameters`
        :param config: connector configuration used to evaluate interpolated field names
        """
        self._partition_router = declarative_stream.retriever.partition_router
        self._cursor = declarative_stream.incremental_sync
        self._config = config
        self._parameters = declarative_stream.parameters

        # Both field names may be interpolated strings, so build them first and evaluate afterwards.
        partition_field_template = InterpolatedString.create(
            self._get_partition_field(self._partition_router),
            parameters=self._parameters,
        )
        self._partition_key_field = partition_field_template.eval(self._config)

        cursor_field_template = InterpolatedString.create(
            self._cursor.cursor_field,
            parameters=self._parameters,
        )
        self._cursor_field = cursor_field_template.eval(self._config)

    def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
        """
        LegacyToPerPartitionStateMigration migrates partition keys as strings, while the real type
        of the id in Greenhouse is integer, which leads to a partition mismatch.
        To prevent this, each partition key is cast to int here.

        :param stream_state: legacy state mapping each partition key to its cursor value
        :return: mapping with a single "states" entry listing one partition/cursor pair per key
        """
        per_partition_states = []
        for parent_id, cursor_value in stream_state.items():
            # int() raises if a legacy key cannot be converted to an integer.
            partition = {self._partition_key_field: int(parent_id), "parent_slice": {}}
            per_partition_states.append({"partition": partition, "cursor": cursor_value})
        return {"states": per_partition_states}