1
0
mirror of synced 2026-01-06 15:03:36 -05:00
Files
airbyte/airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py
Alexandre Girard aa92518721 Low-code connectors: configurable source from yaml (#13038)
* checkout from alex/cac

* checkout from alex/cac

* checkout from alex/cac

* checkout from alex/cac

* Add missing tests

* Add missing files

* Add missing tests

* add missing file

* missing file

* missing file

* sendgrid low code connector

* rename

* doc

* doc

* remove broken test

* rename

* jinja dependency

* Add comment

* comment

* comment

* pyjq dependency

* update import

* rename file

* delete unused file

* Revert "delete unused file"

This reverts commit 758e939367.

* fix

* rename

* abstract property

* delete unused field

* delete unused field

* rename

* pass kwargs directly

* isort

* Revert "isort"

This reverts commit 4a79223944.

* isort

* update state

* fix imports

* update

* update dependency

* remove dead code

* remove dead code

* format

* rename file

* decoder

* Use decoder

* Update comment

* dict_state is actually backed by a dict

* Add a comment

* update state takes kwargs

* move state out of offset paginator

* fix

* update jq parameter order

* fix

* pass config

* update

* update

* remove incremental mixin

* delete comment

* start working on yaml parser

* fix test

* progress

* refer and overwrite partials

* factory tests pass

* fix

* reset

* Assert http_method is an enum value

* fix auth

* read lists works

* fix test

* comment

* implement all streams

* build connection checker

* update comments

* update comments

* remove no_state

* rename package

* checkout from alex/cac

* Add missing tests

* Add missing files

* missing file

* rename

* jinja dependency

* Add comment

* comment

* comment

* Revert "delete unused file"

This reverts commit 758e939367.

* delete unused field

* delete unused field

* rename

* pass kwargs directly

* isort

* Revert "isort"

This reverts commit 4a79223944.

* format

* decoder

* better error handling

* remove nostate

* isort

* remove print

* move test

* delete duplicates

* update

* delete dead code

* Update mapping type to [str, Any]

* add comment

* Add comment

* pass parameters through kwargs

* pass parameters through kwargs

* fix test

* update interface

* update interface to pass source in interface

* update interface to pass source in interface

* rename to stream_slicer

* Allow passing a string or an enum

* Define StateType enum

* unit tests pass

* update dict state

* update

* can read

* fix test

* fix from yaml update

* elif

* convert state_type if not of type type

* convert state_type if not of type type

* Add a test

* Low code connectors: string interpolation with jinja (#12852)

* checkout from alex/cac

* Add missing tests

* Add missing files

* missing file

* rename

* jinja dependency

* Add comment

* comment

* comment

* Revert "delete unused file"

This reverts commit 758e939367.

* delete unused field

* delete unused field

* rename

* pass kwargs directly

* isort

* Revert "isort"

This reverts commit 4a79223944.

* format

* decoder

* better error handling

* remove nostate

* isort

* delete dead code

* Update mapping type to [str, Any]

* add comment

* Add comment

* pass parameters through kwargs

* move test to right module

* Add missing test

* Use authbase instead of deprecated class

* leverage generator

* remove sendgrid specific code

* update

* update

* delete comment

* remove sendgrid specific file

* remove unused file

* Delete dead code

* rename methods

* rename to declarative

* rename the classes too

* select streams to check

* nit

* rename method

* rename class

* {} is faster than dict()

* Update airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/yaml_parser.py

Co-authored-by: Sherif A. Nada <snadalive@gmail.com>

* more precise exception

* rename class

* add comment

* Try to install packages to build jq

* isort

* only automake

* Revert "only automake"

This reverts commit c8fe154ffc.

* remove git

* rename file

* create components in kwargs

* Use tuple of strings

* parser doesn't need to be stored

* move file and delete duplicates

* Revert "Use tuple of strings"

This reverts commit ab5a7afd08.

* raise error if streams to check are not in the catalog

* Revert "Revert "Use tuple of strings""

This reverts commit 7c9fb8eb33.

* traverse tree

* rename to options

* move docstring

* Update airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py

Co-authored-by: Sherif A. Nada <snadalive@gmail.com>

* fix tests and format

* format

* update

* better error message

* Add jq dependency

* Use request header provider

* rename

* rename field

* remove get_context method

* rename

* add a comment

* format

Co-authored-by: Sherif A. Nada <snadalive@gmail.com>
2022-06-01 07:31:52 -07:00

136 lines
5.8 KiB
Python

#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder
from airbyte_cdk.sources.declarative.parsers.factory import DeclarativeComponentFactory
from airbyte_cdk.sources.declarative.parsers.yaml_parser import YamlParser
from airbyte_cdk.sources.declarative.requesters.request_params.interpolated_request_parameter_provider import (
InterpolatedRequestParameterProvider,
)
from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever
from airbyte_cdk.sources.declarative.schema.json_schema import JsonSchema
# Connector configuration handed to the factory; the YAML definitions below
# read it through "{{ config['apikey'] }}".
input_config = {"apikey": "verysecrettoken"}

# Shared by every test in this module: the parser turns the YAML text into a
# config mapping and the factory builds components from entries of that mapping.
parser = YamlParser()
factory = DeclarativeComponentFactory()
def test_factory():
    """Build a request-parameter provider from a YAML definition that uses ``*ref(...)``.

    Parses the YAML, asks the factory for the
    ``offset_pagination_request_parameters`` component and checks that the
    created object is an ``InterpolatedRequestParameterProvider`` carrying the
    input config and the uninterpolated ``offset`` template.
    """
    # Top-level keys are flush-left; nested keys use two-space YAML indentation.
    content = """
limit: 50
offset_request_parameters:
  offset: "{{ next_page_token['offset'] }}"
  limit: "*ref(limit)"
offset_pagination_request_parameters:
  class_name: airbyte_cdk.sources.declarative.requesters.request_params.interpolated_request_parameter_provider.InterpolatedRequestParameterProvider
  request_parameters: "*ref(offset_request_parameters)"
"""
    config = parser.parse(content)
    # create_component returns a callable; invoking it yields the component instance.
    provider = factory.create_component(config["offset_pagination_request_parameters"], input_config)()
    assert isinstance(provider, InterpolatedRequestParameterProvider)
    assert provider._interpolator._config == input_config
    assert provider._interpolator._interpolator._mapping["offset"] == "{{ next_page_token['offset'] }}"
def test_interpolate_config():
    """Check that ``{{ config[...] }}`` templates are resolved against the input config.

    The authenticator's token is declared as ``{{ config['apikey'] }}``; after
    creation its ``_tokens`` must hold the value stored under ``apikey`` in
    ``input_config``.
    """
    # class_name and token belong to the authenticator mapping, hence the nesting.
    content = """
authenticator:
  class_name: airbyte_cdk.sources.streams.http.requests_native_auth.token.TokenAuthenticator
  token: "{{ config['apikey'] }}"
"""
    config = parser.parse(content)
    # create_component returns a callable; invoking it yields the component instance.
    authenticator = factory.create_component(config["authenticator"], input_config)()
    assert authenticator._tokens == ["verysecrettoken"]
def test_full_config():
    """Build a full stream and a connection checker from one YAML document.

    The document declares reusable partial components (decoder, extractor,
    paginator, requester, retriever, stream) and a ``list_stream`` that
    references and overrides them. The test checks the parsed stream config,
    then the attributes of the created ``DeclarativeStream`` and of the
    ``check`` component.
    """
    # NOTE(review): the nesting below was reconstructed from key uniqueness and
    # from the attributes asserted further down (stream._retriever._requester,
    # stream._retriever._extractor, stream_config["cursor_field"]). The placement
    # of `retriever` and `paginator` under `list_stream` should be confirmed
    # against the upstream file.
    content = """
decoder:
  class_name: "airbyte_cdk.sources.declarative.decoders.json_decoder.JsonDecoder"
extractor:
  class_name: airbyte_cdk.sources.declarative.extractors.jq.JqExtractor
  decoder: "*ref(decoder)"
metadata_paginator:
  class_name: "airbyte_cdk.sources.declarative.requesters.paginators.next_page_url_paginator.NextPageUrlPaginator"
  next_page_token_template:
    "next_page_url": "{{ decoded_response['_metadata']['next'] }}"
next_page_url_from_token_partial:
  class_name: "airbyte_cdk.sources.declarative.interpolation.interpolated_string.InterpolatedString"
  string: "{{ next_page_token['next_page_url'] }}"
request_parameters_provider:
  class_name: airbyte_cdk.sources.declarative.requesters.request_params.interpolated_request_parameter_provider.InterpolatedRequestParameterProvider
requester:
  class_name: airbyte_cdk.sources.declarative.requesters.http_requester.HttpRequester
  name: "{{ options['name'] }}"
  url_base: "https://api.sendgrid.com/v3/"
  http_method: "GET"
  authenticator:
    class_name: airbyte_cdk.sources.streams.http.requests_native_auth.token.TokenAuthenticator
    token: "{{ config['apikey'] }}"
  request_parameters_provider: "*ref(request_parameters_provider)"
  retrier:
    class_name: airbyte_cdk.sources.declarative.requesters.retriers.default_retrier.DefaultRetrier
retriever:
  class_name: "airbyte_cdk.sources.declarative.retrievers.simple_retriever.SimpleRetriever"
  name: "{{ options['name'] }}"
  state:
    class_name: airbyte_cdk.sources.declarative.states.dict_state.DictState
  stream_slicer:
    class_name: airbyte_cdk.sources.declarative.stream_slicers.single_slice.SingleSlice
  paginator:
    class_name: airbyte_cdk.sources.declarative.requesters.paginators.no_pagination.NoPagination
  primary_key: "{{ options['primary_key'] }}"
partial_stream:
  class_name: "airbyte_cdk.sources.declarative.declarative_stream.DeclarativeStream"
  schema_loader:
    class_name: airbyte_cdk.sources.declarative.schema.json_schema.JsonSchema
    file_path: "./source_sendgrid/schemas/{{options['name']}}.json"
  cursor_field: [ ]
list_stream:
  ref: "*ref(partial_stream)"
  options:
    name: "lists"
    primary_key: "id"
    extractor:
      ref: "*ref(extractor)"
      transform: ".result[]"
  retriever:
    ref: "*ref(retriever)"
    requester:
      ref: "*ref(requester)"
      path:
        ref: "*ref(next_page_url_from_token_partial)"
        default: "marketing/lists"
    paginator:
      ref: "*ref(metadata_paginator)"
check:
  class_name: airbyte_cdk.sources.declarative.checks.check_stream.CheckStream
  stream_names: ["list_stream"]
"""
    config = parser.parse(content)

    # The parsed config must already expose the values inherited from partial_stream.
    stream_config = config["list_stream"]
    assert stream_config["class_name"] == "airbyte_cdk.sources.declarative.declarative_stream.DeclarativeStream"
    assert stream_config["cursor_field"] == []

    # create_component returns a callable; invoking it yields the component instance.
    stream = factory.create_component(stream_config, input_config)()
    assert isinstance(stream, DeclarativeStream)
    assert stream.primary_key == "id"
    assert stream.name == "lists"
    assert isinstance(stream._schema_loader, JsonSchema)
    assert stream._schema_loader._file_path._string == "./source_sendgrid/schemas/lists.json"

    retriever = stream._retriever
    assert isinstance(retriever, SimpleRetriever)
    assert retriever._requester._method == HttpMethod.GET
    assert retriever._requester._authenticator._tokens == ["verysecrettoken"]
    assert isinstance(retriever._extractor._decoder, JsonDecoder)
    assert retriever._extractor._transform == ".result[]"

    checker = factory.create_component(config["check"], input_config)()
    # Exactly one stream name is expected, and it must be "list_stream".
    assert list(checker._stream_names) == ["list_stream"]