1
0
mirror of synced 2025-12-25 02:09:19 -05:00

[airbyte-cdk] - Add XmlDecoder component to low code CDK (#46360)

This commit is contained in:
Patrick Nilan
2024-10-09 11:18:32 -07:00
committed by GitHub
parent 04b6c1bc39
commit 9249347736
15 changed files with 1746 additions and 1320 deletions

View File

@@ -1407,6 +1407,7 @@ definitions:
description: Component used to decode the response.
anyOf:
- "$ref": "#/definitions/JsonDecoder"
- "$ref": "#/definitions/XmlDecoder"
$parameters:
type: object
additionalProperties: true
@@ -1739,6 +1740,16 @@ definitions:
type:
type: string
enum: [IterableDecoder]
XmlDecoder:
title: XML Decoder
description: Use this is the response is XML.
type: object
required:
- type
properties:
type:
type: string
enum: [XmlDecoder]
ListPartitionRouter:
title: List Partition Router
description: A Partition router that specifies a list of attributes where each attribute describes a portion of the complete data set for a stream. During a sync, each value is iterated over and can be used as input to outbound API requests.
@@ -2382,6 +2393,7 @@ definitions:
- "$ref": "#/definitions/JsonDecoder"
- "$ref": "#/definitions/JsonlDecoder"
- "$ref": "#/definitions/IterableDecoder"
- "$ref": "#/definitions/XmlDecoder"
$parameters:
type: object
additionalProperties: true
@@ -2497,6 +2509,7 @@ definitions:
- "$ref": "#/definitions/JsonDecoder"
- "$ref": "#/definitions/JsonlDecoder"
- "$ref": "#/definitions/IterableDecoder"
- "$ref": "#/definitions/XmlDecoder"
$parameters:
type: object
additionalProperties: true

View File

@@ -5,6 +5,7 @@
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder, JsonlDecoder, IterableDecoder
from airbyte_cdk.sources.declarative.decoders.noop_decoder import NoopDecoder
from airbyte_cdk.sources.declarative.decoders.pagination_decoder_decorator import PaginationDecoderDecorator
from airbyte_cdk.sources.declarative.decoders.xml_decoder import XmlDecoder
__all__ = ["Decoder", "JsonDecoder", "JsonlDecoder", "IterableDecoder", "NoopDecoder"]
__all__ = ["Decoder", "JsonDecoder", "JsonlDecoder", "IterableDecoder", "NoopDecoder", "PaginationDecoderDecorator", "XmlDecoder"]

View File

@@ -0,0 +1,36 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import logging
from dataclasses import dataclass
from typing import Any, Generator, MutableMapping
import requests
from airbyte_cdk.sources.declarative.decoders import Decoder
logger = logging.getLogger("airbyte")
@dataclass
class PaginationDecoderDecorator(Decoder):
"""
Decoder to wrap other decoders when instantiating a DefaultPaginator in order to bypass decoding if the response is streamed.
"""
def __init__(self, decoder: Decoder):
self._decoder = decoder
@property
def decoder(self) -> Decoder:
return self._decoder
def is_stream_response(self) -> bool:
return self._decoder.is_stream_response()
def decode(self, response: requests.Response) -> Generator[MutableMapping[str, Any], None, None]:
if self._decoder.is_stream_response():
logger.warning("Response is streamed and therefore will not be decoded for pagination.")
yield {}
else:
yield from self._decoder.decode(response)

View File

@@ -0,0 +1,93 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#
import logging
from dataclasses import InitVar, dataclass
from typing import Any, Generator, Mapping, MutableMapping
from xml.parsers.expat import ExpatError
import requests
import xmltodict
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
logger = logging.getLogger("airbyte")
@dataclass
class XmlDecoder(Decoder):
"""
XmlDecoder is a decoder strategy that parses the XML content of the resopnse, and converts it to a dict.
This class handles XML attributes by prefixing them with an '@' symbol and represents XML text content by using the '#text' key if the element has attributes or the element name/tag. It does not currently support XML namespace declarations.
Example XML Input:
<root>
<location id="123">
San Francisco
</location>
<item id="1" category="books">
<name>Book Title 1</name>
<price>10.99</price>
</item>
<item id="2" category="electronics">
<name>Gadget</name>
<price>299.99</price>
<description>A useful gadget</description>
</item>
</root>
Converted Output:
{
"root": {
"location: {
"@id": "123,
"#text": "San Francisco"
},
"item": [
{
"@id": "1",
"@category": "books",
"name": "Book Title 1",
"price": "10.99"
},
{
"@id": "2",
"@category": "electronics",
"name": "Gadget",
"price": "299.99",
"description": "A useful gadget"
}
]
}
}
Notes:
- Attributes of an XML element are prefixed with an '@' symbol in the dictionary output.
- Text content of an XML element is handled in two different ways, depending on whether
the element has attributes.
- If the element has attributes, the text content will be
represented by the "#text" key.
- If the element does not have any attributes, the text content will be
represented by element name.
- Namespace declarations are not supported in the current implementation.
"""
parameters: InitVar[Mapping[str, Any]]
def is_stream_response(self) -> bool:
return False
def decode(self, response: requests.Response) -> Generator[MutableMapping[str, Any], None, None]:
body_xml = response.text
try:
body_json = xmltodict.parse(body_xml)
if not isinstance(body_json, list):
body_json = [body_json]
if len(body_json) == 0:
yield {}
else:
yield from body_json
except ExpatError as exc:
logger.warning(f"Response cannot be parsed from XML: {response.status_code=}, {response.text=}, {exc=}")
yield {}

View File

@@ -685,6 +685,10 @@ class IterableDecoder(BaseModel):
type: Literal['IterableDecoder']
class XmlDecoder(BaseModel):
type: Literal['XmlDecoder']
class MinMaxDatetime(BaseModel):
type: Literal['MinMaxDatetime']
datetime: str = Field(
@@ -1458,8 +1462,8 @@ class SessionTokenAuthenticator(BaseModel):
description='Authentication method to use for requests sent to the API, specifying how to inject the session token.',
title='Data Request Authentication',
)
decoder: Optional[JsonDecoder] = Field(
None, description='Component decoding the response', title='Decoder'
decoder: Optional[Union[JsonDecoder, XmlDecoder]] = Field(
None, description='Component used to decode the response.', title='Decoder'
)
parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters')
@@ -1625,10 +1629,12 @@ class SimpleRetriever(BaseModel):
description='PartitionRouter component that describes how to partition the stream, enabling incremental syncs and checkpointing.',
title='Partition Router',
)
decoder: Optional[Union[JsonDecoder, JsonlDecoder, IterableDecoder]] = Field(
None,
description='Component decoding the response so records can be extracted.',
title='Decoder',
decoder: Optional[Union[JsonDecoder, JsonlDecoder, IterableDecoder, XmlDecoder]] = (
Field(
None,
description='Component decoding the response so records can be extracted.',
title='Decoder',
)
)
parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters')
@@ -1689,10 +1695,12 @@ class AsyncRetriever(BaseModel):
description='PartitionRouter component that describes how to partition the stream, enabling incremental syncs and checkpointing.',
title='Partition Router',
)
decoder: Optional[Union[JsonDecoder, JsonlDecoder, IterableDecoder]] = Field(
None,
description='Component decoding the response so records can be extracted.',
title='Decoder',
decoder: Optional[Union[JsonDecoder, JsonlDecoder, IterableDecoder, XmlDecoder]] = (
Field(
None,
description='Component decoding the response so records can be extracted.',
title='Decoder',
)
)
parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters')

View File

@@ -31,7 +31,14 @@ from airbyte_cdk.sources.declarative.checks import CheckStream
from airbyte_cdk.sources.declarative.concurrency_level import ConcurrencyLevel
from airbyte_cdk.sources.declarative.datetime import MinMaxDatetime
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
from airbyte_cdk.sources.declarative.decoders import Decoder, IterableDecoder, JsonDecoder, JsonlDecoder
from airbyte_cdk.sources.declarative.decoders import (
Decoder,
IterableDecoder,
JsonDecoder,
JsonlDecoder,
PaginationDecoderDecorator,
XmlDecoder,
)
from airbyte_cdk.sources.declarative.extractors import DpathExtractor, RecordFilter, RecordSelector, ResponseToFileExtractor
from airbyte_cdk.sources.declarative.extractors.record_filter import ClientSideIncrementalRecordFilterDecorator
from airbyte_cdk.sources.declarative.extractors.record_selector import SCHEMA_TRANSFORMER_TYPE_MAPPING
@@ -118,6 +125,7 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import
from airbyte_cdk.sources.declarative.models.declarative_component_schema import ValueType
from airbyte_cdk.sources.declarative.models.declarative_component_schema import WaitTimeFromHeader as WaitTimeFromHeaderModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import WaitUntilTimeFromHeader as WaitUntilTimeFromHeaderModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import XmlDecoder as XmlDecoderModel
from airbyte_cdk.sources.declarative.partition_routers import (
CartesianProductStreamSlicer,
ListPartitionRouter,
@@ -227,6 +235,7 @@ class ModelToComponentFactory:
JsonlDecoderModel: self.create_jsonl_decoder,
KeysToLowerModel: self.create_keys_to_lower_transformation,
IterableDecoderModel: self.create_iterable_decoder,
XmlDecoderModel: self.create_xml_decoder,
JsonFileSchemaLoaderModel: self.create_json_file_schema_loader,
JwtAuthenticatorModel: self.create_jwt_authenticator,
LegacyToPerPartitionStateMigrationModel: self.create_legacy_to_per_partition_state_migration,
@@ -385,8 +394,6 @@ class ModelToComponentFactory:
self, model: SessionTokenAuthenticatorModel, config: Config, name: str, **kwargs: Any
) -> Union[ApiKeyAuthenticator, BearerAuthenticator]:
decoder = self._create_component_from_model(model=model.decoder, config=config) if model.decoder else JsonDecoder(parameters={})
if not isinstance(decoder, JsonDecoder):
raise ValueError(f"Provided decoder of {type(model.decoder)=} is not supported. Please set JsonDecoder instead.")
login_requester = self._create_component_from_model(
model=model.login_requester, config=config, name=f"{name}_login_requester", decoder=decoder
)
@@ -461,11 +468,20 @@ class ModelToComponentFactory:
def create_cursor_pagination(
self, model: CursorPaginationModel, config: Config, decoder: Decoder, **kwargs: Any
) -> CursorPaginationStrategy:
if not isinstance(decoder, JsonDecoder):
raise ValueError(f"Provided decoder of {type(decoder)=} is not supported. Please set JsonDecoder instead.")
if isinstance(decoder, PaginationDecoderDecorator):
if not isinstance(decoder.decoder, (JsonDecoder, XmlDecoder)):
raise ValueError(
f"Provided decoder of {type(decoder.decoder)=} is not supported. Please set JsonDecoder or XmlDecoder instead."
)
decoder_to_use = decoder
else:
if not isinstance(decoder, (JsonDecoder, XmlDecoder)):
raise ValueError(f"Provided decoder of {type(decoder)=} is not supported. Please set JsonDecoder or XmlDecoder instead.")
decoder_to_use = PaginationDecoderDecorator(decoder=decoder)
return CursorPaginationStrategy(
cursor_value=model.cursor_value,
decoder=decoder,
decoder=decoder_to_use,
page_size=model.page_size,
stop_condition=model.stop_condition,
config=config,
@@ -824,9 +840,12 @@ class ModelToComponentFactory:
decoder: Optional[Decoder] = None,
cursor_used_for_stop_condition: Optional[DeclarativeCursor] = None,
) -> Union[DefaultPaginator, PaginatorTestReadDecorator]:
decoder_to_use = decoder if decoder else JsonDecoder(parameters={})
if not isinstance(decoder_to_use, JsonDecoder):
raise ValueError(f"Provided decoder of {type(decoder_to_use)=} is not supported. Please set JsonDecoder instead.")
if decoder:
if not isinstance(decoder, (JsonDecoder, XmlDecoder)):
raise ValueError(f"Provided decoder of {type(decoder)=} is not supported. Please set JsonDecoder or XmlDecoder instead.")
decoder_to_use = PaginationDecoderDecorator(decoder=decoder)
else:
decoder_to_use = PaginationDecoderDecorator(decoder=JsonDecoder(parameters={}))
page_size_option = (
self._create_component_from_model(model=model.page_size_option, config=config) if model.page_size_option else None
)
@@ -854,7 +873,10 @@ class ModelToComponentFactory:
def create_dpath_extractor(
self, model: DpathExtractorModel, config: Config, decoder: Optional[Decoder] = None, **kwargs: Any
) -> DpathExtractor:
decoder_to_use = decoder if decoder else JsonDecoder(parameters={})
if decoder:
decoder_to_use = decoder
else:
decoder_to_use = JsonDecoder(parameters={})
model_field_path: List[Union[InterpolatedString, str]] = [x for x in model.field_path]
return DpathExtractor(decoder=decoder_to_use, field_path=model_field_path, config=config, parameters=model.parameters or {})
@@ -945,6 +967,10 @@ class ModelToComponentFactory:
def create_iterable_decoder(model: IterableDecoderModel, config: Config, **kwargs: Any) -> IterableDecoder:
return IterableDecoder(parameters={})
@staticmethod
def create_xml_decoder(model: XmlDecoderModel, config: Config, **kwargs: Any) -> XmlDecoder:
return XmlDecoder(parameters={})
@staticmethod
def create_json_file_schema_loader(model: JsonFileSchemaLoaderModel, config: Config, **kwargs: Any) -> JsonFileSchemaLoader:
return JsonFileSchemaLoader(file_path=model.file_path or "", config=config, parameters=model.parameters or {})
@@ -1056,12 +1082,20 @@ class ModelToComponentFactory:
@staticmethod
def create_offset_increment(model: OffsetIncrementModel, config: Config, decoder: Decoder, **kwargs: Any) -> OffsetIncrement:
if not isinstance(decoder, JsonDecoder):
raise ValueError(f"Provided decoder of {type(decoder)=} is not supported. Please set JsonDecoder instead.")
if isinstance(decoder, PaginationDecoderDecorator):
if not isinstance(decoder.decoder, (JsonDecoder, XmlDecoder)):
raise ValueError(
f"Provided decoder of {type(decoder.decoder)=} is not supported. Please set JsonDecoder or XmlDecoder instead."
)
decoder_to_use = decoder
else:
if not isinstance(decoder, (JsonDecoder, XmlDecoder)):
raise ValueError(f"Provided decoder of {type(decoder)=} is not supported. Please set JsonDecoder or XmlDecoder instead.")
decoder_to_use = PaginationDecoderDecorator(decoder=decoder)
return OffsetIncrement(
page_size=model.page_size,
config=config,
decoder=decoder,
decoder=decoder_to_use,
inject_on_first_request=model.inject_on_first_request or False,
parameters=model.parameters or {},
)
@@ -1106,9 +1140,9 @@ class ModelToComponentFactory:
self,
model: RecordSelectorModel,
config: Config,
decoder: Optional[Decoder] = None,
*,
transformations: List[RecordTransformation],
decoder: Optional[Decoder] = None,
client_side_incremental_sync: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> RecordSelector:

View File

@@ -6,8 +6,7 @@ from dataclasses import InitVar, dataclass, field
from typing import Any, Mapping, MutableMapping, Optional, Union
import requests
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder
from airbyte_cdk.sources.declarative.decoders import Decoder, JsonDecoder, PaginationDecoderDecorator
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator
from airbyte_cdk.sources.declarative.requesters.paginators.strategies.pagination_strategy import PaginationStrategy
@@ -90,7 +89,7 @@ class DefaultPaginator(Paginator):
config: Config
url_base: Union[InterpolatedString, str]
parameters: InitVar[Mapping[str, Any]]
decoder: Decoder = field(default_factory=lambda: JsonDecoder(parameters={}))
decoder: Decoder = field(default_factory=lambda: PaginationDecoderDecorator(decoder=JsonDecoder(parameters={})))
page_size_option: Optional[RequestOption] = None
page_token_option: Optional[Union[RequestPath, RequestOption]] = None

View File

@@ -6,8 +6,7 @@ from dataclasses import InitVar, dataclass, field
from typing import Any, Dict, Mapping, Optional, Union
import requests
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder
from airbyte_cdk.sources.declarative.decoders import Decoder, JsonDecoder, PaginationDecoderDecorator
from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.requesters.paginators.strategies.pagination_strategy import PaginationStrategy
@@ -32,7 +31,7 @@ class CursorPaginationStrategy(PaginationStrategy):
parameters: InitVar[Mapping[str, Any]]
page_size: Optional[int] = None
stop_condition: Optional[Union[InterpolatedBoolean, str]] = None
decoder: Decoder = field(default_factory=lambda: JsonDecoder(parameters={}))
decoder: Decoder = field(default_factory=lambda: PaginationDecoderDecorator(decoder=JsonDecoder(parameters={})))
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._initial_cursor = None

View File

@@ -6,7 +6,7 @@ from dataclasses import InitVar, dataclass, field
from typing import Any, Mapping, Optional, Union
import requests
from airbyte_cdk.sources.declarative.decoders import Decoder, JsonDecoder
from airbyte_cdk.sources.declarative.decoders import Decoder, JsonDecoder, PaginationDecoderDecorator
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
from airbyte_cdk.sources.declarative.requesters.paginators.strategies.pagination_strategy import PaginationStrategy
from airbyte_cdk.sources.types import Config, Record
@@ -39,7 +39,7 @@ class OffsetIncrement(PaginationStrategy):
config: Config
page_size: Optional[Union[str, int]]
parameters: InitVar[Mapping[str, Any]]
decoder: Decoder = field(default_factory=lambda: JsonDecoder(parameters={}))
decoder: Decoder = field(default_factory=lambda: PaginationDecoderDecorator(decoder=JsonDecoder(parameters={})))
inject_on_first_request: bool = False
def __post_init__(self, parameters: Mapping[str, Any]) -> None:

File diff suppressed because it is too large Load Diff

View File

@@ -69,6 +69,7 @@ cryptography = "^42.0.5"
pytz = "2024.1"
orjson = "^3.10.7"
serpyco-rs = "^1.10.2"
xmltodict = "^0.13.0"
[tool.poetry.group.dev.dependencies]
freezegun = "*"

View File

@@ -0,0 +1,26 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#
import pytest
import requests
from airbyte_cdk.sources.declarative.decoders import JsonDecoder, PaginationDecoderDecorator
class StreamingJsonDecoder(JsonDecoder):
def is_stream_response(self) -> bool:
return True
@pytest.mark.parametrize(
"decoder_class, expected",
[
(StreamingJsonDecoder, {}),
(JsonDecoder, {"data": [{"id": 1}, {"id": 2}]})
]
)
def test_pagination_decoder_decorator(requests_mock, decoder_class, expected):
decoder = PaginationDecoderDecorator(decoder=decoder_class(parameters={}))
response_body = "{\"data\": [{\"id\": 1}, {\"id\": 2}]}"
requests_mock.register_uri("GET", "https://airbyte.io/", text=response_body)
response = requests.get("https://airbyte.io/")
assert next(decoder.decode(response)) == expected

View File

@@ -0,0 +1,38 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#
import pytest
import requests
from airbyte_cdk.sources.declarative.decoders import XmlDecoder
@pytest.mark.parametrize(
"response_body, expected",
[
(
"<item name=\"item_1\"></item>",
{"item": {"@name": "item_1"}}
),
(
"<data><item name=\"item_1\">Item 1</item><item name=\"item_2\">Item 2</item></data>",
{"data": {"item": [{"@name": "item_1", "#text": "Item 1"}, {"@name": "item_2", "#text": "Item 2"}]}}
),
(
None,
{}
),
(
"<item name=\"item_1\">",
{}
),
(
"<item xmlns:ns=\"https://airbyte.io\"><ns:id>1</ns:id><ns:name>Item 1</ns:name></item>",
{'item': {'@xmlns:ns': 'https://airbyte.io', 'ns:id': '1', 'ns:name': 'Item 1'}}
)
],
ids=["one_element_response", "multi_element_response", "empty_response", "malformed_xml_response", "xml_with_namespace_response"]
)
def test_xml_decoder(requests_mock, response_body, expected):
requests_mock.register_uri("GET", "https://airbyte.io/", text=response_body)
response = requests.get("https://airbyte.io/")
assert next(XmlDecoder(parameters={}).decode(response)) == expected

View File

@@ -22,7 +22,7 @@ from airbyte_cdk.sources.declarative.checks import CheckStream
from airbyte_cdk.sources.declarative.concurrency_level import ConcurrencyLevel
from airbyte_cdk.sources.declarative.datetime import MinMaxDatetime
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
from airbyte_cdk.sources.declarative.decoders import JsonDecoder
from airbyte_cdk.sources.declarative.decoders import JsonDecoder, PaginationDecoderDecorator
from airbyte_cdk.sources.declarative.extractors import DpathExtractor, RecordFilter, RecordSelector
from airbyte_cdk.sources.declarative.extractors.record_filter import ClientSideIncrementalRecordFilterDecorator
from airbyte_cdk.sources.declarative.incremental import CursorFactory, DatetimeBasedCursor, PerPartitionCursor, ResumableFullRefreshCursor
@@ -121,7 +121,6 @@ decoder:
type: JsonDecoder
extractor:
type: DpathExtractor
decoder: "#/decoder"
selector:
type: RecordSelector
record_filter:
@@ -151,6 +150,8 @@ requester:
retriever:
paginator:
type: NoPagination
decoder:
$ref: "#/decoder"
partial_stream:
type: DeclarativeStream
schema_loader:
@@ -259,7 +260,7 @@ spec:
assert stream.retriever.record_selector.record_filter._filter_interpolator.condition == "{{ record['id'] > stream_state['id'] }}"
assert isinstance(stream.retriever.paginator, DefaultPaginator)
assert isinstance(stream.retriever.paginator.decoder, JsonDecoder)
assert isinstance(stream.retriever.paginator.decoder, PaginationDecoderDecorator)
assert stream.retriever.paginator.page_size_option.field_name.eval(input_config) == "page_size"
assert stream.retriever.paginator.page_size_option.inject_into == RequestOptionType.request_parameter
assert isinstance(stream.retriever.paginator.page_token_option, RequestPath)
@@ -267,7 +268,7 @@ spec:
assert stream.retriever.paginator.url_base.default == "https://api.sendgrid.com/v3/"
assert isinstance(stream.retriever.paginator.pagination_strategy, CursorPaginationStrategy)
assert isinstance(stream.retriever.paginator.pagination_strategy.decoder, JsonDecoder)
assert isinstance(stream.retriever.paginator.pagination_strategy.decoder, PaginationDecoderDecorator)
assert stream.retriever.paginator.pagination_strategy._cursor_value.string == "{{ response._metadata.next }}"
assert stream.retriever.paginator.pagination_strategy._cursor_value.default == "{{ response._metadata.next }}"
assert stream.retriever.paginator.pagination_strategy.page_size == 10
@@ -724,7 +725,6 @@ decoder:
type: JsonDecoder
extractor:
type: DpathExtractor
decoder: "#/decoder"
selector:
type: RecordSelector
record_filter:
@@ -754,6 +754,8 @@ requester:
retriever:
paginator:
type: NoPagination
decoder:
$ref: "#/decoder"
partial_stream:
type: DeclarativeStream
schema_loader:
@@ -828,7 +830,7 @@ spec:
assert isinstance(stream.retriever.cursor, ResumableFullRefreshCursor)
assert isinstance(stream.retriever.paginator, DefaultPaginator)
assert isinstance(stream.retriever.paginator.decoder, JsonDecoder)
assert isinstance(stream.retriever.paginator.decoder, PaginationDecoderDecorator)
assert stream.retriever.paginator.page_size_option.field_name.eval(input_config) == "page_size"
assert stream.retriever.paginator.page_size_option.inject_into == RequestOptionType.request_parameter
assert isinstance(stream.retriever.paginator.page_token_option, RequestPath)
@@ -836,7 +838,7 @@ spec:
assert stream.retriever.paginator.url_base.default == "https://api.sendgrid.com/v3/"
assert isinstance(stream.retriever.paginator.pagination_strategy, CursorPaginationStrategy)
assert isinstance(stream.retriever.paginator.pagination_strategy.decoder, JsonDecoder)
assert isinstance(stream.retriever.paginator.pagination_strategy.decoder, PaginationDecoderDecorator)
assert stream.retriever.paginator.pagination_strategy._cursor_value.string == "{{ response._metadata.next }}"
assert stream.retriever.paginator.pagination_strategy._cursor_value.default == "{{ response._metadata.next }}"
assert stream.retriever.paginator.pagination_strategy.page_size == 10

View File

@@ -7,7 +7,7 @@ from unittest.mock import MagicMock
import pytest
import requests
from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder
from airbyte_cdk.sources.declarative.decoders import JsonDecoder, XmlDecoder
from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
from airbyte_cdk.sources.declarative.requesters.paginators.default_paginator import (
DefaultPaginator,
@@ -21,7 +21,7 @@ from airbyte_cdk.sources.declarative.requesters.request_path import RequestPath
@pytest.mark.parametrize(
"page_token_request_option, stop_condition, expected_updated_path, expected_request_params, expected_headers, expected_body_data, expected_body_json, last_record, expected_next_page_token, limit",
"page_token_request_option, stop_condition, expected_updated_path, expected_request_params, expected_headers, expected_body_data, expected_body_json, last_record, expected_next_page_token, limit, decoder, response_body",
[
(
RequestPath(parameters={}),
@@ -34,6 +34,8 @@ from airbyte_cdk.sources.declarative.requesters.request_path import RequestPath
{"id": 1},
{"next_page_token": "https://airbyte.io/next_url"},
2,
JsonDecoder,
{"next": "https://airbyte.io/next_url"},
),
(
RequestOption(inject_into=RequestOptionType.request_parameter, field_name="from", parameters={}),
@@ -46,6 +48,8 @@ from airbyte_cdk.sources.declarative.requesters.request_path import RequestPath
{"id": 1},
{"next_page_token": "https://airbyte.io/next_url"},
2,
JsonDecoder,
{"next": "https://airbyte.io/next_url"},
),
(
RequestOption(inject_into=RequestOptionType.request_parameter, field_name="from", parameters={}),
@@ -58,6 +62,8 @@ from airbyte_cdk.sources.declarative.requesters.request_path import RequestPath
{"id": 1},
None,
2,
JsonDecoder,
{"next": "https://airbyte.io/next_url"},
),
(
RequestOption(inject_into=RequestOptionType.header, field_name="from", parameters={}),
@@ -70,6 +76,8 @@ from airbyte_cdk.sources.declarative.requesters.request_path import RequestPath
{"id": 1},
{"next_page_token": "https://airbyte.io/next_url"},
2,
JsonDecoder,
{"next": "https://airbyte.io/next_url"},
),
(
RequestOption(inject_into=RequestOptionType.body_data, field_name="from", parameters={}),
@@ -82,6 +90,8 @@ from airbyte_cdk.sources.declarative.requesters.request_path import RequestPath
{"id": 1},
{"next_page_token": "https://airbyte.io/next_url"},
2,
JsonDecoder,
{"next": "https://airbyte.io/next_url"},
),
(
RequestOption(inject_into=RequestOptionType.body_json, field_name="from", parameters={}),
@@ -94,6 +104,36 @@ from airbyte_cdk.sources.declarative.requesters.request_path import RequestPath
{"id": 1},
{"next_page_token": "https://airbyte.io/next_url"},
2,
JsonDecoder,
{"next": "https://airbyte.io/next_url"},
),
(
RequestPath(parameters={}),
None,
"/next_url",
{"limit": 2},
{},
{},
{},
{"id": 1},
{"next_page_token": "https://airbyte.io/next_url"},
2,
XmlDecoder,
b"<next>https://airbyte.io/next_url</next>",
),
(
RequestOption(inject_into=RequestOptionType.request_parameter, field_name="from", parameters={}),
None,
None,
{"limit": 2, "from": "https://airbyte.io/next_url"},
{},
{},
{},
{"id": 1},
{"next_page_token": "https://airbyte.io/next_url"},
2,
XmlDecoder,
b"<next>https://airbyte.io/next_url</next>",
),
],
ids=[
@@ -103,6 +143,8 @@ from airbyte_cdk.sources.declarative.requesters.request_path import RequestPath
"test_default_paginator_cursor_header",
"test_default_paginator_cursor_body_data",
"test_default_paginator_cursor_body_json",
"test_default_paginator_path_with_xml_decoder",
"test_default_paginator_request_param_xml_decoder",
],
)
def test_default_paginator_with_cursor(
@@ -116,6 +158,8 @@ def test_default_paginator_with_cursor(
last_record,
expected_next_page_token,
limit,
decoder,
response_body
):
page_size_request_option = RequestOption(
inject_into=RequestOptionType.request_parameter, field_name="{{parameters['page_limit']}}", parameters={"page_limit": "limit"}
@@ -128,7 +172,7 @@ def test_default_paginator_with_cursor(
page_size=limit,
cursor_value=cursor_value,
stop_condition=stop_condition,
decoder=JsonDecoder(parameters={}),
decoder=decoder(parameters={}),
config=config,
parameters=parameters,
)
@@ -143,8 +187,7 @@ def test_default_paginator_with_cursor(
response = requests.Response()
response.headers = {"A_HEADER": "HEADER_VALUE"}
response_body = {"next": "https://airbyte.io/next_url"}
response._content = json.dumps(response_body).encode("utf-8")
response._content = json.dumps(response_body).encode("utf-8") if decoder == JsonDecoder else response_body
actual_next_page_token = paginator.next_page_token(response, 2, last_record)
actual_next_path = paginator.path()