diff --git a/.github/workflows/gradle.yml b/.github/workflows/gradle.yml index 99b88045d71..ed2cb26e14b 100644 --- a/.github/workflows/gradle.yml +++ b/.github/workflows/gradle.yml @@ -204,6 +204,9 @@ jobs: - name: Install Pyenv run: python3 -m pip install virtualenv==16.7.9 --user + - name: Install automake + run: apt-get install -y automake build-essential libtool libtool-bin autoconf + - name: Set up CI Gradle Properties run: | mkdir -p ~/.gradle/ diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/jq.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/jq.py new file mode 100644 index 00000000000..0aefe1f7725 --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/jq.py @@ -0,0 +1,30 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +from typing import List + +import pyjq +import requests +from airbyte_cdk.sources.declarative.decoders.decoder import Decoder +from airbyte_cdk.sources.declarative.extractors.http_extractor import HttpExtractor +from airbyte_cdk.sources.declarative.interpolation.jinja import JinjaInterpolation +from airbyte_cdk.sources.declarative.types import Record + + +class JqExtractor(HttpExtractor): + default_transform = "." + + def __init__(self, transform: str, decoder: Decoder, config, kwargs=None): + if kwargs is None: + kwargs = dict() + self._interpolator = JinjaInterpolation() + self._transform = transform + self._config = config + self._kwargs = kwargs + self._decoder = decoder + + def extract_records(self, response: requests.Response) -> List[Record]: + response_body = self._decoder.decode(response) + script = self._interpolator.eval(self._transform, self._config, default=self.default_transform, **{"kwargs": self._kwargs}) + return pyjq.all(script, response_body) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/http_requester.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/http_requester.py new file mode 100644 index 00000000000..04cb266f345 --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/http_requester.py @@ -0,0 +1,116 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +from typing import Any, Mapping, MutableMapping, Optional, Union + +import requests +from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString +from airbyte_cdk.sources.declarative.requesters.request_headers.request_header_provider import RequestHeaderProvider +from airbyte_cdk.sources.declarative.requesters.request_params.request_parameters_provider import RequestParameterProvider +from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod, Requester +from airbyte_cdk.sources.declarative.requesters.retriers.retrier import Retrier +from airbyte_cdk.sources.declarative.types import Config +from airbyte_cdk.sources.streams.http.auth import HttpAuthenticator + + +class HttpRequester(Requester): + def __init__( + self, + *, + name: str, + url_base: [str, InterpolatedString], + path: [str, InterpolatedString], + http_method: Union[str, HttpMethod], + request_parameters_provider: RequestParameterProvider, + request_headers_provider: RequestHeaderProvider, + authenticator: HttpAuthenticator, + retrier: Retrier, + config: Config, + ): + self._name = name + self._authenticator = authenticator + if type(url_base) == str: + url_base = InterpolatedString(url_base) + self._url_base = url_base + if type(path) == str: + path = InterpolatedString(path) + self._path: InterpolatedString = path + if type(http_method) == str: + http_method = HttpMethod[http_method] + self._method = http_method + self._request_parameters_provider = request_parameters_provider + self._request_headers_provider = request_headers_provider + self._retrier = retrier + self._config = config + + def request_params( + self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None + ) -> MutableMapping[str, Any]: + return self._request_parameters_provider.request_params(stream_state, stream_slice, next_page_token) + + def get_authenticator(self): + return self._authenticator + + def get_url_base(self): + return self._url_base.eval(self._config) + + def get_path(self, *, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any], next_page_token: Mapping[str, Any]) -> str: + kwargs = {"stream_state": stream_state, "stream_slice": stream_slice, "next_page_token": next_page_token} + path = self._path.eval(self._config, **kwargs) + return path + + def get_method(self): + return self._method + + @property + def raise_on_http_errors(self) -> bool: + # TODO this should be declarative + return True + + @property + def max_retries(self) -> Union[int, None]: + return self._retrier.max_retries + + @property + def retry_factor(self) -> float: + return self._retrier.retry_factor + + def should_retry(self, response: requests.Response) -> bool: + return self._retrier.should_retry(response) + + def backoff_time(self, response: requests.Response) -> Optional[float]: + return self._retrier.backoff_time(response) + + def request_headers( + self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None + ) -> Mapping[str, Any]: + return self._request_headers_provider.request_headers(stream_state, stream_slice, next_page_token) + + def request_body_data( + self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None + ) -> Optional[Union[Mapping, str]]: + # FIXME: this should be declarative + return dict() + + def request_body_json( + self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None + ) -> Optional[Mapping]: + # FIXME: this should be declarative + return dict() + + def request_kwargs( + self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None + ) -> Mapping[str, Any]: + # FIXME: this should be declarative + return dict() + + @property + def cache_filename(self) -> str: + # FIXME: this should be declarative + return f"{self._name}.yml" + + @property + def use_cache(self) -> bool: + # FIXME: this should be declarative + return False diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/next_page_url_paginator.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/next_page_url_paginator.py new file mode 100644 index 00000000000..ea7a33fb66b --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/next_page_url_paginator.py @@ -0,0 +1,24 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +from typing import Any, List, Mapping, Optional + +import requests +from airbyte_cdk.sources.declarative.requesters.paginators.interpolated_paginator import InterpolatedPaginator +from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator + + +class NextPageUrlPaginator(Paginator): + def __init__(self, url_base: str = None, interpolated_paginator: InterpolatedPaginator = None, kwargs=None): + if kwargs is None: + kwargs = dict() + self._url_base = url_base or kwargs.get("url_base") + self._interpolated_paginator = interpolated_paginator or kwargs.get("interpolated_paginator") + + def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Optional[Mapping[str, Any]]: + next_page_token = self._interpolated_paginator.next_page_token(response, last_records) + if next_page_token: + return {k: v.replace(self._url_base, "") for k, v in next_page_token.items() if v} + else: + return None diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/offset_paginator.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/offset_paginator.py new file mode 100644 index 00000000000..9e72e22c6d5 --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/offset_paginator.py @@ -0,0 +1,31 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +from typing import Any, List, Mapping, Optional + +import requests +from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator +from airbyte_cdk.sources.declarative.states.dict_state import DictState + + +class OffsetPaginator(Paginator): + def __init__(self, page_size: int, state: DictState, offset_key: str = "offset"): + self._limit = page_size + self._state: DictState = state + self._offsetKey = offset_key + self._update_state_with_offset(0) + + def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Optional[Mapping[str, Any]]: + if len(last_records) < self._limit: + return None + offset = self._get_offset() + self._limit + token_map = {self._offsetKey: offset} + self._update_state_with_offset(offset) + return token_map + + def _update_state_with_offset(self, offset): + self._state.update_state(**{self._offsetKey: offset}) + + def _get_offset(self): + return self._state.get_state(self._offsetKey) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index b87d3c81a77..1ffe2546f85 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -240,4 +240,4 @@ class SimpleRetriever(Retriever, HttpStream): return self._iterator.stream_slices(sync_mode, stream_state) def get_state(self) -> MutableMapping[str, Any]: - return self._state.get_state() + return self._state.get_stream_state() diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/states/dict_state.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/states/dict_state.py new file mode 100644 index 00000000000..b758bab152b --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/states/dict_state.py @@ -0,0 +1,72 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +from enum import Enum +from typing import Mapping, Union + +from airbyte_cdk.sources.declarative.interpolation.jinja import JinjaInterpolation +from airbyte_cdk.sources.declarative.states.state import State + + +def _get_max(*, name, val, other_state): + other_val = other_state.get(name) + if other_val: + return max(val, other_val) + else: + return val + + +class StateType(Enum): + STR = str + INT = int + + +class DictState(State): + stream_state_field = "stream_state" + + def __init__(self, initial_mapping: Mapping[str, str] = None, state_type: Union[str, StateType, type] = "STR", config=None): + if initial_mapping is None: + initial_mapping = dict() + if config is None: + config = dict() + self._templates_to_evaluate = initial_mapping + if type(state_type) == str: + self._state_type = StateType[state_type].value + elif type(state_type) == StateType: + self._state_type = state_type.value + elif type(state_type) == type: + self._state_type = state_type + else: + raise Exception(f"Unexpected type for state_type. Got {state_type}") + self._interpolator = JinjaInterpolation() + self._context = dict() + self._config = config + + def update_state(self, **kwargs): + stream_state = kwargs.get(self.stream_state_field) + prev_stream_state = self.get_stream_state() or stream_state + self._context.update(**kwargs) + + self._context[self.stream_state_field] = self._compute_state(prev_stream_state) + + def get_state(self, state_field): + return self._context.get(state_field, {}) + + def get_stream_state(self): + return self.get_state(self.stream_state_field) + + def _compute_state(self, prev_state): + updated_state = { + self._interpolator.eval(name, self._config): self._interpolator.eval(value, self._config, **self._context) + for name, value in self._templates_to_evaluate.items() + } + updated_state = {name: self._state_type(value) for name, value in updated_state.items() if value} + + if prev_state: + next_state = {name: _get_max(name=name, val=value, other_state=prev_state) for name, value in updated_state.items()} + else: + next_state = updated_state + + self._context[self.stream_state_field] = next_state + return next_state diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/states/state.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/states/state.py index dc711004924..8a7528c88b3 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/states/state.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/states/state.py @@ -12,7 +12,7 @@ class State(ABC): pass @abstractmethod - def get_state(self): + def get_stream_state(self): pass def deep_copy(self): diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py new file mode 100644 index 00000000000..03cc7195dba --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py @@ -0,0 +1,100 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +import datetime +import re +from typing import Any, Iterable, Mapping + +from airbyte_cdk.models import SyncMode +from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString +from airbyte_cdk.sources.declarative.interpolation.jinja import JinjaInterpolation +from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer + + +class DatetimeStreamSlicer(StreamSlicer): + timedelta_regex = re.compile( + r"((?P[\.\d]+?)w)?" + r"((?P[\.\d]+?)d)?" + r"((?P[\.\d]+?)h)?" + r"((?P[\.\d]+?)m)?" + r"((?P[\.\d]+?)s)?" + r"((?P[\.\d]+?)ms)?" + r"((?P[\.\d]+?)us)?$" + ) + + # FIXME: start_time, end_time, and step should be datetime and timedelta? + # FIXME: timezone should be declarative? + def __init__( + self, + start_time: InterpolatedString, + end_time: InterpolatedString, + step, + cursor_value: InterpolatedString, + datetime_format, + config, + ): + self._timezone = datetime.timezone.utc + self._interpolation = JinjaInterpolation() + self._datetime_format = datetime_format + self._start_time = self.parse_date(start_time.eval(config)) + self._end_time = self.parse_date(end_time.eval(config)) + self._end_time = min(self._end_time, datetime.datetime.now(tz=datetime.timezone.utc)) + self._start_time = min(self._start_time, self._end_time) + self._step = self._parse_timedelta(step) + self._config = config + self._cursor_value = cursor_value + + def stream_slices(self, sync_mode: SyncMode, stream_state: Mapping[str, Any]) -> Iterable[Mapping[str, Any]]: + stream_state = stream_state or {} + + cursor_value = self._cursor_value.eval(self._config, **{"stream_state": stream_state}) + start_date = self._get_date(self.parse_date(cursor_value), self._start_time, max) + if not self.is_start_date_valid(start_date): + self._end_time = start_date + return self._partition_daterange(start_date, self._end_time, self._step) + + def _partition_daterange(self, start, end, step: datetime.timedelta): + dates = [] + while start <= end: + end_date = self._get_date(start + step - datetime.timedelta(days=1), end, min) + dates.append({"start_date": start.strftime(self._datetime_format), "end_date": end_date.strftime(self._datetime_format)}) + start += step + return dates + + def _get_date(self, cursor_value, default_date: datetime.datetime, comparator) -> datetime.datetime: + cursor_date = self.parse_date(cursor_value or default_date) + return comparator(cursor_date, default_date) + + def parse_date(self, date: Any) -> datetime: + if date and isinstance(date, str): + if self.is_int(date): + return datetime.datetime.fromtimestamp(int(date)).replace(tzinfo=self._timezone) + else: + return datetime.datetime.strptime(date, self._datetime_format).replace(tzinfo=self._timezone) + return date + + def is_start_date_valid(self, start_date: datetime) -> bool: + return start_date <= self._end_time + + def is_int(self, s) -> bool: + try: + int(s) + return True + except ValueError: + return False + + @classmethod + def _parse_timedelta(cls, time_str): + """ + Parse a time string e.g. (2h13m) into a timedelta object. + Modified from virhilo's answer at https://stackoverflow.com/a/4628148/851699 + :param time_str: A string identifying a duration. (eg. 2h13m) + :return datetime.timedelta: A datetime.timedelta object + """ + parts = cls.timedelta_regex.match(time_str) + + assert parts is not None + + time_params = {name: float(param) for name, param in parts.groupdict().items() if param} + return datetime.timedelta(**time_params) diff --git a/airbyte-cdk/python/setup.py b/airbyte-cdk/python/setup.py index 8722fef3917..ca4b9b55f1b 100644 --- a/airbyte-cdk/python/setup.py +++ b/airbyte-cdk/python/setup.py @@ -55,6 +55,7 @@ setup( "vcrpy", "Deprecated~=1.2", "Jinja2~=3.1.2", + "pyjq~=2.5.2", ], python_requires=">=3.9", extras_require={ diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/checks/test_interpolated_mapping.py b/airbyte-cdk/python/unit_tests/sources/declarative/checks/test_interpolated_mapping.py new file mode 100644 index 00000000000..1c6022eb1be --- /dev/null +++ b/airbyte-cdk/python/unit_tests/sources/declarative/checks/test_interpolated_mapping.py @@ -0,0 +1,23 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + + +from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import InterpolatedMapping + + +def test(): + d = { + "field": "value", + "field_to_interpolate_from_config": "{{ config['c'] }}", + "field_to_interpolate_from_kwargs": "{{ kwargs['a'] }}", + } + config = {"c": "VALUE_FROM_CONFIG"} + kwargs = {"a": "VALUE_FROM_KWARGS"} + mapping = InterpolatedMapping(d) + + interpolated = mapping.eval(config, **{"kwargs": kwargs}) + + assert interpolated["field"] == "value" + assert interpolated["field_to_interpolate_from_config"] == "VALUE_FROM_CONFIG" + assert interpolated["field_to_interpolate_from_kwargs"] == "VALUE_FROM_KWARGS" diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/checks/test_interpolated_string.py b/airbyte-cdk/python/unit_tests/sources/declarative/checks/test_interpolated_string.py new file mode 100644 index 00000000000..8207c253a74 --- /dev/null +++ b/airbyte-cdk/python/unit_tests/sources/declarative/checks/test_interpolated_string.py @@ -0,0 +1,26 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString + +config = {"field": "value"} + + +def test_static_value(): + static_value = "HELLO WORLD" + s = InterpolatedString(static_value) + assert s.eval(config) == "HELLO WORLD" + + +def test_eval_from_config(): + string = "{{ config['field'] }}" + s = InterpolatedString(string) + assert s.eval(config) == "value" + + +def test_eval_from_kwargs(): + string = "{{ kwargs['c'] }}" + kwargs = {"c": "airbyte"} + s = InterpolatedString(string) + assert s.eval(config, **{"kwargs": kwargs}) == "airbyte" diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/checks/test_jinja.py b/airbyte-cdk/python/unit_tests/sources/declarative/checks/test_jinja.py new file mode 100644 index 00000000000..30bd491d253 --- /dev/null +++ b/airbyte-cdk/python/unit_tests/sources/declarative/checks/test_jinja.py @@ -0,0 +1,31 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +from airbyte_cdk.sources.declarative.interpolation.jinja import JinjaInterpolation + + +def test_get_value_from_config(): + interpolation = JinjaInterpolation() + s = "{{ config['date'] }}" + config = {"date": "2022-01-01"} + val = interpolation.eval(s, config) + assert val == "2022-01-01" + + +def test_get_value_from_stream_slice(): + interpolation = JinjaInterpolation() + s = "{{ stream_slice['date'] }}" + config = {"date": "2022-01-01"} + stream_slice = {"date": "2020-09-09"} + val = interpolation.eval(s, config, **{"stream_slice": stream_slice}) + assert val == "2020-09-09" + + +def test_get_value_from_a_list_of_mappings(): + interpolation = JinjaInterpolation() + s = "{{ records[0]['date'] }}" + config = {"date": "2022-01-01"} + records = [{"date": "2020-09-09"}] + val = interpolation.eval(s, config, **{"records": records}) + assert val == "2020-09-09" diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/interpolation/test_interpolated_request_parameter_provider.py b/airbyte-cdk/python/unit_tests/sources/declarative/interpolation/test_interpolated_request_parameter_provider.py new file mode 100644 index 00000000000..1699a62a949 --- /dev/null +++ b/airbyte-cdk/python/unit_tests/sources/declarative/interpolation/test_interpolated_request_parameter_provider.py @@ -0,0 +1,78 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + + +from airbyte_cdk.sources.declarative.requesters.request_params.interpolated_request_parameter_provider import ( + InterpolatedRequestParameterProvider, +) + +state = {"date": "2021-01-01"} +stream_slice = {"start_date": "2020-01-01"} +next_page_token = {"offset": "12345"} +config = {"option": "OPTION"} + + +def test(): + request_parameters = {"a_static_request_param": "a_static_value"} + provider = InterpolatedRequestParameterProvider(request_parameters=request_parameters, config=config) + + request_params = provider.request_params(state, stream_slice, next_page_token) + + assert request_parameters == request_params + + +def test_value_depends_on_state(): + request_parameters = {"a_static_request_param": "{{ stream_state['date'] }}"} + provider = InterpolatedRequestParameterProvider(request_parameters=request_parameters, config=config) + + request_params = provider.request_params(state, stream_slice, next_page_token) + + assert request_params["a_static_request_param"] == state["date"] + + +def test_value_depends_on_stream_slice(): + request_parameters = {"a_static_request_param": "{{ stream_slice['start_date'] }}"} + provider = InterpolatedRequestParameterProvider(request_parameters=request_parameters, config=config) + + request_params = provider.request_params(state, stream_slice, next_page_token) + + assert request_params["a_static_request_param"] == stream_slice["start_date"] + + +def test_value_depends_on_next_page_token(): + request_parameters = {"a_static_request_param": "{{ next_page_token['offset'] }}"} + provider = InterpolatedRequestParameterProvider(request_parameters=request_parameters, config=config) + + request_params = provider.request_params(state, stream_slice, next_page_token) + + assert request_params["a_static_request_param"] == next_page_token["offset"] + + +def test_value_depends_on_config(): + request_parameters = {"a_static_request_param": "{{ config['option'] }}"} + provider = InterpolatedRequestParameterProvider(request_parameters=request_parameters, config=config) + + request_params = provider.request_params(state, stream_slice, next_page_token) + + assert request_params["a_static_request_param"] == config["option"] + + +def test_parameter_is_interpolated(): + request_parameters = { + "{{ stream_state['date'] }} - {{stream_slice['start_date']}} - {{next_page_token['offset']}} - {{config['option']}}": "ABC" + } + provider = InterpolatedRequestParameterProvider(request_parameters=request_parameters, config=config) + + request_params = provider.request_params(state, stream_slice, next_page_token) + + assert request_params[f"{state['date']} - {stream_slice['start_date']} - {next_page_token['offset']} - {config['option']}"] == "ABC" + + +def test_none_value(): + request_parameters = {"a_static_request_param": "{{ stream_state['date'] }}"} + provider = InterpolatedRequestParameterProvider(request_parameters=request_parameters, config=config) + + request_params = provider.request_params({}, stream_slice, next_page_token) + + assert len(request_params) == 0 diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/iterators/test_interpolated_request_parameter_provider.py b/airbyte-cdk/python/unit_tests/sources/declarative/iterators/test_interpolated_request_parameter_provider.py new file mode 100644 index 00000000000..eff1dd651d4 --- /dev/null +++ b/airbyte-cdk/python/unit_tests/sources/declarative/iterators/test_interpolated_request_parameter_provider.py @@ -0,0 +1,77 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +from airbyte_cdk.sources.declarative.requesters.request_params.interpolated_request_parameter_provider import ( + InterpolatedRequestParameterProvider, +) + +state = {"date": "2021-01-01"} +stream_slice = {"start_date": "2020-01-01"} +next_page_token = {"offset": "12345"} +config = {"option": "OPTION"} + + +def test(): + request_parameters = {"a_static_request_param": "a_static_value"} + provider = InterpolatedRequestParameterProvider(request_parameters=request_parameters, config=config) + + request_params = provider.request_params(state, stream_slice, next_page_token) + + assert request_parameters == request_params + + +def test_value_depends_on_state(): + request_parameters = {"a_static_request_param": "{{ stream_state['date'] }}"} + provider = InterpolatedRequestParameterProvider(request_parameters=request_parameters, config=config) + + request_params = provider.request_params(state, stream_slice, next_page_token) + + assert request_params["a_static_request_param"] == state["date"] + + +def test_value_depends_on_stream_slice(): + request_parameters = {"a_static_request_param": "{{ stream_slice['start_date'] }}"} + provider = InterpolatedRequestParameterProvider(request_parameters=request_parameters, config=config) + + request_params = provider.request_params(state, stream_slice, next_page_token) + + assert request_params["a_static_request_param"] == stream_slice["start_date"] + + +def test_value_depends_on_next_page_token(): + request_parameters = {"a_static_request_param": "{{ next_page_token['offset'] }}"} + provider = InterpolatedRequestParameterProvider(request_parameters=request_parameters, config=config) + + request_params = provider.request_params(state, stream_slice, next_page_token) + + assert request_params["a_static_request_param"] == next_page_token["offset"] + + +def test_value_depends_on_config(): + request_parameters = {"a_static_request_param": "{{ config['option'] }}"} + provider = InterpolatedRequestParameterProvider(request_parameters=request_parameters, config=config) + + request_params = provider.request_params(state, stream_slice, next_page_token) + + assert request_params["a_static_request_param"] == config["option"] + + +def test_parameter_is_interpolated(): + request_parameters = { + "{{ stream_state['date'] }} - {{stream_slice['start_date']}} - {{next_page_token['offset']}} - {{config['option']}}": "ABC" + } + provider = InterpolatedRequestParameterProvider(request_parameters=request_parameters, config=config) + + request_params = provider.request_params(state, stream_slice, next_page_token) + + assert request_params[f"{state['date']} - {stream_slice['start_date']} - {next_page_token['offset']} - {config['option']}"] == "ABC" + + +def test_none_value(): + request_parameters = {"a_static_request_param": "{{ stream_state['date'] }}"} + provider = InterpolatedRequestParameterProvider(request_parameters=request_parameters, config=config) + + request_params = provider.request_params({}, stream_slice, next_page_token) + + assert len(request_params) == 0 diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_next_page_url_paginator.py b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_next_page_url_paginator.py new file mode 100644 index 00000000000..0af6dd0cf39 --- /dev/null +++ b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_next_page_url_paginator.py @@ -0,0 +1,42 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +import json + +import requests +from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder +from airbyte_cdk.sources.declarative.requesters.paginators.interpolated_paginator import InterpolatedPaginator +from airbyte_cdk.sources.declarative.requesters.paginators.next_page_url_paginator import NextPageUrlPaginator + +config = {"option": "OPTION"} +response = requests.Response() +response.headers = {"A_HEADER": "HEADER_VALUE"} +response_body = {"_metadata": {"next": "https://airbyte.io/next_url"}} +response._content = json.dumps(response_body).encode("utf-8") +last_responses = [{"id": 0}] +decoder = JsonDecoder() + + +def test_value_depends_response_body(): + next_page_tokens = {"next_page_url": "{{ decoded_response['_metadata']['next'] }}"} + paginator = create_paginator(next_page_tokens) + + next_page_token = paginator.next_page_token(response, last_responses) + + assert next_page_token == {"next_page_url": "next_url"} + + +def test_no_next_page_found(): + next_page_tokens = {"next_page_url": "{{ decoded_response['_metadata']['next'] }}"} + paginator = create_paginator(next_page_tokens) + + r = requests.Response() + r._content = json.dumps({"data": []}).encode("utf-8") + next_page_token = paginator.next_page_token(r, last_responses) + + assert next_page_token is None + + +def create_paginator(template): + return NextPageUrlPaginator("https://airbyte.io/", InterpolatedPaginator(template, decoder, config)) diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_offset_paginator.py b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_offset_paginator.py new file mode 100644 index 00000000000..9d4ecef26b7 --- /dev/null +++ b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_offset_paginator.py @@ -0,0 +1,47 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +import requests +from airbyte_cdk.sources.declarative.requesters.paginators.offset_paginator import OffsetPaginator +from airbyte_cdk.sources.declarative.states.dict_state import DictState + +response = requests.Response() + +tag = "cursor" +last_responses = [{"id": 0}, {"id": 1}] +state = DictState() + + +def test_return_none_if_fewer_records_than_limit(): + limit = 5 + paginator = OffsetPaginator(limit, state, tag) + + assert paginator._get_offset() == 0 + + next_page_token = paginator.next_page_token(response, last_responses) + + assert next_page_token is None + + +def test_return_next_offset_limit_1(): + limit = 1 + paginator = OffsetPaginator(limit, state, tag) + + next_page_token = paginator.next_page_token(response, last_responses) + + assert next_page_token == {tag: 1} + assert paginator._get_offset() == 1 + + +def test_return_next_offset_limit_2(): + limit = 2 + paginator = OffsetPaginator(limit, state, tag) + + next_page_token = paginator.next_page_token(response, last_responses) + + assert next_page_token == {tag: 2} + assert paginator._get_offset() == 2 + + next_page_token = paginator.next_page_token(response, [{"id": 2}]) + assert next_page_token is None diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/test_dict_state.py b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/test_dict_state.py new file mode 100644 index 00000000000..639ed8280b9 --- /dev/null +++ b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/test_dict_state.py @@ -0,0 +1,92 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +from airbyte_cdk.sources.declarative.states.dict_state import DictState, StateType + +config = {"name": "date"} +name = "{{ config['name'] }}" +value = "{{ last_record['updated_at'] }}" +dict_mapping = { + name: value, +} + + +def test_empty_state_is_none(): + state = DictState(dict_mapping, "INT", config) + initial_state = state.get_stream_state() + expected_state = {} + assert expected_state == initial_state + + +def test_state_type(): + state_type_string = DictState(dict_mapping, "INT", config) + state_type_type = DictState(dict_mapping, int, config) + state_type_enum = DictState(dict_mapping, StateType.INT, config) + assert state_type_string._state_type == state_type_type._state_type == state_type_enum._state_type + + +def test_update_initial_state(): + state = DictState(dict_mapping, "STR", config) + stream_slice = None + stream_state = None + last_response = {"data": {"id": "1234", "updated_at": "2021-01-01"}, "last_refresh": "2020-01-01"} + last_record = {"id": "1234", "updated_at": "2021-01-01"} + state.update_state(stream_slice=stream_slice, stream_state=stream_state, last_response=last_response, last_record=last_record) + actual_state = state.get_stream_state() + expected_state = {"date": "2021-01-01"} + assert expected_state == actual_state + + +def test_update_state_with_recent_cursor(): + state = DictState(dict_mapping, "STR", config) + stream_slice = None + stream_state = {"date": "2020-12-31"} + last_response = {"data": {"id": "1234", "updated_at": "2021-01-01"}, "last_refresh": "2020-01-01"} + last_record = {"id": "1234", "updated_at": "2021-01-01"} + state.update_state(stream_slice=stream_slice, stream_state=stream_state, last_response=last_response, last_record=last_record) + actual_state = state.get_stream_state() + expected_state = {"date": "2021-01-01"} + assert expected_state == actual_state + + +def test_update_state_with_old_cursor(): + state = DictState(dict_mapping, "STR", config) + stream_slice = None + stream_state = {"date": "2021-01-02"} + last_response = {"data": {"id": "1234", "updated_at": "2021-01-01"}, "last_refresh": "2020-01-01"} + last_record = {"id": "1234", "updated_at": "2021-01-01"} + state.update_state(stream_slice=stream_slice, stream_state=stream_state, last_response=last_response, last_record=last_record) + actual_state = state.get_stream_state() + expected_state = {"date": "2021-01-02"} + assert expected_state == actual_state + + +def test_update_state_with_older_state(): + state = DictState(dict_mapping, "STR", config) + stream_slice = None + stream_state = {"date": "2021-01-02"} + last_response = {"data": {"id": "1234", "updated_at": "2021-01-02"}, "last_refresh": "2020-01-01"} + last_record = {"id": "1234", "updated_at": "2021-01-02"} + state.update_state(stream_slice=stream_slice, stream_state=stream_state, last_response=last_response, last_record=last_record) + actual_state = state.get_stream_state() + expected_state = {"date": "2021-01-02"} + + out_of_order_response = {"data": {"id": "1234", "updated_at": "2021-01-02"}, "last_refresh": "2020-01-01"} + out_of_order_record = {"id": "1234", "updated_at": "2021-01-01"} + state.update_state( + stream_slice=stream_slice, stream_state=stream_state, last_response=out_of_order_response, last_record=out_of_order_record + ) + assert expected_state == actual_state + + +def test_state_is_a_timestamp(): + state = DictState(dict_mapping, "INT", config) + stream_slice = None + stream_state = {"date": 12345} + last_response = {"data": {"id": "1234", "updated_at": 123456}, "last_refresh": "2020-01-01"} + last_record = {"id": "1234", "updated_at": 123456} + state.update_state(stream_slice=stream_slice, stream_state=stream_state, last_response=last_response, last_record=last_record) + actual_state = state.get_stream_state() + expected_state = {"date": 123456} + assert expected_state == actual_state diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/test_http_requester.py b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/test_http_requester.py new file mode 100644 index 00000000000..a9891445c8a --- /dev/null +++ b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/test_http_requester.py @@ -0,0 +1,56 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +from unittest.mock import MagicMock + +import requests +from airbyte_cdk.sources.declarative.requesters.http_requester import HttpMethod, HttpRequester + + +def test(): + http_method = "GET" + + request_parameters_provider = MagicMock() + request_params = {"param": "value"} + request_parameters_provider.request_params.return_value = request_params + + request_headers_provider = MagicMock() + request_headers = {"header": "value"} + request_headers_provider.request_headers.return_value = request_headers + + authenticator = MagicMock() + + retrier = MagicMock() + max_retries = 10 + should_retry = True + backoff_time = 1000 + retrier.max_retries = max_retries + retrier.should_retry.return_value = should_retry + retrier.backoff_time.return_value = backoff_time + + config = {"url": "https://airbyte.io"} + stream_slice = {"id": "1234"} + + name = "stream_name" + + requester = HttpRequester( + name=name, + url_base="{{ config['url'] }}", + path="v1/{{ stream_slice['id'] }}", + http_method=http_method, + request_parameters_provider=request_parameters_provider, + request_headers_provider=request_headers_provider, + authenticator=authenticator, + retrier=retrier, + config=config, + ) + + assert requester.get_url_base() == "https://airbyte.io" + assert requester.get_path(stream_state=None, stream_slice=stream_slice, next_page_token=None) == "v1/1234" + assert requester.get_authenticator() == authenticator + assert requester.get_method() == HttpMethod.GET + assert requester.request_params(stream_state=None, stream_slice=None, next_page_token=None) == request_params + assert requester.max_retries == max_retries + assert requester.should_retry(requests.Response()) == should_retry + assert requester.backoff_time(requests.Response()) == backoff_time diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/test_interpolated_request_parameter_provider.py b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/test_interpolated_request_parameter_provider.py new file mode 100644 index 00000000000..1699a62a949 --- /dev/null +++ b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/test_interpolated_request_parameter_provider.py @@ -0,0 +1,78 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + + +from airbyte_cdk.sources.declarative.requesters.request_params.interpolated_request_parameter_provider import ( + InterpolatedRequestParameterProvider, +) + +state = {"date": "2021-01-01"} +stream_slice = {"start_date": "2020-01-01"} +next_page_token = {"offset": "12345"} +config = {"option": "OPTION"} + + +def test(): + request_parameters = {"a_static_request_param": "a_static_value"} + provider = InterpolatedRequestParameterProvider(request_parameters=request_parameters, config=config) + + request_params = provider.request_params(state, stream_slice, next_page_token) + + assert request_parameters == request_params + + +def test_value_depends_on_state(): + request_parameters = {"a_static_request_param": "{{ stream_state['date'] }}"} + provider = InterpolatedRequestParameterProvider(request_parameters=request_parameters, config=config) + + request_params = provider.request_params(state, stream_slice, next_page_token) + + assert request_params["a_static_request_param"] == state["date"] + + +def test_value_depends_on_stream_slice(): + request_parameters = {"a_static_request_param": "{{ stream_slice['start_date'] }}"} + provider = InterpolatedRequestParameterProvider(request_parameters=request_parameters, config=config) + + request_params = provider.request_params(state, stream_slice, next_page_token) + + assert request_params["a_static_request_param"] == stream_slice["start_date"] + + +def test_value_depends_on_next_page_token(): + request_parameters = {"a_static_request_param": "{{ next_page_token['offset'] }}"} + provider = InterpolatedRequestParameterProvider(request_parameters=request_parameters, config=config) + + request_params = provider.request_params(state, stream_slice, next_page_token) + + assert request_params["a_static_request_param"] == next_page_token["offset"] + + +def test_value_depends_on_config(): + request_parameters = {"a_static_request_param": "{{ config['option'] }}"} + provider = InterpolatedRequestParameterProvider(request_parameters=request_parameters, config=config) + + request_params = provider.request_params(state, stream_slice, next_page_token) + + assert request_params["a_static_request_param"] == config["option"] + + +def test_parameter_is_interpolated(): + request_parameters = { + "{{ stream_state['date'] }} - {{stream_slice['start_date']}} - {{next_page_token['offset']}} - {{config['option']}}": "ABC" + } + provider = InterpolatedRequestParameterProvider(request_parameters=request_parameters, config=config) + + request_params = provider.request_params(state, stream_slice, next_page_token) + + assert request_params[f"{state['date']} - {stream_slice['start_date']} - {next_page_token['offset']} - {config['option']}"] == "ABC" + + +def test_none_value(): + request_parameters = {"a_static_request_param": "{{ stream_state['date'] }}"} + provider = InterpolatedRequestParameterProvider(request_parameters=request_parameters, config=config) + + request_params = provider.request_params({}, stream_slice, next_page_token) + + assert len(request_params) == 0 diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py b/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py index d5cb6b0629f..9a92bf9ce53 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py @@ -33,7 +33,7 @@ def test(): state = MagicMock() underlying_state = {"date": "2021-01-01"} - state.get_state.return_value = underlying_state + state.get_stream_state.return_value = underlying_state url_base = "https://airbyte.io" requester.get_url_base.return_value = url_base diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/__init__.py b/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/__init__.py new file mode 100644 index 00000000000..46b7376756e --- /dev/null +++ b/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/__init__.py @@ -0,0 +1,3 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. +# diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_datetime_stream_slicer.py b/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_datetime_stream_slicer.py new file mode 100644 index 00000000000..4f20ab3859d --- /dev/null +++ b/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_datetime_stream_slicer.py @@ -0,0 +1,138 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +import datetime +import unittest + +import pytest +from airbyte_cdk.models import SyncMode +from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString +from airbyte_cdk.sources.declarative.stream_slicers.datetime_stream_slicer import DatetimeStreamSlicer + +FAKE_NOW = datetime.datetime(2022, 1, 1, tzinfo=datetime.timezone.utc) + +config = {"start_date": "2021-01-01"} +start_date = InterpolatedString("{{ stream_state['date'] }}", "{{ config['start_date'] }}") +end_date_now = InterpolatedString( + "{{ today_utc() }}", +) +end_date = InterpolatedString("2021-01-10") +cursor_value = InterpolatedString("{{ stream_state['date'] }}") +timezone = datetime.timezone.utc + +datetime_format = "%Y-%m-%d" + + +@pytest.fixture() +def mock_datetime_now(monkeypatch): + datetime_mock = unittest.mock.MagicMock(wraps=datetime.datetime) + datetime_mock.now.return_value = FAKE_NOW + monkeypatch.setattr(datetime, "datetime", datetime_mock) + + +def test_stream_slices_1_day(mock_datetime_now): + stream_state = None + + expected_slices = [ + {"start_date": "2021-01-01", "end_date": "2021-01-01"}, + {"start_date": "2021-01-02", "end_date": "2021-01-02"}, + {"start_date": "2021-01-03", "end_date": "2021-01-03"}, + {"start_date": "2021-01-04", "end_date": "2021-01-04"}, + {"start_date": "2021-01-05", "end_date": "2021-01-05"}, + {"start_date": "2021-01-06", "end_date": "2021-01-06"}, + {"start_date": "2021-01-07", "end_date": "2021-01-07"}, + {"start_date": "2021-01-08", "end_date": "2021-01-08"}, + {"start_date": "2021-01-09", "end_date": "2021-01-09"}, + {"start_date": "2021-01-10", "end_date": "2021-01-10"}, + ] + step = "1d" + slicer = DatetimeStreamSlicer(start_date, end_date, step, cursor_value, datetime_format, config) + stream_slices = slicer.stream_slices(SyncMode.incremental, stream_state) + + assert expected_slices == stream_slices + + +def test_stream_slices_2_days(mock_datetime_now): + stream_state = None + + expected_slices = [ + {"start_date": "2021-01-01", "end_date": "2021-01-02"}, + {"start_date": "2021-01-03", "end_date": "2021-01-04"}, + {"start_date": "2021-01-05", "end_date": "2021-01-06"}, + {"start_date": "2021-01-07", "end_date": "2021-01-08"}, + {"start_date": "2021-01-09", "end_date": "2021-01-10"}, + ] + step = "2d" + slicer = DatetimeStreamSlicer(start_date, end_date, step, cursor_value, datetime_format, config) + stream_slices = slicer.stream_slices(SyncMode.incremental, stream_state) + + assert expected_slices == stream_slices + + +def test_stream_slices_from_stream_state(mock_datetime_now): + stream_state = {"date": "2021-01-05"} + + expected_slices = [ + # FIXME: should this include 2021-01-05? + {"start_date": "2021-01-05", "end_date": "2021-01-05"}, + {"start_date": "2021-01-06", "end_date": "2021-01-06"}, + {"start_date": "2021-01-07", "end_date": "2021-01-07"}, + {"start_date": "2021-01-08", "end_date": "2021-01-08"}, + {"start_date": "2021-01-09", "end_date": "2021-01-09"}, + {"start_date": "2021-01-10", "end_date": "2021-01-10"}, + ] + step = "1d" + slicer = DatetimeStreamSlicer(start_date, end_date, step, cursor_value, datetime_format, config) + stream_slices = slicer.stream_slices(SyncMode.incremental, stream_state) + + assert expected_slices == stream_slices + + +def test_stream_slices_12_days(mock_datetime_now): + stream_state = None + + expected_slices = [ + {"start_date": "2021-01-01", "end_date": "2021-01-10"}, + ] + step = "12d" + slicer = DatetimeStreamSlicer(start_date, end_date, step, cursor_value, datetime_format, config) + stream_slices = slicer.stream_slices(SyncMode.incremental, stream_state) + + assert expected_slices == stream_slices + + +def test_init_from_config(mock_datetime_now): + step = "1d" + + slicer = DatetimeStreamSlicer(start_date, end_date_now, step, cursor_value, datetime_format, config) + assert datetime.datetime(2021, 1, 1, tzinfo=timezone) == slicer._start_time + assert FAKE_NOW == slicer._end_time + assert datetime.timedelta(days=1) == slicer._step + assert datetime.timezone.utc == slicer._timezone + assert datetime_format == slicer._datetime_format + + +def test_end_date_past_now(mock_datetime_now): + step = "1d" + invalid_end_date = InterpolatedString( + f"{(FAKE_NOW + datetime.timedelta(days=1)).strftime(datetime_format)}", + ) + slicer = DatetimeStreamSlicer(start_date, invalid_end_date, step, cursor_value, datetime_format, config) + + assert slicer._end_time != invalid_end_date + assert slicer._end_time == datetime.datetime.now() + + +def test_start_date_after_end_date(): + step = "1d" + invalid_start_date = InterpolatedString("2021-01-11") + slicer = DatetimeStreamSlicer(invalid_start_date, end_date, step, cursor_value, datetime_format, config) + + assert slicer._start_time != invalid_start_date + assert slicer._start_time == slicer._end_time + assert slicer._start_time == datetime.datetime(2021, 1, 10, tzinfo=datetime.timezone.utc) + + +if __name__ == "__main__": + unittest.main() diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/test_jq.py b/airbyte-cdk/python/unit_tests/sources/declarative/test_jq.py new file mode 100644 index 00000000000..51fe7dd85cd --- /dev/null +++ b/airbyte-cdk/python/unit_tests/sources/declarative/test_jq.py @@ -0,0 +1,66 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +import json + +import requests +from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder +from airbyte_cdk.sources.declarative.extractors.jq import JqExtractor + +config = {"field": "record_array"} +decoder = JsonDecoder() + + +def test(): + transform = ".data[]" + extractor = JqExtractor(transform, decoder, config) + + records = [{"id": 1}, {"id": 2}] + body = {"data": records} + response = create_response(body) + actual_records = extractor.extract_records(response) + + assert actual_records == records + + +def test_field_in_config(): + transform = ".{{ config['field'] }}[]" + extractor = JqExtractor(transform, decoder, config) + + records = [{"id": 1}, {"id": 2}] + body = {"record_array": records} + response = create_response(body) + actual_records = extractor.extract_records(response) + + assert actual_records == records + + +def test_field_in_kwargs(): + transform = ".{{ kwargs['data_field'] }}[]" + kwargs = {"data_field": "records"} + extractor = JqExtractor(transform, decoder, config, kwargs=kwargs) + + records = [{"id": 1}, {"id": 2}] + body = {"records": records} + response = create_response(body) + actual_records = extractor.extract_records(response) + + assert actual_records == records + + +def create_response(body): + response = requests.Response() + response._content = json.dumps(body).encode("utf-8") + return response + + +def test_default(): + transform = ".{{kwargs['field']}}[]" + extractor = JqExtractor(transform, decoder, config) + + records = [{"id": 1}, {"id": 2}] + response = create_response(records) + actual_records = extractor.extract_records(response) + + assert actual_records == records