mirror of synced 2026-01-05 21:02:13 -05:00
airbyte/airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py
Alexandre Girard c98f196d64 [low-code connectors] Rename decode_response reference to response (#14877)
* checkout files from test branch

* read_incremental works

* reset to master

* remove dead code

* comment

* fix

* Add test

* comments

* utc

* format

* small fix

* Add test with rfc3339

* remove unused param

* fix test

* configurable state checkpointing

* update test

* start working on retrier

* retry predicate

* return response status

* look in error message

* cleanup test

* constant backoff strategy

* chain backoff strategy

* chain retrier

* Add to class types registry

* extract backoff time from header

* wait until

* update

* split file

* parse_records

* classmethod

* delete dead code

* comment

* comment

* comments

* fix

* test for instantiating chain retrier

* fix parsing

* cleanup

* fix

* reset

* never raise on http error

* remove print

* comment

* comment

* comment

* comment

* remove prints

* add declarative stream to registry

* start working on limit paginator

* support for offset pagination

* tests

* move limit value

* extract request option

* boilerplate

* page increment

* delete offset paginator

* update conditional paginator

* refactor and fix test

* fix test

* small fix

* Delete dead code

* Add docstrings

* quick fix

* exponential backoff

* fix test

* fix

* delete unused properties

* fix

* missing unit tests

* uppercase

* docstrings

* rename to success

* compare full request instead of just url

* rename module

* rename test file

* rename interface

* rename default retrier

* rename to compositeerrorhandler

* fix missing renames

* move action to filter

* str -> minmaxdatetime

* small fixes

* plural

* add example

* handle header variations

* also fix wait time from

* allow using a regex to extract the value

* group()

* docstring

* add docs

* update comment

* docstrings

* fix tests

* rename param

* cleanup stop_condition

* cleanup

* Add examples

* interpolated pagination strategy

* don't need duplicate class

* docstrings

* more docstrings

* docstrings

* update comment

* Update airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/http_requester.py

Co-authored-by: Sherif A. Nada <snadalive@gmail.com>

* version: Update Parquet library to latest release (#14502)

The upstream Parquet library that is currently pinned for use in the S3 destination plugin is over a year old. The current version is generating invalid schemas for date-time with time-zone fields which appears to be addressed in the `1.12.3` release of the library in commit c72862b613

* merge

* 🎉 Source Github: improve schema for stream `pull_request_commits` added "null" (#14613)

Signed-off-by: Sergey Chvalyuk <grubberr@gmail.com>

* Docs: Fixed broken links (#14622)

* fixing broken links

* more broken links

* source-hubspot: change mentioning of Mailchimp into HubSpot  doc (#14620)

* Helm Chart: Add external temporal option (#14597)

* conflict env configmap and chart lock

* reverting lock

* add eof lines and documentation on values yaml

* conflict json file

* rollback json

* solve conflict

* correct minio with new version

Co-authored-by: Guy Feldman <gfeldman@86labs.com>

* 🎉 Add YAML format to source-file reader (#14588)

* Add yaml reader

* Update docs

* Bumpversion of connector

* bump docs

* Update pyarrow dependency

* Upgrade pandas dependency

* auto-bump connector version

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>

* 🎉 Source Okta: add GroupMembers stream (#14380)

* add Group_Members stream to okta source

- Group_Members return a list of users, the same schema of Users stream.
- Create a shared schema users, and both group_members and users schemas use it as a reference.
- Add Group_Members stream to source connector

* add tests and fix logs schema

- fix the test error "None is not one of enums", which is raised by the JSON schema validator even though the enum type includes both string and null:
ddb87afad8/jsonschema/_validators.py (L279-L285)
- change group_members to use id as the cursor field since `filter` is not supported in the query string
- fix the abnormal state test on the logs stream: when `since` is abnormally large, `until` has to be defined with an equal or larger value
- remove the logs stream from the full sync test, because two full syncs always have a gap (at least one new log about the users or groups API)

* last polish before submit the PR

- bump docker version
- update changelog
- add the right abnormal value for logs stream
- correct the sample catalog

* address comments:

- improve comments for until parameter under the logs stream
- add use_cache on groupMembers

* add use_cache to Group_Members

* change configured_catalog to test

* auto-bump connector version

Co-authored-by: marcosmarxm <marcosmarxm@gmail.com>
Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>

* split test files

* renames

* missing unit test

* add missing unit tests

* rename

* assert isinstance

* start extracting to their own files

* use final instead of classmethod

* assert we retry 429 errors

* Add log

* replace asserts with valueexceptions

* delete superfluous print statement

* fix factory so we don't need to union everything with strings

* get class_name from type

* remove from class types registry

* process error handlers one at a time

* sort

* delete print statement

* comment

* comment

* format

* delete unused file

* comment

* interpolatedboolean

* comment

* not optional

* not optional

* unit tests

* fix request body data

* add test

* move file to right module

* update

* reset to master

* format

* rename to pass_by

* rename to page size

* fix

* add test

* fix body data

* delete extra newlines

* move to subpackage

* fix imports

* handle str body data

* simplify

* fix typing

* always return a map

* rename to inject_into

* only accept enum

* delete conditional paginator

* only return body data

* rename decoded response to response

* decoded_response -> response

Co-authored-by: Sherif A. Nada <snadalive@gmail.com>
Co-authored-by: Tobias Macey <tmacey@boundlessnotions.com>
Co-authored-by: Serhii Chvaliuk <grubberr@gmail.com>
Co-authored-by: Amruta Ranade <11484018+Amruta-Ranade@users.noreply.github.com>
Co-authored-by: Bas Beelen <bjgbeelen@gmail.com>
Co-authored-by: Marcos Marx <marcosmarxm@users.noreply.github.com>
Co-authored-by: Guy Feldman <gfeldman@86labs.com>
Co-authored-by: Christophe Duong <christophe.duong@gmail.com>
Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
Co-authored-by: Yiyang Li <yiyangli2010@gmail.com>
Co-authored-by: marcosmarxm <marcosmarxm@gmail.com>
2022-07-21 02:58:22 -07:00

468 lines
19 KiB
Python

#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
import datetime

from airbyte_cdk.sources.declarative.datetime.min_max_datetime import MinMaxDatetime
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder
from airbyte_cdk.sources.declarative.extractors.jello import JelloExtractor
from airbyte_cdk.sources.declarative.extractors.record_filter import RecordFilter
from airbyte_cdk.sources.declarative.extractors.record_selector import RecordSelector
from airbyte_cdk.sources.declarative.parsers.factory import DeclarativeComponentFactory
from airbyte_cdk.sources.declarative.parsers.yaml_parser import YamlParser
from airbyte_cdk.sources.declarative.requesters.error_handlers.composite_error_handler import CompositeErrorHandler
from airbyte_cdk.sources.declarative.requesters.error_handlers.default_error_handler import DefaultErrorHandler
from airbyte_cdk.sources.declarative.requesters.error_handlers.http_response_filter import HttpResponseFilter
from airbyte_cdk.sources.declarative.requesters.http_requester import HttpRequester
from airbyte_cdk.sources.declarative.requesters.paginators.limit_paginator import LimitPaginator
from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption, RequestOptionType
from airbyte_cdk.sources.declarative.requesters.request_options.interpolated_request_options_provider import (
    InterpolatedRequestOptionsProvider,
)
from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever
from airbyte_cdk.sources.declarative.schema.json_schema import JsonSchema
from airbyte_cdk.sources.declarative.stream_slicers.datetime_stream_slicer import DatetimeStreamSlicer
from airbyte_cdk.sources.declarative.transformations import AddFields, RemoveFields
from airbyte_cdk.sources.declarative.transformations.add_fields import AddedFieldDefinition
from airbyte_cdk.sources.streams.http.requests_native_auth.token import TokenAuthenticator

factory = DeclarativeComponentFactory()

parser = YamlParser()

input_config = {"apikey": "verysecrettoken", "repos": ["airbyte", "airbyte-cloud"]}


def test_factory():
    content = """
    limit: 50
    offset_request_parameters:
      offset: "{{ next_page_token['offset'] }}"
      limit: "*ref(limit)"
    request_options:
      class_name: airbyte_cdk.sources.declarative.requesters.request_options.interpolated_request_options_provider.InterpolatedRequestOptionsProvider
      request_parameters: "*ref(offset_request_parameters)"
      request_body_json:
        body_offset: "{{ next_page_token['offset'] }}"
    """
    config = parser.parse(content)

    request_options_provider = factory.create_component(config["request_options"], input_config)()

    assert type(request_options_provider) == InterpolatedRequestOptionsProvider
    assert request_options_provider._parameter_interpolator._config == input_config
    assert request_options_provider._parameter_interpolator._interpolator._mapping["offset"] == "{{ next_page_token['offset'] }}"
    assert request_options_provider._body_json_interpolator._config == input_config
    assert request_options_provider._body_json_interpolator._interpolator._mapping["body_offset"] == "{{ next_page_token['offset'] }}"
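

# Illustrative sketch (hypothetical helper, not YamlParser's actual implementation) of the
# "*ref(...)" behaviour the tests in this module rely on. Assumptions: a string of the exact
# form "*ref(key)" is replaced by the top-level value stored under `key`, and a mapping with a
# `ref` entry starts from the referenced mapping, with its sibling keys overriding it (a
# shallow merge). Dotted paths and reference cycles are not handled by this sketch.
# e.g. _resolve_refs_sketch({"limit": "*ref(limit)"}, {"limit": 50}) returns {"limit": 50}
def _resolve_refs_sketch(node, root):
    import re

    if isinstance(node, str):
        match = re.fullmatch(r"\*ref\((.+)\)", node)
        # Non-reference strings (including Jinja templates) are returned unchanged.
        return _resolve_refs_sketch(root[match.group(1)], root) if match else node
    if isinstance(node, list):
        return [_resolve_refs_sketch(item, root) for item in node]
    if isinstance(node, dict):
        # Start from a copy of the referenced mapping (if any), then apply local overrides.
        resolved = dict(_resolve_refs_sketch(node["ref"], root)) if "ref" in node else {}
        for key, value in node.items():
            if key != "ref":
                resolved[key] = _resolve_refs_sketch(value, root)
        return resolved
    # Numbers, booleans and None are returned as-is.
    return node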


def test_interpolate_config():
    content = """
    authenticator:
      class_name: airbyte_cdk.sources.declarative.auth.oauth.DeclarativeOauth2Authenticator
      client_id: "some_client_id"
      client_secret: "some_client_secret"
      token_refresh_endpoint: "https://api.sendgrid.com/v3/auth"
      refresh_token: "{{ config['apikey'] }}"
      refresh_request_body:
        body_field: "yoyoyo"
        interpolated_body_field: "{{ config['apikey'] }}"
    """
    config = parser.parse(content)

    authenticator = factory.create_component(config["authenticator"], input_config)()

    assert authenticator._client_id._string == "some_client_id"
    assert authenticator._client_secret._string == "some_client_secret"
    assert authenticator._token_refresh_endpoint._string == "https://api.sendgrid.com/v3/auth"
    assert authenticator._refresh_token._string == "verysecrettoken"
    assert authenticator._refresh_request_body._mapping == {"body_field": "yoyoyo", "interpolated_body_field": "{{ config['apikey'] }}"}


def test_list_based_stream_slicer_with_values_refd():
    content = """
    repositories: ["airbyte", "airbyte-cloud"]
    stream_slicer:
      class_name: airbyte_cdk.sources.declarative.stream_slicers.list_stream_slicer.ListStreamSlicer
      slice_values: "*ref(repositories)"
      slice_definition:
        repository: "{{ slice_value }}"
    """
    config = parser.parse(content)

    stream_slicer = factory.create_component(config["stream_slicer"], input_config)()

    assert ["airbyte", "airbyte-cloud"] == stream_slicer._slice_values


def test_list_based_stream_slicer_with_values_defined_in_config():
    content = """
    stream_slicer:
      class_name: airbyte_cdk.sources.declarative.stream_slicers.list_stream_slicer.ListStreamSlicer
      slice_values: "{{config['repos']}}"
      slice_definition:
        repository: "{{ slice_value }}"
    """
    config = parser.parse(content)

    stream_slicer = factory.create_component(config["stream_slicer"], input_config)()

    assert ["airbyte", "airbyte-cloud"] == stream_slicer._slice_values
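

# Illustrative sketch (hypothetical helper, not ListStreamSlicer's actual code) of how the
# slice_values/slice_definition pair above could turn into slices. Assumption: each entry of
# `slice_values` yields one slice, built by substituting it for the "{{ slice_value }}"
# placeholder in every template of `slice_definition`. The real component evaluates these
# templates with its interpolation engine; this sketch only understands that one placeholder.
# e.g. _list_slices_sketch(["airbyte"], {"repository": "{{ slice_value }}"}) returns [{"repository": "airbyte"}]
def _list_slices_sketch(slice_values, slice_definition):
    import re

    placeholder = re.compile(r"\{\{\s*slice_value\s*\}\}")
    return [
        # A replacement function keeps `re` from treating backslashes in values as escapes.
        {key: placeholder.sub(lambda _match: str(value), template) for key, template in slice_definition.items()}
        for value in slice_values
    ]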


def test_datetime_stream_slicer():
    content = """
    stream_slicer:
      type: DatetimeStreamSlicer
      options:
        datetime_format: "%Y-%m-%d"
      start_datetime:
        type: MinMaxDatetime
        datetime: "{{ config['start_time'] }}"
        min_datetime: "{{ config['start_time'] + day_delta(2) }}"
      end_datetime: "{{ config['end_time'] }}"
      step: "10d"
      cursor_value: "created"
      lookback_window: "5d"
    """
    config = parser.parse(content)

    stream_slicer = factory.create_component(config["stream_slicer"], input_config)()

    assert type(stream_slicer) == DatetimeStreamSlicer
    assert stream_slicer._timezone == datetime.timezone.utc
    assert type(stream_slicer._start_datetime) == MinMaxDatetime
    assert type(stream_slicer._end_datetime) == MinMaxDatetime
    assert stream_slicer._start_datetime._datetime_format == "%Y-%m-%d"
    assert stream_slicer._start_datetime._timezone == datetime.timezone.utc
    assert stream_slicer._start_datetime._datetime_interpolator._string == "{{ config['start_time'] }}"
    assert stream_slicer._start_datetime._min_datetime_interpolator._string == "{{ config['start_time'] + day_delta(2) }}"
    assert stream_slicer._end_datetime._datetime_interpolator._string == "{{ config['end_time'] }}"
    assert stream_slicer._step == datetime.timedelta(days=10)
    assert stream_slicer._cursor_value._string == "created"
    assert stream_slicer._lookback_window._string == "5d"
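

# Illustrative sketch (hypothetical helper, not DatetimeStreamSlicer's actual parser) of how a
# step string such as "10d" can map to the datetime.timedelta(days=10) asserted in
# test_datetime_stream_slicer. Assumed format: integer amounts with optional w/d/h/m/s units
# in that order, e.g. "10d", "1w2d" or "12h"; anything else raises ValueError.
def _parse_step_sketch(step):
    import datetime
    import re

    match = re.fullmatch(r"(?:(\d+)w)?(?:(\d+)d)?(?:(\d+)h)?(?:(\d+)m)?(?:(\d+)s)?", step)
    # The pattern also matches "", so an empty step is rejected explicitly.
    if not step or match is None:
        raise ValueError(f"Unsupported step format: {step!r}")
    weeks, days, hours, minutes, seconds = (int(group or 0) for group in match.groups())
    return datetime.timedelta(weeks=weeks, days=days, hours=hours, minutes=minutes, seconds=seconds)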


def test_full_config():
    content = """
    decoder:
      class_name: "airbyte_cdk.sources.declarative.decoders.json_decoder.JsonDecoder"
    extractor:
      class_name: airbyte_cdk.sources.declarative.extractors.jello.JelloExtractor
      decoder: "*ref(decoder)"
    selector:
      class_name: airbyte_cdk.sources.declarative.extractors.record_selector.RecordSelector
      record_filter:
        class_name: airbyte_cdk.sources.declarative.extractors.record_filter.RecordFilter
        condition: "{{ record['id'] > stream_state['id'] }}"
    metadata_paginator:
      type: "LimitPaginator"
      page_size: 10
      limit_option:
        inject_into: request_parameter
        field_name: page_size
      page_token_option:
        inject_into: path
      pagination_strategy:
        type: "CursorPagination"
        cursor_value: "{{ response._metadata.next }}"
      url_base: "https://api.sendgrid.com/v3/"
    next_page_url_from_token_partial:
      class_name: "airbyte_cdk.sources.declarative.interpolation.interpolated_string.InterpolatedString"
      string: "{{ next_page_token['next_page_url'] }}"
    request_options_provider:
      class_name: airbyte_cdk.sources.declarative.requesters.request_options.interpolated_request_options_provider.InterpolatedRequestOptionsProvider
    requester:
      class_name: airbyte_cdk.sources.declarative.requesters.http_requester.HttpRequester
      name: "{{ options['name'] }}"
      url_base: "https://api.sendgrid.com/v3/"
      http_method: "GET"
      authenticator:
        class_name: airbyte_cdk.sources.streams.http.requests_native_auth.token.TokenAuthenticator
        token: "{{ config['apikey'] }}"
      request_parameters_provider: "*ref(request_options_provider)"
      error_handler:
        type: DefaultErrorHandler
    retriever:
      class_name: "airbyte_cdk.sources.declarative.retrievers.simple_retriever.SimpleRetriever"
      name: "{{ options['name'] }}"
      state:
        class_name: airbyte_cdk.sources.declarative.states.dict_state.DictState
      stream_slicer:
        class_name: airbyte_cdk.sources.declarative.stream_slicers.single_slice.SingleSlice
      paginator:
        class_name: airbyte_cdk.sources.declarative.requesters.paginators.no_pagination.NoPagination
      primary_key: "{{ options['primary_key'] }}"
    partial_stream:
      class_name: "airbyte_cdk.sources.declarative.declarative_stream.DeclarativeStream"
      schema_loader:
        class_name: airbyte_cdk.sources.declarative.schema.json_schema.JsonSchema
        file_path: "./source_sendgrid/schemas/{{ name }}.json"
      cursor_field: [ ]
    list_stream:
      ref: "*ref(partial_stream)"
      options:
        name: "lists"
        primary_key: "id"
        extractor:
          ref: "*ref(extractor)"
          transform: ".result[]"
      retriever:
        ref: "*ref(retriever)"
        requester:
          ref: "*ref(requester)"
          path:
            ref: "*ref(next_page_url_from_token_partial)"
            default: "marketing/lists"
        paginator:
          ref: "*ref(metadata_paginator)"
        record_selector:
          ref: "*ref(selector)"
    check:
      class_name: airbyte_cdk.sources.declarative.checks.check_stream.CheckStream
      stream_names: ["list_stream"]
    """
    config = parser.parse(content)

    stream_config = config["list_stream"]
    assert stream_config["class_name"] == "airbyte_cdk.sources.declarative.declarative_stream.DeclarativeStream"
    assert stream_config["cursor_field"] == []
    stream = factory.create_component(stream_config, input_config)()

    assert isinstance(stream._retriever._record_selector._extractor, JelloExtractor)

    assert type(stream) == DeclarativeStream
    assert stream.primary_key == "id"
    assert stream.name == "lists"
    assert type(stream._schema_loader) == JsonSchema
    assert type(stream._retriever) == SimpleRetriever
    assert stream._retriever._requester._method == HttpMethod.GET
    assert stream._retriever._requester._authenticator._tokens == ["verysecrettoken"]
    assert type(stream._retriever._record_selector) == RecordSelector
    assert type(stream._retriever._record_selector._extractor._decoder) == JsonDecoder

    assert stream._retriever._record_selector._extractor._transform == ".result[]"
    assert type(stream._retriever._record_selector._record_filter) == RecordFilter
    assert stream._retriever._record_selector._record_filter._filter_interpolator._condition == "{{ record['id'] > stream_state['id'] }}"
    assert stream._schema_loader._get_json_filepath() == "./source_sendgrid/schemas/lists.json"

    checker = factory.create_component(config["check"], input_config)()
    streams_to_check = checker._stream_names
    assert len(streams_to_check) == 1
    assert list(streams_to_check)[0] == "list_stream"

    assert stream._retriever._requester._path._default == "marketing/lists"


def test_create_record_selector():
    content = """
    extractor:
      type: JelloExtractor
      transform: "_"
    selector:
      class_name: airbyte_cdk.sources.declarative.extractors.record_selector.RecordSelector
      record_filter:
        class_name: airbyte_cdk.sources.declarative.extractors.record_filter.RecordFilter
        condition: "{{ record['id'] > stream_state['id'] }}"
      extractor:
        ref: "*ref(extractor)"
        transform: "_"
    """
    config = parser.parse(content)

    selector = factory.create_component(config["selector"], input_config)()

    assert isinstance(selector, RecordSelector)
    assert isinstance(selector._extractor, JelloExtractor)
    assert selector._extractor._transform == "_"
    assert isinstance(selector._record_filter, RecordFilter)


def test_create_requester():
    content = """
    requester:
      type: HttpRequester
      path: "/v3/marketing/lists"
      name: lists
      url_base: "https://api.sendgrid.com"
      authenticator:
        type: "TokenAuthenticator"
        token: "{{ config.apikey }}"
      request_options_provider:
        request_parameters:
          page_size: 10
        request_headers:
          header: header_value
    """
    config = parser.parse(content)

    component = factory.create_component(config["requester"], input_config)()

    assert isinstance(component, HttpRequester)
    assert isinstance(component._error_handler, DefaultErrorHandler)
    assert component._path._string == "/v3/marketing/lists"
    assert component._url_base._string == "https://api.sendgrid.com"
    assert isinstance(component._authenticator, TokenAuthenticator)
    assert component._method == HttpMethod.GET
    assert component._request_options_provider._parameter_interpolator._interpolator._mapping["page_size"] == 10
    assert component._request_options_provider._headers_interpolator._interpolator._mapping["header"] == "header_value"
    assert component._name == "lists"


def test_create_composite_error_handler():
    content = """
    error_handler:
      type: "CompositeErrorHandler"
      error_handlers:
        - response_filters:
            - predicate: "{{ 'code' in response }}"
              action: RETRY
        - response_filters:
            - http_codes: [ 403 ]
              action: RETRY
    """
    config = parser.parse(content)

    component = factory.create_component(config["error_handler"], input_config)()

    assert len(component._error_handlers) == 2
    assert isinstance(component._error_handlers[0], DefaultErrorHandler)
    assert isinstance(component._error_handlers[0]._response_filters[0], HttpResponseFilter)
    assert component._error_handlers[0]._response_filters[0]._predicate._condition == "{{ 'code' in response }}"
    assert component._error_handlers[1]._response_filters[0]._http_codes == [403]
    assert isinstance(component, CompositeErrorHandler)
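

# Illustrative sketch (hypothetical helper, not CompositeErrorHandler's actual code) of
# consulting the two handlers configured in test_create_composite_error_handler (a predicate
# filter, then an HTTP 403 filter) in order. Assumptions: each handler is a callable that
# returns an action name such as "SUCCESS" or "RETRY", and the first handler that returns
# anything other than "SUCCESS" decides the outcome.
def _composite_action_sketch(handlers, response):
    action = "SUCCESS"
    for handler in handlers:
        action = handler(response)
        if action != "SUCCESS":
            # Later handlers are not consulted once one of them reacts to the response.
            break
    return action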


def test_config_with_defaults():
    content = """
    lists_stream:
      type: "DeclarativeStream"
      options:
        name: "lists"
        primary_key: id
        url_base: "https://api.sendgrid.com"
        schema_loader:
          file_path: "./source_sendgrid/schemas/{{name}}.yaml"
        retriever:
          paginator:
            type: "LimitPaginator"
            page_size: 10
            limit_option:
              inject_into: request_parameter
              field_name: page_size
            page_token_option:
              inject_into: path
            pagination_strategy:
              type: "CursorPagination"
              cursor_value: "{{ response._metadata.next }}"
          requester:
            path: "/v3/marketing/lists"
            authenticator:
              type: "TokenAuthenticator"
              token: "{{ config.apikey }}"
            request_parameters:
              page_size: 10
          record_selector:
            extractor:
              transform: ".result[]"
    streams:
      - "*ref(lists_stream)"
    """
    config = parser.parse(content)

    stream_config = config["lists_stream"]
    stream = factory.create_component(stream_config, input_config)()
    assert type(stream) == DeclarativeStream
    assert stream.primary_key == "id"
    assert stream.name == "lists"
    assert type(stream._schema_loader) == JsonSchema
    assert type(stream._retriever) == SimpleRetriever
    assert stream._retriever._requester._method == HttpMethod.GET

    assert stream._retriever._requester._authenticator._tokens == ["verysecrettoken"]
    assert stream._retriever._record_selector._extractor._transform == ".result[]"
    assert stream._schema_loader._get_json_filepath() == "./source_sendgrid/schemas/lists.yaml"
    assert isinstance(stream._retriever._paginator, LimitPaginator)

    assert stream._retriever._paginator._url_base._string == "https://api.sendgrid.com"
    assert stream._retriever._paginator._page_size == 10
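

# Illustrative sketch (hypothetical helper, not the factory's actual code) of the `options`
# propagation test_config_with_defaults relies on: the url_base declared once under `options`
# reaches the nested paginator. Assumptions: an option is forwarded only when the constructor
# declares a parameter with that name, and explicit arguments take precedence over inherited
# options (the real factory's precedence rules may differ).
def _build_with_options_sketch(cls, options, **explicit_kwargs):
    import inspect

    accepted = inspect.signature(cls).parameters
    # Drop options the constructor does not declare, e.g. `name` for a paginator.
    inherited = {key: value for key, value in options.items() if key in accepted}
    return cls(**{**inherited, **explicit_kwargs})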


def test_create_limit_paginator():
    content = """
    paginator:
      type: "LimitPaginator"
      page_size: 10
      url_base: "https://airbyte.io"
      limit_option:
        inject_into: request_parameter
        field_name: page_size
      page_token_option:
        inject_into: path
      pagination_strategy:
        type: "CursorPagination"
        cursor_value: "{{ response._metadata.next }}"
    """
    config = parser.parse(content)

    paginator_config = config["paginator"]
    paginator = factory.create_component(paginator_config, input_config)()
    assert isinstance(paginator, LimitPaginator)
    page_token_option = paginator._page_token_option
    assert isinstance(page_token_option, RequestOption)
    assert page_token_option.inject_into == RequestOptionType.path


class TestCreateTransformations:
    # The indentation matters: base_options is spliced into the YAML strings below, under `options:`.
    base_options = """
                name: "lists"
                primary_key: id
                url_base: "https://api.sendgrid.com"
                schema_loader:
                  file_path: "./source_sendgrid/schemas/{{options.name}}.yaml"
                retriever:
                  requester:
                    path: "/v3/marketing/lists"
                    request_parameters:
                      page_size: 10
                  record_selector:
                    extractor:
                      transform: ".result[]"
    """

    def test_no_transformations(self):
        content = f"""
        the_stream:
            type: DeclarativeStream
            options:
                {self.base_options}
        """
        config = parser.parse(content)
        component = factory.create_component(config["the_stream"], input_config)()
        assert isinstance(component, DeclarativeStream)
        assert [] == component._transformations

    def test_remove_fields(self):
        content = f"""
        the_stream:
            type: DeclarativeStream
            options:
                {self.base_options}
                transformations:
                    - type: RemoveFields
                      field_pointers:
                        - ["path", "to", "field1"]
                        - ["path2"]
        """
        config = parser.parse(content)
        component = factory.create_component(config["the_stream"], input_config)()
        assert isinstance(component, DeclarativeStream)
        expected = [RemoveFields(field_pointers=[["path", "to", "field1"], ["path2"]])]
        assert expected == component._transformations

    def test_add_fields(self):
        content = f"""
        the_stream:
            class_name: airbyte_cdk.sources.declarative.declarative_stream.DeclarativeStream
            options:
                {self.base_options}
                transformations:
                    - type: AddFields
                      fields:
                        - path: ["field1"]
                          value: "static_value"
        """
        config = parser.parse(content)
        component = factory.create_component(config["the_stream"], input_config)()
        assert isinstance(component, DeclarativeStream)
        expected = [AddFields([AddedFieldDefinition(["field1"], "static_value")])]
        assert expected == component._transformations
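

# Illustrative sketches (hypothetical helpers, not the RemoveFields/AddFields implementations)
# of what the transformations configured in TestCreateTransformations do to a record.
# Assumptions: a field pointer is a list of keys into nested dicts, paths that do not resolve
# are ignored when removing, intermediate dicts are created when adding, and added values are
# static (the real AddFields can interpolate them).
def _remove_fields_sketch(record, field_pointers):
    import copy

    result = copy.deepcopy(record)
    for pointer in field_pointers:
        if not pointer:
            continue
        parent = result
        # Walk to the dict holding the last key; a path that does not resolve leaves parent as None.
        for key in pointer[:-1]:
            parent = parent.get(key) if isinstance(parent, dict) else None
        if isinstance(parent, dict):
            parent.pop(pointer[-1], None)
    return result


def _add_fields_sketch(record, fields):
    import copy

    result = copy.deepcopy(record)
    for path, value in fields:
        target = result
        # Create intermediate dicts as needed, then set the leaf value.
        for key in path[:-1]:
            target = target.setdefault(key, {})
        target[path[-1]] = value
    return result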