* abstract auth token
* basichttp
* remove prints
* docstrings
* get rid of parse-time interpolation
* always pass options through
* delete print
* delete misleading comment
* delete note
* reset
* pass down options
* delete duplicate file
* missing test
* refactor test
* rename to '$options'
* rename to ''
* interpolatedauth
* fix tests
* fix
* docstrings
* update docstring
* docstring
* update docstring
* remove extra field
* undo
* rename to runtime_parameters
* docstring
* update
* / -> *
* update template
* rename to options
* Add examples
* update docstring
* Update test
* newlines
* rename kwargs to options
* options init param
* delete duplicate line
* type hints
* update docstring
* Revert "delete duplicate line"
This reverts commit 4255d5b346.
* delete duplicate code from bad merge
* rename file
* bump cdk version
54 lines
1.5 KiB
Python
54 lines
1.5 KiB
Python
#
|
|
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
|
|
#
|
|
|
|
from airbyte_cdk.sources.declarative.create_partial import create
|
|
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
|
|
|
|
|
|
class AClass:
    """Minimal two-attribute fixture instantiated through `create` in the tests below."""

    def __init__(self, parameter, another_param):
        """Store both constructor arguments unchanged on the instance.

        :param parameter: value exposed as ``self.parameter``
        :param another_param: value exposed as ``self.another_param``
        """
        self.parameter = parameter
        self.another_param = another_param
|
|
|
|
|
|
class OuterClass:
    """Plain fixture holding a name, a field and an inner parameter.

    NOTE(review): not referenced by the tests visible in this portion of the
    file; presumably kept for nested-construction scenarios - confirm before
    removing.
    """

    def __init__(self, name, some_field, inner_param):
        """Store the three constructor arguments unchanged on the instance.

        :param name: value exposed as ``self.name``
        :param some_field: value exposed as ``self.some_field``
        :param inner_param: value exposed as ``self.inner_param``
        """
        self.name = name
        self.some_field = some_field
        self.inner_param = inner_param
|
|
|
|
|
|
class OuterOuterClass:
    """Plain fixture holding a name, a parameter and an inner object.

    NOTE(review): not referenced by the tests visible in this portion of the
    file; presumably kept for nested-construction scenarios - confirm before
    removing.
    """

    def __init__(self, name, param, inner_class):
        """Store the three constructor arguments unchanged on the instance.

        :param name: value exposed as ``self.name``
        :param param: value exposed as ``self.param``
        :param inner_class: value exposed as ``self.inner_class``
        """
        self.name = name
        self.param = param
        self.inner_class = inner_class
|
|
|
|
|
|
def test_pass_parameter_to_create_function():
    """Keywords given to `create` and to the callable it returns both reach the constructor."""
    # `instance` rather than `object`, so the builtin is not shadowed.
    instance = create(AClass, parameter="A")(another_param="B")
    assert instance.parameter == "A"
    assert instance.another_param == "B"
|
|
|
|
|
|
def test_overwrite_param():
    """A keyword passed to the returned callable replaces the value given to `create`."""
    # `instance` rather than `object`, so the builtin is not shadowed.
    instance = create(AClass, parameter="A", another_param="B")(parameter="C")
    # The call-time value wins for `parameter`; `another_param` keeps its original value.
    assert instance.parameter == "C"
    assert instance.another_param == "B"
|
|
|
|
|
|
def test_string_interpolation():
    """An InterpolatedString built through `create` stores the raw template text."""
    template = "{{ next_page_token['next_page_url'] }}"
    factory = create(InterpolatedString, string=template)
    interpolated_string = factory()
    # The template must still be the unevaluated source string after construction.
    assert interpolated_string._string == template
|
|
|
|
|
|
def test_string_interpolation_through_kwargs():
    """A template referencing `options` resolves from the options passed to `create`."""
    template = "{{ options['name'] }}"
    options = {"name": "airbyte"}
    factory = create(InterpolatedString, string=template, options=options)
    interpolated_string = factory()
    # Evaluated against an empty config: the value can only come from `options`.
    assert interpolated_string.eval({}) == "airbyte"
|