
Airbyte CDK: add interpolation for request options (#35485)

Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
Co-authored-by: Alexandre Girard <alexandre@airbyte.io>
Artem Inzhyyants
2024-02-22 19:40:44 +01:00
committed by GitHub
parent 2d80b5676d
commit 0954ad3d3a
40 changed files with 1374 additions and 1015 deletions
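The core of the change: RequestOption.field_name is now an InterpolatedString rather than a plain string, so request parameter, header, and body field names in low-code manifests can be templated against the connector config and the component's $parameters. A minimal sketch of the new behavior (the "cursor_field" config value and the "since_..." field name are illustrative, not taken from a real connector):

from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption, RequestOptionType

# The field name is a Jinja template; it is only resolved when the request is built.
option = RequestOption(
    inject_into=RequestOptionType.request_parameter,
    field_name="since_{{ config['cursor_field'] }}",
    parameters={},
)
assert option.field_name.eval(config={"cursor_field": "updated_at"}) == "since_updated_at"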

View File

@@ -1898,6 +1898,9 @@ definitions:
type: string
examples:
- segment_id
interpolation_context:
- config
- parameters
inject_into:
title: Inject Into
description: Configures where the descriptor should be set on the HTTP requests. Note that request parameters that are already encoded in the URL path will not be duplicated.
@@ -2154,6 +2157,13 @@ interpolation:
examples:
- start_date: 2010-01-01
api_key: "*****"
- title: parameters
description: Additional runtime parameters, to be used for string interpolation. Parameters can be passed down from a parent component to its subcomponents using the $parameters key. This can be used to avoid repetitions.
type: object
examples:
- path: "automations"
data_export_path: "automations"
cursor_field: "updated_at"
- title: headers
description: The HTTP headers from the last response received from the API. The object's keys are the header names from the response.
type: object
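As a rough illustration of the interpolation contexts documented above, the same kind of template can be resolved either from the connector config or from $parameters propagated by a parent component (all values below are illustrative):

from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString

# Resolved from $parameters passed down by a parent component.
from_parameters = InterpolatedString.create(
    "since_{{ parameters['cursor_field'] }}", parameters={"cursor_field": "updated_at"}
)
assert from_parameters.eval(config={}) == "since_updated_at"

# Resolved from the user-supplied connector config.
from_config = InterpolatedString.create("since_{{ config['cursor_field'] }}", parameters={})
assert from_config.eval(config={"cursor_field": "created_at"}) == "since_created_at"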

View File

@@ -247,9 +247,11 @@ class DatetimeBasedCursor(Cursor):
def _get_request_options(self, option_type: RequestOptionType, stream_slice: StreamSlice):
options = {}
if self.start_time_option and self.start_time_option.inject_into == option_type:
options[self.start_time_option.field_name] = stream_slice.get(self.partition_field_start.eval(self.config))
options[self.start_time_option.field_name.eval(config=self.config)] = stream_slice.get(
self.partition_field_start.eval(self.config)
)
if self.end_time_option and self.end_time_option.inject_into == option_type:
options[self.end_time_option.field_name] = stream_slice.get(self.partition_field_end.eval(self.config))
options[self.end_time_option.field_name.eval(config=self.config)] = stream_slice.get(self.partition_field_end.eval(self.config))
return options
def should_be_synced(self, record: Record) -> bool:
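Roughly what the updated _get_request_options now produces for a date-window slice, assuming interpolated start and end field names (the slice keys and config values here are illustrative):

from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption, RequestOptionType

config = {"cursor_field": "updated_at"}
start_time_option = RequestOption(
    inject_into=RequestOptionType.request_parameter,
    field_name="since_{{ config['cursor_field'] }}",
    parameters={},
)
end_time_option = RequestOption(
    inject_into=RequestOptionType.body_json,
    field_name="before_{{ parameters['cursor_field'] }}",
    parameters={"cursor_field": "created_at"},
)
stream_slice = {"start_time": "2024-01-01", "end_time": "2024-01-31"}

# The cursor now evaluates each field name before injecting the slice boundary under it.
request_params = {start_time_option.field_name.eval(config=config): stream_slice["start_time"]}
body_json = {end_time_option.field_name.eval(config=config): stream_slice["end_time"]}
assert request_params == {"since_updated_at": "2024-01-01"}
assert body_json == {"before_created_at": "2024-01-31"}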

View File

@@ -81,7 +81,7 @@ class ListPartitionRouter(StreamSlicer):
if self.request_option and self.request_option.inject_into == request_option_type and stream_slice:
slice_value = stream_slice.get(self.cursor_field.eval(self.config))
if slice_value:
return {self.request_option.field_name: slice_value}
return {self.request_option.field_name.eval(self.config): slice_value}
else:
return {}
else:

View File

@@ -100,7 +100,7 @@ class SubstreamPartitionRouter(StreamSlicer):
key = parent_config.partition_field.eval(self.config)
value = stream_slice.get(key)
if value:
params.update({parent_config.request_option.field_name: value})
params.update({parent_config.request_option.field_name.eval(config=self.config): value})
return params
def stream_slices(self) -> Iterable[StreamSlice]:

View File

@@ -164,9 +164,9 @@ class DefaultPaginator(Paginator):
and isinstance(self.page_token_option, RequestOption)
and self.page_token_option.inject_into == option_type
):
options[self.page_token_option.field_name] = self._token
options[self.page_token_option.field_name.eval(config=self.config)] = self._token
if self.page_size_option and self.pagination_strategy.get_page_size() and self.page_size_option.inject_into == option_type:
options[self.page_size_option.field_name] = self.pagination_strategy.get_page_size()
options[self.page_size_option.field_name.eval(config=self.config)] = self.pagination_strategy.get_page_size()
return options
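The same pattern applies to pagination: the page-size and page-token field names are evaluated before their values are injected. A small sketch mirroring the paginator change, with the $parameters mapping made up for illustration:

from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption, RequestOptionType

page_size_option = RequestOption(
    inject_into=RequestOptionType.request_parameter,
    field_name="{{ parameters['page_limit'] }}",
    parameters={"page_limit": "limit"},
)
# Resolve the field name first, then inject the page size under it.
options = {page_size_option.field_name.eval(config={}): 50}
assert options == {"limit": 50}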

View File

@@ -4,7 +4,9 @@
from dataclasses import InitVar, dataclass
from enum import Enum
from typing import Any, Mapping
from typing import Any, Mapping, Union
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
class RequestOptionType(Enum):
@@ -28,6 +30,9 @@ class RequestOption:
inject_into (RequestOptionType): Describes where in the HTTP request to inject the parameter
"""
field_name: str
field_name: Union[InterpolatedString, str]
inject_into: RequestOptionType
parameters: InitVar[Mapping[str, Any]]
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self.field_name = InterpolatedString.create(self.field_name, parameters=parameters)
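Because field_name now accepts Union[InterpolatedString, str] and __post_init__ wraps it, a plain literal keeps working, but downstream code has to call .eval() instead of using the attribute as a str. A small sketch of that contract:

from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption, RequestOptionType

# A plain literal is still accepted; after __post_init__ it is an InterpolatedString,
# so callers switch from `option.field_name` to `option.field_name.eval(config)`.
option = RequestOption(inject_into=RequestOptionType.header, field_name="page_size", parameters={})
assert isinstance(option.field_name, InterpolatedString)
assert option.field_name.eval(config={}) == "page_size"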

View File

@@ -1,7 +1,4 @@
from .abstract_file_based_availability_strategy import (
AbstractFileBasedAvailabilityStrategy,
AbstractFileBasedAvailabilityStrategyWrapper,
)
from .abstract_file_based_availability_strategy import AbstractFileBasedAvailabilityStrategy, AbstractFileBasedAvailabilityStrategyWrapper
from .default_file_based_availability_strategy import DefaultFileBasedAvailabilityStrategy
__all__ = ["AbstractFileBasedAvailabilityStrategy", "AbstractFileBasedAvailabilityStrategyWrapper", "DefaultFileBasedAvailabilityStrategy"]

View File

@@ -1,5 +1,5 @@
from .abstract_concurrent_file_based_cursor import AbstractConcurrentFileBasedCursor
from .file_based_noop_cursor import FileBasedNoopCursor
from .file_based_concurrent_cursor import FileBasedConcurrentCursor
from .file_based_noop_cursor import FileBasedNoopCursor
__all__ = ["AbstractConcurrentFileBasedCursor", "FileBasedConcurrentCursor", "FileBasedNoopCursor"]

View File

@@ -285,7 +285,7 @@ def test_process_multiple_chunks_with_relevant_fields():
"text: Special tokens like",
"<|endoftext|> are treated like regular",
"text",
]
],
),
(
"Custom separator",

View File

@@ -118,8 +118,6 @@ def test_openai_chunking():
mock_embedding_instance.embed_documents.side_effect = lambda texts: [[0] * OPEN_AI_VECTOR_SIZE] * len(texts)
chunks = [
Document(page_content="a", record=AirbyteRecordMessage(stream="mystream", data={}, emitted_at=0)) for _ in range(1005)
]
chunks = [Document(page_content="a", record=AirbyteRecordMessage(stream="mystream", data={}, emitted_at=0)) for _ in range(1005)]
assert embedder.embed_documents(chunks) == [[0] * OPEN_AI_VECTOR_SIZE] * 1005
mock_embedding_instance.embed_documents.assert_has_calls([call(["a"] * 1000), call(["a"] * 5)])

View File

@@ -83,7 +83,7 @@ def test_api_key_authenticator(test_name, header, token, expected_header, expect
"""
token_provider = InterpolatedStringTokenProvider(config=config, api_token=token, parameters=parameters)
token_auth = ApiKeyAuthenticator(
request_option=RequestOption(inject_into=RequestOptionType.header, field_name=header, parameters={}),
request_option=RequestOption(inject_into=RequestOptionType.header, field_name=header, parameters=parameters),
token_provider=token_provider,
config=config,
parameters=parameters,
@@ -192,7 +192,7 @@ def test_api_key_authenticator_inject(test_name, field_name, token, expected_fie
"""
token_provider = InterpolatedStringTokenProvider(config=config, api_token=token, parameters=parameters)
token_auth = ApiKeyAuthenticator(
request_option=RequestOption(inject_into=inject_type, field_name=field_name, parameters={}),
request_option=RequestOption(inject_into=inject_type, field_name=field_name, parameters=parameters),
token_provider=token_provider,
config=config,
parameters=parameters,

View File

@@ -232,7 +232,7 @@ spec:
assert isinstance(stream.retriever.paginator, DefaultPaginator)
assert isinstance(stream.retriever.paginator.decoder, JsonDecoder)
assert stream.retriever.paginator.page_size_option.field_name == "page_size"
assert stream.retriever.paginator.page_size_option.field_name.eval(input_config) == "page_size"
assert stream.retriever.paginator.page_size_option.inject_into == RequestOptionType.request_parameter
assert isinstance(stream.retriever.paginator.page_token_option, RequestPath)
assert stream.retriever.paginator.url_base.string == "https://api.sendgrid.com/v3/"
@@ -422,7 +422,7 @@ def test_list_based_stream_slicer_with_values_defined_in_config():
assert isinstance(partition_router, ListPartitionRouter)
assert partition_router.values == ["airbyte", "airbyte-cloud"]
assert partition_router.request_option.inject_into == RequestOptionType.header
assert partition_router.request_option.field_name == "repository"
assert partition_router.request_option.field_name.eval(config=input_config) == "repository"
def test_create_substream_partition_router():
@@ -484,7 +484,7 @@ def test_create_substream_partition_router():
assert partition_router.parent_stream_configs[0].parent_key.eval({}) == "id"
assert partition_router.parent_stream_configs[0].partition_field.eval({}) == "repository_id"
assert partition_router.parent_stream_configs[0].request_option.inject_into == RequestOptionType.request_parameter
assert partition_router.parent_stream_configs[0].request_option.field_name == "repository_id"
assert partition_router.parent_stream_configs[0].request_option.field_name.eval(config=input_config) == "repository_id"
assert partition_router.parent_stream_configs[1].parent_key.eval({}) == "someid"
assert partition_router.parent_stream_configs[1].partition_field.eval({}) == "word_id"
@@ -509,17 +509,17 @@ def test_datetime_based_cursor():
start_time_option:
type: RequestOption
inject_into: request_parameter
field_name: created[gte]
field_name: "since_{{ config['cursor_field'] }}"
end_time_option:
type: RequestOption
inject_into: body_json
field_name: end_time
field_name: "before_{{ parameters['cursor_field'] }}"
partition_field_start: star
partition_field_end: en
"""
parsed_manifest = YamlDeclarativeSource._parse(content)
resolved_manifest = resolver.preprocess_manifest(parsed_manifest)
slicer_manifest = transformer.propagate_types_and_parameters("", resolved_manifest["incremental"], {})
slicer_manifest = transformer.propagate_types_and_parameters("", resolved_manifest["incremental"], {"cursor_field": "created_at"})
stream_slicer = factory.create_component(model_type=DatetimeBasedCursorModel, component_definition=slicer_manifest, config=input_config)
@@ -529,9 +529,9 @@ def test_datetime_based_cursor():
assert stream_slicer.cursor_granularity == "PT0.000001S"
assert stream_slicer.lookback_window.string == "P5D"
assert stream_slicer.start_time_option.inject_into == RequestOptionType.request_parameter
assert stream_slicer.start_time_option.field_name == "created[gte]"
assert stream_slicer.start_time_option.field_name.eval(config=input_config | {"cursor_field": "updated_at"}) == "since_updated_at"
assert stream_slicer.end_time_option.inject_into == RequestOptionType.body_json
assert stream_slicer.end_time_option.field_name == "end_time"
assert stream_slicer.end_time_option.field_name.eval({}) == "before_created_at"
assert stream_slicer.partition_field_start.eval({}) == "star"
assert stream_slicer.partition_field_end.eval({}) == "en"
@@ -937,18 +937,20 @@ requester:
}
@pytest.mark.parametrize("input_config, expected_authenticator_class", [
pytest.param(
{"auth": {"type": "token"}, "credentials": {"api_key": "some_key"}},
ApiKeyAuthenticator,
id="test_create_requester_with_selective_authenticator_and_token_selected",
),
pytest.param(
{"auth": {"type": "oauth"}, "credentials": {"client_id": "ABC"}},
DeclarativeOauth2Authenticator,
id="test_create_requester_with_selective_authenticator_and_oauth_selected",
),
]
@pytest.mark.parametrize(
"input_config, expected_authenticator_class",
[
pytest.param(
{"auth": {"type": "token"}, "credentials": {"api_key": "some_key"}},
ApiKeyAuthenticator,
id="test_create_requester_with_selective_authenticator_and_token_selected",
),
pytest.param(
{"auth": {"type": "oauth"}, "credentials": {"client_id": "ABC"}},
DeclarativeOauth2Authenticator,
id="test_create_requester_with_selective_authenticator_and_oauth_selected",
),
],
)
def test_create_requester_with_selective_authenticator(input_config, expected_authenticator_class):
content = """
@@ -1121,7 +1123,7 @@ def test_create_default_paginator():
assert isinstance(paginator.page_size_option, RequestOption)
assert paginator.page_size_option.inject_into == RequestOptionType.request_parameter
assert paginator.page_size_option.field_name == "page_size"
assert paginator.page_size_option.field_name.eval(config=input_config) == "page_size"
assert isinstance(paginator.page_token_option, RequestPath)
@@ -1294,7 +1296,7 @@ def test_custom_components_do_not_contain_extra_fields():
assert custom_substream_partition_router.parent_stream_configs[0].parent_key.eval({}) == "id"
assert custom_substream_partition_router.parent_stream_configs[0].partition_field.eval({}) == "repository_id"
assert custom_substream_partition_router.parent_stream_configs[0].request_option.inject_into == RequestOptionType.request_parameter
assert custom_substream_partition_router.parent_stream_configs[0].request_option.field_name == "repository_id"
assert custom_substream_partition_router.parent_stream_configs[0].request_option.field_name.eval(config=input_config) == "repository_id"
assert isinstance(custom_substream_partition_router.custom_pagination_strategy, PageIncrement)
assert custom_substream_partition_router.custom_pagination_strategy.page_size == 100
@@ -1343,7 +1345,7 @@ def test_parse_custom_component_fields_if_subcomponent():
assert custom_substream_partition_router.parent_stream_configs[0].parent_key.eval({}) == "id"
assert custom_substream_partition_router.parent_stream_configs[0].partition_field.eval({}) == "repository_id"
assert custom_substream_partition_router.parent_stream_configs[0].request_option.inject_into == RequestOptionType.request_parameter
assert custom_substream_partition_router.parent_stream_configs[0].request_option.field_name == "repository_id"
assert custom_substream_partition_router.parent_stream_configs[0].request_option.field_name.eval(config=input_config) == "repository_id"
assert isinstance(custom_substream_partition_router.custom_pagination_strategy, PageIncrement)
assert custom_substream_partition_router.custom_pagination_strategy.page_size == 100

View File

@@ -12,39 +12,40 @@ parameters = {"cursor_field": "owner_resource"}
@pytest.mark.parametrize(
"test_name, partition_values, cursor_field, expected_slices",
"partition_values, cursor_field, expected_slices",
[
(
"test_single_element",
["customer", "store", "subscription"],
"owner_resource",
[{"owner_resource": "customer"}, {"owner_resource": "store"}, {"owner_resource": "subscription"}],
),
(
"test_input_list_is_string",
'["customer", "store", "subscription"]',
"owner_resource",
[{"owner_resource": "customer"}, {"owner_resource": "store"}, {"owner_resource": "subscription"}],
),
(
"test_using_cursor_from_parameters",
'["customer", "store", "subscription"]',
"{{ parameters['cursor_field'] }}",
[{"owner_resource": "customer"}, {"owner_resource": "store"}, {"owner_resource": "subscription"}],
),
],
ids=[
"test_single_element",
"test_input_list_is_string",
"test_using_cursor_from_parameters",
],
)
def test_list_partition_router(test_name, partition_values, cursor_field, expected_slices):
def test_list_partition_router(partition_values, cursor_field, expected_slices):
slicer = ListPartitionRouter(values=partition_values, cursor_field=cursor_field, config={}, parameters=parameters)
slices = [s for s in slicer.stream_slices()]
assert slices == expected_slices
@pytest.mark.parametrize(
"test_name, request_option, expected_req_params, expected_headers, expected_body_json, expected_body_data",
"request_option, expected_req_params, expected_headers, expected_body_json, expected_body_data",
[
(
"test_inject_into_req_param",
RequestOption(inject_into=RequestOptionType.request_parameter, parameters={}, field_name="owner_resource"),
{"owner_resource": "customer"},
{},
@@ -52,7 +53,6 @@ def test_list_partition_router(test_name, partition_values, cursor_field, expect
{},
),
(
"test_pass_by_header",
RequestOption(inject_into=RequestOptionType.header, parameters={}, field_name="owner_resource"),
{},
{"owner_resource": "customer"},
@@ -60,7 +60,6 @@ def test_list_partition_router(test_name, partition_values, cursor_field, expect
{},
),
(
"test_inject_into_body_json",
RequestOption(inject_into=RequestOptionType.body_json, parameters={}, field_name="owner_resource"),
{},
{},
@@ -68,7 +67,6 @@ def test_list_partition_router(test_name, partition_values, cursor_field, expect
{},
),
(
"test_inject_into_body_data",
RequestOption(inject_into=RequestOptionType.body_data, parameters={}, field_name="owner_resource"),
{},
{},
@@ -76,8 +74,14 @@ def test_list_partition_router(test_name, partition_values, cursor_field, expect
{"owner_resource": "customer"},
),
],
ids=[
"test_inject_into_req_param",
"test_pass_by_header",
"test_inject_into_body_json",
"test_inject_into_body_data",
],
)
def test_request_option(test_name, request_option, expected_req_params, expected_headers, expected_body_json, expected_body_data):
def test_request_option(request_option, expected_req_params, expected_headers, expected_body_json, expected_body_data):
partition_router = ListPartitionRouter(
values=partition_values, cursor_field=cursor_field, config={}, request_option=request_option, parameters={}
)
@@ -89,6 +93,31 @@ def test_request_option(test_name, request_option, expected_req_params, expected
assert expected_body_data == partition_router.get_request_body_data(stream_slice=stream_slice)
@pytest.mark.parametrize(
"field_name_interpolation, expected_request_params",
[
("{{parameters['partition_name']}}", {"parameters_partition": "customer"}),
("{{config['partition_name']}}", {"config_partition": "customer"}),
],
ids=[
"parameters_interpolation",
"config_interpolation",
],
)
def test_request_options_interpolation(field_name_interpolation: str, expected_request_params: dict):
config = {"partition_name": "config_partition"}
parameters = {"partition_name": "parameters_partition"}
request_option = RequestOption(
inject_into=RequestOptionType.request_parameter, parameters=parameters, field_name=field_name_interpolation
)
partition_router = ListPartitionRouter(
values=partition_values, cursor_field=cursor_field, config=config, request_option=request_option, parameters=parameters
)
stream_slice = {cursor_field: "customer"}
assert expected_request_params == partition_router.get_request_params(stream_slice=stream_slice)
def test_request_option_before_updating_cursor():
request_option = RequestOption(inject_into=RequestOptionType.request_parameter, parameters={}, field_name="owner_resource")
partition_router = ListPartitionRouter(

View File

@@ -57,11 +57,10 @@ class MockStream(Stream):
@pytest.mark.parametrize(
"test_name, parent_stream_configs, expected_slices",
"parent_stream_configs, expected_slices",
[
("test_no_parents", [], None),
([], None),
(
"test_single_parent_slices_no_records",
[
ParentStreamConfig(
stream=MockStream([{}], [], "first_stream"),
@@ -74,7 +73,6 @@ class MockStream(Stream):
[],
),
(
"test_single_parent_slices_with_records",
[
ParentStreamConfig(
stream=MockStream([{}], parent_records, "first_stream"),
@@ -87,7 +85,6 @@ class MockStream(Stream):
[{"first_stream_id": 1, "parent_slice": {}}, {"first_stream_id": 2, "parent_slice": {}}],
),
(
"test_with_parent_slices_and_records",
[
ParentStreamConfig(
stream=MockStream(parent_slices, all_parent_data, "first_stream"),
@@ -104,7 +101,6 @@ class MockStream(Stream):
],
),
(
"test_multiple_parent_streams",
[
ParentStreamConfig(
stream=MockStream(parent_slices, data_first_parent_slice + data_second_parent_slice, "first_stream"),
@@ -130,7 +126,6 @@ class MockStream(Stream):
],
),
(
"test_missed_parent_key",
[
ParentStreamConfig(
stream=MockStream([{}], [{"id": 0}, {"id": 1}, {"_id": 2}, {"id": 3}], "first_stream"),
@@ -147,7 +142,6 @@ class MockStream(Stream):
],
),
(
"test_dpath_extraction",
[
ParentStreamConfig(
stream=MockStream([{}], [{"a": {"b": 0}}, {"a": {"b": 1}}, {"a": {"c": 2}}, {"a": {"b": 3}}], "first_stream"),
@@ -164,8 +158,17 @@ class MockStream(Stream):
],
),
],
ids=[
"test_no_parents",
"test_single_parent_slices_no_records",
"test_single_parent_slices_with_records",
"test_with_parent_slices_and_records",
"test_multiple_parent_streams",
"test_missed_parent_key",
"test_dpath_extraction",
],
)
def test_substream_slicer(test_name, parent_stream_configs, expected_slices):
def test_substream_slicer(parent_stream_configs, expected_slices):
if expected_slices is None:
try:
SubstreamPartitionRouter(parent_stream_configs=parent_stream_configs, parameters={}, config={})
@@ -178,10 +181,9 @@ def test_substream_slicer(test_name, parent_stream_configs, expected_slices):
@pytest.mark.parametrize(
"test_name, parent_stream_request_parameters, expected_req_params, expected_headers, expected_body_json, expected_body_data",
"parent_stream_request_parameters, expected_req_params, expected_headers, expected_body_json, expected_body_data",
[
(
"test_request_option_in_request_param",
[
RequestOption(inject_into=RequestOptionType.request_parameter, parameters={}, field_name="first_stream"),
RequestOption(inject_into=RequestOptionType.request_parameter, parameters={}, field_name="second_stream"),
@@ -192,7 +194,6 @@ def test_substream_slicer(test_name, parent_stream_configs, expected_slices):
{},
),
(
"test_request_option_in_header",
[
RequestOption(inject_into=RequestOptionType.header, parameters={}, field_name="first_stream"),
RequestOption(inject_into=RequestOptionType.header, parameters={}, field_name="second_stream"),
@@ -203,7 +204,6 @@ def test_substream_slicer(test_name, parent_stream_configs, expected_slices):
{},
),
(
"test_request_option_in_param_and_header",
[
RequestOption(inject_into=RequestOptionType.request_parameter, parameters={}, field_name="first_stream"),
RequestOption(inject_into=RequestOptionType.header, parameters={}, field_name="second_stream"),
@@ -214,7 +214,6 @@ def test_substream_slicer(test_name, parent_stream_configs, expected_slices):
{},
),
(
"test_request_option_in_body_json",
[
RequestOption(inject_into=RequestOptionType.body_json, parameters={}, field_name="first_stream"),
RequestOption(inject_into=RequestOptionType.body_json, parameters={}, field_name="second_stream"),
@@ -225,7 +224,6 @@ def test_substream_slicer(test_name, parent_stream_configs, expected_slices):
{},
),
(
"test_request_option_in_body_data",
[
RequestOption(inject_into=RequestOptionType.body_data, parameters={}, field_name="first_stream"),
RequestOption(inject_into=RequestOptionType.body_data, parameters={}, field_name="second_stream"),
@@ -236,9 +234,15 @@ def test_substream_slicer(test_name, parent_stream_configs, expected_slices):
{"first_stream": "1234", "second_stream": "4567"},
),
],
ids=[
"test_request_option_in_request_param",
"test_request_option_in_header",
"test_request_option_in_param_and_header",
"test_request_option_in_body_json",
"test_request_option_in_body_data",
],
)
def test_request_option(
test_name,
parent_stream_request_parameters,
expected_req_params,
expected_headers,
@@ -275,6 +279,61 @@ def test_request_option(
assert expected_body_data == partition_router.get_request_body_data(stream_slice=stream_slice)
@pytest.mark.parametrize(
"field_name_first_stream, field_name_second_stream, expected_request_params",
[
(
"{{parameters['field_name_first_stream']}}",
"{{parameters['field_name_second_stream']}}",
{"parameter_first_stream_id": "1234", "parameter_second_stream_id": "4567"},
),
(
"{{config['field_name_first_stream']}}",
"{{config['field_name_second_stream']}}",
{"config_first_stream_id": "1234", "config_second_stream_id": "4567"},
),
],
ids=[
"parameters_interpolation",
"config_interpolation",
],
)
def test_request_params_interpolation_for_parent_stream(
field_name_first_stream: str, field_name_second_stream: str, expected_request_params: dict
):
config = {"field_name_first_stream": "config_first_stream_id", "field_name_second_stream": "config_second_stream_id"}
parameters = {"field_name_first_stream": "parameter_first_stream_id", "field_name_second_stream": "parameter_second_stream_id"}
partition_router = SubstreamPartitionRouter(
parent_stream_configs=[
ParentStreamConfig(
stream=MockStream(parent_slices, data_first_parent_slice + data_second_parent_slice, "first_stream"),
parent_key="id",
partition_field="first_stream_id",
parameters=parameters,
config=config,
request_option=RequestOption(
inject_into=RequestOptionType.request_parameter, parameters=parameters, field_name=field_name_first_stream
),
),
ParentStreamConfig(
stream=MockStream(second_parent_stream_slice, more_records, "second_stream"),
parent_key="id",
partition_field="second_stream_id",
parameters=parameters,
config=config,
request_option=RequestOption(
inject_into=RequestOptionType.request_parameter, parameters=parameters, field_name=field_name_second_stream
),
),
],
parameters=parameters,
config=config,
)
stream_slice = {"first_stream_id": "1234", "second_stream_id": "4567"}
assert expected_request_params == partition_router.get_request_params(stream_slice=stream_slice)
def test_given_record_is_airbyte_message_when_stream_slices_then_use_record_data():
parent_slice = {}
partition_router = SubstreamPartitionRouter(

View File

@@ -21,10 +21,9 @@ from airbyte_cdk.sources.declarative.requesters.request_path import RequestPath
@pytest.mark.parametrize(
"test_name, page_token_request_option, stop_condition, expected_updated_path, expected_request_params, expected_headers, expected_body_data, expected_body_json, last_records, expected_next_page_token, limit",
"page_token_request_option, stop_condition, expected_updated_path, expected_request_params, expected_headers, expected_body_data, expected_body_json, last_records, expected_next_page_token, limit",
[
(
"test_default_paginator_path",
RequestPath(parameters={}),
None,
"/next_url",
@@ -37,7 +36,6 @@ from airbyte_cdk.sources.declarative.requesters.request_path import RequestPath
2,
),
(
"test_default_paginator_request_param",
RequestOption(inject_into=RequestOptionType.request_parameter, field_name="from", parameters={}),
None,
None,
@@ -50,7 +48,6 @@ from airbyte_cdk.sources.declarative.requesters.request_path import RequestPath
2,
),
(
"test_default_paginator_no_token",
RequestOption(inject_into=RequestOptionType.request_parameter, field_name="from", parameters={}),
InterpolatedBoolean(condition="{{True}}", parameters={}),
None,
@@ -63,7 +60,6 @@ from airbyte_cdk.sources.declarative.requesters.request_path import RequestPath
2,
),
(
"test_default_paginator_cursor_header",
RequestOption(inject_into=RequestOptionType.header, field_name="from", parameters={}),
None,
None,
@@ -76,7 +72,6 @@ from airbyte_cdk.sources.declarative.requesters.request_path import RequestPath
2,
),
(
"test_default_paginator_cursor_body_data",
RequestOption(inject_into=RequestOptionType.body_data, field_name="from", parameters={}),
None,
None,
@@ -89,7 +84,6 @@ from airbyte_cdk.sources.declarative.requesters.request_path import RequestPath
2,
),
(
"test_default_paginator_cursor_body_json",
RequestOption(inject_into=RequestOptionType.body_json, field_name="from", parameters={}),
None,
None,
@@ -102,9 +96,16 @@ from airbyte_cdk.sources.declarative.requesters.request_path import RequestPath
2,
),
],
ids=[
"test_default_paginator_path",
"test_default_paginator_request_param",
"test_default_paginator_no_token",
"test_default_paginator_cursor_header",
"test_default_paginator_cursor_body_data",
"test_default_paginator_cursor_body_json",
],
)
def test_default_paginator_with_cursor(
test_name,
page_token_request_option,
stop_condition,
expected_updated_path,
@@ -116,7 +117,9 @@ def test_default_paginator_with_cursor(
expected_next_page_token,
limit,
):
page_size_request_option = RequestOption(inject_into=RequestOptionType.request_parameter, field_name="limit", parameters={})
page_size_request_option = RequestOption(
inject_into=RequestOptionType.request_parameter, field_name="{{parameters['page_limit']}}", parameters={"page_limit": "limit"}
)
cursor_value = "{{ response.next }}"
url_base = "https://airbyte.io"
config = {}
@@ -157,6 +160,62 @@ def test_default_paginator_with_cursor(
assert actual_body_json == expected_body_json
@pytest.mark.parametrize(
"field_name_page_size_interpolation, field_name_page_token_interpolation, expected_request_params",
[
(
"{{parameters['page_limit']}}",
"{{parameters['page_token']}}",
{"parameters_limit": 50, "parameters_token": "https://airbyte.io/next_url"},
),
("{{config['page_limit']}}", "{{config['page_token']}}", {"config_limit": 50, "config_token": "https://airbyte.io/next_url"}),
],
ids=[
"parameters_interpolation",
"config_interpolation",
],
)
def test_paginator_request_param_interpolation(
field_name_page_size_interpolation: str, field_name_page_token_interpolation: str, expected_request_params: dict
):
config = {"page_limit": "config_limit", "page_token": "config_token"}
parameters = {"page_limit": "parameters_limit", "page_token": "parameters_token"}
page_size_request_option = RequestOption(
inject_into=RequestOptionType.request_parameter,
field_name=field_name_page_size_interpolation,
parameters=parameters,
)
cursor_value = "{{ response.next }}"
url_base = "https://airbyte.io"
limit = 50
strategy = CursorPaginationStrategy(
page_size=limit,
cursor_value=cursor_value,
stop_condition=None,
decoder=JsonDecoder(parameters={}),
config=config,
parameters=parameters,
)
paginator = DefaultPaginator(
page_size_option=page_size_request_option,
page_token_option=RequestOption(
inject_into=RequestOptionType.request_parameter, field_name=field_name_page_token_interpolation, parameters=parameters
),
pagination_strategy=strategy,
config=config,
url_base=url_base,
parameters=parameters,
)
response = requests.Response()
response.headers = {"A_HEADER": "HEADER_VALUE"}
response_body = {"next": "https://airbyte.io/next_url"}
response._content = json.dumps(response_body).encode("utf-8")
last_records = [{"id": 0}, {"id": 1}]
paginator.next_page_token(response, last_records)
actual_request_params = paginator.get_request_params()
assert actual_request_params == expected_request_params
def test_page_size_option_cannot_be_set_if_strategy_has_no_limit():
page_size_request_option = RequestOption(inject_into=RequestOptionType.request_parameter, field_name="page_size", parameters={})
page_token_request_option = RequestOption(inject_into=RequestOptionType.request_parameter, field_name="offset", parameters={})

View File

@@ -7,15 +7,37 @@ from airbyte_cdk.sources.declarative.requesters.request_option import RequestOpt
@pytest.mark.parametrize(
"test_name, option_type, field_name",
"option_type, field_name, expected_field_name",
[
("test_limit_param_with_field_name", RequestOptionType.request_parameter, "field"),
("test_limit_header_with_field_name", RequestOptionType.header, "field"),
("test_limit_data_with_field_name", RequestOptionType.body_data, "field"),
("test_limit_json_with_field_name", RequestOptionType.body_json, "field"),
(RequestOptionType.request_parameter, "field", "field"),
(RequestOptionType.header, "field", "field"),
(RequestOptionType.body_data, "field", "field"),
(RequestOptionType.body_json, "field", "field"),
(RequestOptionType.request_parameter, "since_{{ parameters['cursor_field'] }}", "since_updated_at"),
(RequestOptionType.header, "since_{{ parameters['cursor_field'] }}", "since_updated_at"),
(RequestOptionType.body_data, "since_{{ parameters['cursor_field'] }}", "since_updated_at"),
(RequestOptionType.body_json, "since_{{ parameters['cursor_field'] }}", "since_updated_at"),
(RequestOptionType.request_parameter, "since_{{ config['cursor_field'] }}", "since_created_at"),
(RequestOptionType.header, "since_{{ config['cursor_field'] }}", "since_created_at"),
(RequestOptionType.body_data, "since_{{ config['cursor_field'] }}", "since_created_at"),
(RequestOptionType.body_json, "since_{{ config['cursor_field'] }}", "since_created_at"),
],
ids=[
"test_limit_param_with_field_name",
"test_limit_header_with_field_name",
"test_limit_data_with_field_name",
"test_limit_json_with_field_name",
"test_limit_param_with_parameters_interpolation",
"test_limit_header_with_parameters_interpolation",
"test_limit_data_with_parameters_interpolation",
"test_limit_json_with_parameters_interpolation",
"test_limit_param_with_config_interpolation",
"test_limit_header_with_config_interpolation",
"test_limit_data_with_config_interpolation",
"test_limit_json_with_config_interpolation",
],
)
def test_request_option(test_name, option_type, field_name):
request_option = RequestOption(inject_into=option_type, field_name=field_name, parameters={})
assert request_option.field_name == field_name
def test_request_option(option_type: RequestOptionType, field_name: str, expected_field_name: str):
request_option = RequestOption(inject_into=option_type, field_name=field_name, parameters={"cursor_field": "updated_at"})
assert request_option.field_name.eval({"cursor_field": "created_at"}) == expected_field_name
assert request_option.inject_into == option_type

View File

@@ -86,7 +86,10 @@ def test_infer_schema(mock_detect_filetype, filetype, format_config, raises):
assert schema == {
"content": {"type": "string", "description": "Content of the file as markdown. Might be null if the file could not be parsed"},
"document_key": {"type": "string", "description": "Unique identifier of the document, e.g. the file path"},
"_ab_source_file_parse_error": {"type": "string", "description": "Error message if the file could not be parsed even though the file is supported"},
"_ab_source_file_parse_error": {
"type": "string",
"description": "Error message if the file could not be parsed even though the file is supported",
},
}
loop.close()
asyncio.set_event_loop(main_loop)
@@ -201,7 +204,7 @@ def test_infer_schema(mock_detect_filetype, filetype, format_config, raises):
{
"content": None,
"document_key": FILE_URI,
"_ab_source_file_parse_error": "Error parsing record. This could be due to a mismatch between the config's file type and the actual file type, or because the file or record is not parseable. Contact Support if you need assistance.\nfilename=path/to/file.xyz message=weird parsing error"
"_ab_source_file_parse_error": "Error parsing record. This could be due to a mismatch between the config's file type and the actual file type, or because the file or record is not parseable. Contact Support if you need assistance.\nfilename=path/to/file.xyz message=weird parsing error",
}
],
True,
@@ -323,33 +326,48 @@ def test_check_config(requests_mock, format_config, raises_for_status, json_resp
None,
"test",
[{"type": "Text", "text": "test"}],
[call("https://api.unstructured.io/general/v0/general", headers={"accept": "application/json", "unstructured-api-key": "test"}, data={"strategy": "auto"}, files={"files": ("filename", mock.ANY, "application/pdf")})],
False,
[
{
"content": "test",
"document_key": FILE_URI,
"_ab_source_file_parse_error": None
}
call(
"https://api.unstructured.io/general/v0/general",
headers={"accept": "application/json", "unstructured-api-key": "test"},
data={"strategy": "auto"},
files={"files": ("filename", mock.ANY, "application/pdf")},
)
],
False,
[{"content": "test", "document_key": FILE_URI, "_ab_source_file_parse_error": None}],
200,
id="basic_request",
),
pytest.param(
FileType.PDF,
UnstructuredFormat(skip_unprocessable_file_types=False, strategy="hi_res", processing=APIProcessingConfigModel(mode="api", api_key="test", api_url="http://localhost:8000", parameters=[APIParameterConfigModel(name="include_page_breaks", value="true"), APIParameterConfigModel(name="ocr_languages", value="eng"), APIParameterConfigModel(name="ocr_languages", value="kor")])),
UnstructuredFormat(
skip_unprocessable_file_types=False,
strategy="hi_res",
processing=APIProcessingConfigModel(
mode="api",
api_key="test",
api_url="http://localhost:8000",
parameters=[
APIParameterConfigModel(name="include_page_breaks", value="true"),
APIParameterConfigModel(name="ocr_languages", value="eng"),
APIParameterConfigModel(name="ocr_languages", value="kor"),
],
),
),
None,
"test",
[{"type": "Text", "text": "test"}],
[call("http://localhost:8000/general/v0/general", headers={"accept": "application/json", "unstructured-api-key": "test"}, data={"strategy": "hi_res", "include_page_breaks": "true", "ocr_languages": ["eng", "kor"]}, files={"files": ("filename", mock.ANY, "application/pdf")})],
False,
[
{
"content": "test",
"document_key": FILE_URI,
"_ab_source_file_parse_error": None
}
call(
"http://localhost:8000/general/v0/general",
headers={"accept": "application/json", "unstructured-api-key": "test"},
data={"strategy": "hi_res", "include_page_breaks": "true", "ocr_languages": ["eng", "kor"]},
files={"files": ("filename", mock.ANY, "application/pdf")},
)
],
False,
[{"content": "test", "document_key": FILE_URI, "_ab_source_file_parse_error": None}],
200,
id="request_with_params",
),
@@ -361,13 +379,7 @@ def test_check_config(requests_mock, format_config, raises_for_status, json_resp
None,
None,
False,
[
{
"content": "# Mymarkdown",
"document_key": FILE_URI,
"_ab_source_file_parse_error": None
}
],
[{"content": "# Mymarkdown", "document_key": FILE_URI, "_ab_source_file_parse_error": None}],
200,
id="handle_markdown_locally",
),
@@ -384,15 +396,40 @@ def test_check_config(requests_mock, format_config, raises_for_status, json_resp
"test",
None,
[
call("https://api.unstructured.io/general/v0/general", headers={"accept": "application/json", "unstructured-api-key": "test"}, data={"strategy": "auto"}, files={"files": ("filename", mock.ANY, "application/pdf")}),
call(
"https://api.unstructured.io/general/v0/general",
headers={"accept": "application/json", "unstructured-api-key": "test"},
data={"strategy": "auto"},
files={"files": ("filename", mock.ANY, "application/pdf")},
),
call().raise_for_status(),
call("https://api.unstructured.io/general/v0/general", headers={"accept": "application/json", "unstructured-api-key": "test"}, data={"strategy": "auto"}, files={"files": ("filename", mock.ANY, "application/pdf")}),
call(
"https://api.unstructured.io/general/v0/general",
headers={"accept": "application/json", "unstructured-api-key": "test"},
data={"strategy": "auto"},
files={"files": ("filename", mock.ANY, "application/pdf")},
),
call().raise_for_status(),
call("https://api.unstructured.io/general/v0/general", headers={"accept": "application/json", "unstructured-api-key": "test"}, data={"strategy": "auto"}, files={"files": ("filename", mock.ANY, "application/pdf")}),
call(
"https://api.unstructured.io/general/v0/general",
headers={"accept": "application/json", "unstructured-api-key": "test"},
data={"strategy": "auto"},
files={"files": ("filename", mock.ANY, "application/pdf")},
),
call().raise_for_status(),
call("https://api.unstructured.io/general/v0/general", headers={"accept": "application/json", "unstructured-api-key": "test"}, data={"strategy": "auto"}, files={"files": ("filename", mock.ANY, "application/pdf")}),
call(
"https://api.unstructured.io/general/v0/general",
headers={"accept": "application/json", "unstructured-api-key": "test"},
data={"strategy": "auto"},
files={"files": ("filename", mock.ANY, "application/pdf")},
),
call().raise_for_status(),
call("https://api.unstructured.io/general/v0/general", headers={"accept": "application/json", "unstructured-api-key": "test"}, data={"strategy": "auto"}, files={"files": ("filename", mock.ANY, "application/pdf")}),
call(
"https://api.unstructured.io/general/v0/general",
headers={"accept": "application/json", "unstructured-api-key": "test"},
data={"strategy": "auto"},
files={"files": ("filename", mock.ANY, "application/pdf")},
),
call().raise_for_status(),
],
True,
@@ -411,21 +448,30 @@ def test_check_config(requests_mock, format_config, raises_for_status, json_resp
"test",
[{"type": "Text", "text": "test"}],
[
call("https://api.unstructured.io/general/v0/general", headers={"accept": "application/json", "unstructured-api-key": "test"}, data={"strategy": "auto"}, files={"files": ("filename", mock.ANY, "application/pdf")}),
call(
"https://api.unstructured.io/general/v0/general",
headers={"accept": "application/json", "unstructured-api-key": "test"},
data={"strategy": "auto"},
files={"files": ("filename", mock.ANY, "application/pdf")},
),
call().raise_for_status(),
call("https://api.unstructured.io/general/v0/general", headers={"accept": "application/json", "unstructured-api-key": "test"}, data={"strategy": "auto"}, files={"files": ("filename", mock.ANY, "application/pdf")}),
call(
"https://api.unstructured.io/general/v0/general",
headers={"accept": "application/json", "unstructured-api-key": "test"},
data={"strategy": "auto"},
files={"files": ("filename", mock.ANY, "application/pdf")},
),
call().raise_for_status(),
call("https://api.unstructured.io/general/v0/general", headers={"accept": "application/json", "unstructured-api-key": "test"}, data={"strategy": "auto"}, files={"files": ("filename", mock.ANY, "application/pdf")}),
call(
"https://api.unstructured.io/general/v0/general",
headers={"accept": "application/json", "unstructured-api-key": "test"},
data={"strategy": "auto"},
files={"files": ("filename", mock.ANY, "application/pdf")},
),
call().raise_for_status(),
],
False,
[
{
"content": "test",
"document_key": FILE_URI,
"_ab_source_file_parse_error": None
}
],
[{"content": "test", "document_key": FILE_URI, "_ab_source_file_parse_error": None}],
200,
id="retry_and_recover",
),
@@ -438,7 +484,12 @@ def test_check_config(requests_mock, format_config, raises_for_status, json_resp
"test",
[{"type": "Text", "text": "test"}],
[
call("https://api.unstructured.io/general/v0/general", headers={"accept": "application/json", "unstructured-api-key": "test"}, data={"strategy": "auto"}, files={"files": ("filename", mock.ANY, "application/pdf")}),
call(
"https://api.unstructured.io/general/v0/general",
headers={"accept": "application/json", "unstructured-api-key": "test"},
data={"strategy": "auto"},
files={"files": ("filename", mock.ANY, "application/pdf")},
),
call().raise_for_status(),
],
True,
@@ -455,7 +506,12 @@ def test_check_config(requests_mock, format_config, raises_for_status, json_resp
"test",
[{"type": "Text", "text": "test"}],
[
call("https://api.unstructured.io/general/v0/general", headers={"accept": "application/json", "unstructured-api-key": "test"}, data={"strategy": "auto"}, files={"files": ("filename", mock.ANY, "application/pdf")}),
call(
"https://api.unstructured.io/general/v0/general",
headers={"accept": "application/json", "unstructured-api-key": "test"},
data={"strategy": "auto"},
files={"files": ("filename", mock.ANY, "application/pdf")},
),
call().raise_for_status(),
],
True,
@@ -470,7 +526,12 @@ def test_check_config(requests_mock, format_config, raises_for_status, json_resp
"test",
[{"detail": "Something went wrong"}],
[
call("https://api.unstructured.io/general/v0/general", headers={"accept": "application/json", "unstructured-api-key": "test"}, data={"strategy": "auto"}, files={"files": ("filename", mock.ANY, "application/pdf")}),
call(
"https://api.unstructured.io/general/v0/general",
headers={"accept": "application/json", "unstructured-api-key": "test"},
data={"strategy": "auto"},
files={"files": ("filename", mock.ANY, "application/pdf")},
),
],
False,
[
@@ -487,7 +548,7 @@ def test_check_config(requests_mock, format_config, raises_for_status, json_resp
)
@patch("airbyte_cdk.sources.file_based.file_types.unstructured_parser.requests")
@patch("airbyte_cdk.sources.file_based.file_types.unstructured_parser.detect_filetype")
@patch('time.sleep', side_effect=lambda _: None)
@patch("time.sleep", side_effect=lambda _: None)
def test_parse_records_remotely(
time_mock,
mock_detect_filetype,
@@ -500,7 +561,7 @@ def test_parse_records_remotely(
expected_requests,
raises,
expected_records,
http_status_code
http_status_code,
):
stream_reader = MagicMock()
mock_open(stream_reader.open_file, read_data=bytes(str(file_content), "utf-8"))

View File

@@ -42,10 +42,15 @@ single_csv_input_state_is_earlier_scenario_concurrent = (
)
.set_incremental_scenario_config(
IncrementalScenarioConfig(
input_state=StateBuilder().with_stream_state("stream1", {
"history": {"some_old_file.csv": "2023-06-01T03:54:07.000000Z"},
"_ab_source_file_last_modified": "2023-06-01T03:54:07.000000Z_some_old_file.csv"
}).build(),
input_state=StateBuilder()
.with_stream_state(
"stream1",
{
"history": {"some_old_file.csv": "2023-06-01T03:54:07.000000Z"},
"_ab_source_file_last_modified": "2023-06-01T03:54:07.000000Z_some_old_file.csv",
},
)
.build(),
)
)
.set_expected_records(
@@ -137,10 +142,15 @@ single_csv_file_is_skipped_if_same_modified_at_as_in_history_concurrent = (
)
.set_incremental_scenario_config(
IncrementalScenarioConfig(
input_state=StateBuilder().with_stream_state("stream1", {
"history": {"a.csv": "2023-06-05T03:54:07.000000Z"},
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z_a.csv",
}).build(),
input_state=StateBuilder()
.with_stream_state(
"stream1",
{
"history": {"a.csv": "2023-06-05T03:54:07.000000Z"},
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z_a.csv",
},
)
.build(),
)
)
.set_expected_records(
@@ -214,10 +224,15 @@ single_csv_file_is_synced_if_modified_at_is_more_recent_than_in_history_concurre
)
.set_incremental_scenario_config(
IncrementalScenarioConfig(
input_state=StateBuilder().with_stream_state("stream1", {
"history": {"a.csv": "2023-06-01T03:54:07.000000Z"},
"_ab_source_file_last_modified": "2023-06-01T03:54:07.000000Z_a.csv",
}).build(),
input_state=StateBuilder()
.with_stream_state(
"stream1",
{
"history": {"a.csv": "2023-06-01T03:54:07.000000Z"},
"_ab_source_file_last_modified": "2023-06-01T03:54:07.000000Z_a.csv",
},
)
.build(),
)
)
.set_expected_records(
@@ -580,12 +595,15 @@ single_csv_input_state_is_later_scenario_concurrent = (
)
.set_incremental_scenario_config(
IncrementalScenarioConfig(
input_state=StateBuilder().with_stream_state(
"stream1", {
input_state=StateBuilder()
.with_stream_state(
"stream1",
{
"history": {"recent_file.csv": "2023-07-15T23:59:59.000000Z"},
"_ab_source_file_last_modified": "2023-07-15T23:59:59.000000Z_recent_file.csv",
},
).build(),
)
.build(),
)
)
).build()
@@ -1022,10 +1040,12 @@ multi_csv_skip_file_if_already_in_history_concurrent = (
)
.set_incremental_scenario_config(
IncrementalScenarioConfig(
input_state=StateBuilder().with_stream_state(
input_state=StateBuilder()
.with_stream_state(
"stream1",
{"history": {"a.csv": "2023-06-05T03:54:07.000000Z"},
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z_a.csv"}).build(),
{"history": {"a.csv": "2023-06-05T03:54:07.000000Z"}, "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z_a.csv"},
)
.build(),
)
)
).build()
@@ -1146,13 +1166,15 @@ multi_csv_include_missing_files_within_history_range_concurrent_cursor_is_newer
)
.set_incremental_scenario_config(
IncrementalScenarioConfig(
input_state=StateBuilder().with_stream_state(
input_state=StateBuilder()
.with_stream_state(
"stream1",
{
"history": {"a.csv": "2023-06-05T03:54:07.000000Z", "c.csv": "2023-06-06T03:54:07.000000Z"},
"_ab_source_file_last_modified": "2023-06-06T03:54:07.000000Z_c.csv",
}
).build(),
},
)
.build(),
)
)
).build()
@@ -1273,13 +1295,15 @@ multi_csv_include_missing_files_within_history_range_concurrent_cursor_is_older
)
.set_incremental_scenario_config(
IncrementalScenarioConfig(
input_state=StateBuilder().with_stream_state(
input_state=StateBuilder()
.with_stream_state(
"stream1",
{
"history": {"a.csv": "2023-06-05T03:54:07.000000Z", "c.csv": "2023-06-06T03:54:07.000000Z"},
"_ab_source_file_last_modified": "2023-06-03T03:54:07.000000Z_x.csv",
}
).build()
},
)
.build()
)
)
).build()
@@ -1454,7 +1478,8 @@ multi_csv_remove_old_files_if_history_is_full_scenario_concurrent_cursor_is_newe
)
.set_incremental_scenario_config(
IncrementalScenarioConfig(
input_state=StateBuilder().with_stream_state(
input_state=StateBuilder()
.with_stream_state(
"stream1",
{
"history": {
@@ -1463,8 +1488,9 @@ multi_csv_remove_old_files_if_history_is_full_scenario_concurrent_cursor_is_newe
"old_file_same_timestamp_as_a.csv": "2023-06-06T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-06T03:54:07.000000Z_old_file_same_timestamp_as_a.csv",
}
).build(),
},
)
.build(),
)
)
).build()
@@ -1639,7 +1665,8 @@ multi_csv_remove_old_files_if_history_is_full_scenario_concurrent_cursor_is_olde
)
.set_incremental_scenario_config(
IncrementalScenarioConfig(
input_state=StateBuilder().with_stream_state(
input_state=StateBuilder()
.with_stream_state(
"stream1",
{
"history": {
@@ -1648,8 +1675,9 @@ multi_csv_remove_old_files_if_history_is_full_scenario_concurrent_cursor_is_olde
"old_file_same_timestamp_as_a.csv": "2023-06-06T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-05-01T03:54:07.000000Z_very_very_very_old_file.csv",
}
).build(),
},
)
.build(),
)
)
).build()
@@ -2123,7 +2151,8 @@ multi_csv_sync_recent_files_if_history_is_incomplete_scenario_concurrent_cursor_
)
.set_incremental_scenario_config(
IncrementalScenarioConfig(
input_state=StateBuilder().with_stream_state(
input_state=StateBuilder()
.with_stream_state(
"stream1",
{
"history": {
@@ -2132,8 +2161,9 @@ multi_csv_sync_recent_files_if_history_is_incomplete_scenario_concurrent_cursor_
"d.csv": "2023-06-05T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z_b.csv",
}
).build(),
},
)
.build(),
)
)
).build()
@@ -2239,7 +2269,8 @@ multi_csv_sync_recent_files_if_history_is_incomplete_scenario_concurrent_cursor_
)
.set_incremental_scenario_config(
IncrementalScenarioConfig(
input_state=StateBuilder().with_stream_state(
input_state=StateBuilder()
.with_stream_state(
"stream1",
{
"history": {
@@ -2248,8 +2279,9 @@ multi_csv_sync_recent_files_if_history_is_incomplete_scenario_concurrent_cursor_
"d.csv": "2023-06-05T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z_d.csv",
}
).build(),
},
)
.build(),
)
)
).build()
@@ -2378,7 +2410,8 @@ multi_csv_sync_files_within_time_window_if_history_is_incomplete__different_time
)
.set_incremental_scenario_config(
IncrementalScenarioConfig(
input_state=StateBuilder().with_stream_state(
input_state=StateBuilder()
.with_stream_state(
"stream1",
{
"history": {
@@ -2387,8 +2420,9 @@ multi_csv_sync_files_within_time_window_if_history_is_incomplete__different_time
"e.csv": "2023-06-08T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-08T03:54:07.000000Z_e.csv",
}
).build(),
},
)
.build(),
)
)
).build()
@@ -2516,7 +2550,8 @@ multi_csv_sync_files_within_time_window_if_history_is_incomplete__different_time
)
.set_incremental_scenario_config(
IncrementalScenarioConfig(
input_state=StateBuilder().with_stream_state(
input_state=StateBuilder()
.with_stream_state(
"stream1",
{
"history": {
@@ -2525,15 +2560,18 @@ multi_csv_sync_files_within_time_window_if_history_is_incomplete__different_time
"e.csv": "2023-06-08T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-08T03:54:07.000000Z_e.csv",
}
).build(),
},
)
.build(),
)
)
).build()
multi_csv_sync_files_within_history_time_window_if_history_is_incomplete_different_timestamps_scenario_concurrent_cursor_is_newer = (
TestScenarioBuilder()
.set_name("multi_csv_sync_files_within_history_time_window_if_history_is_incomplete_different_timestamps_scenario_concurrent_cursor_is_newer")
.set_name(
"multi_csv_sync_files_within_history_time_window_if_history_is_incomplete_different_timestamps_scenario_concurrent_cursor_is_newer"
)
.set_config(
{
"streams": [
@@ -2680,7 +2718,8 @@ multi_csv_sync_files_within_history_time_window_if_history_is_incomplete_differe
)
.set_incremental_scenario_config(
IncrementalScenarioConfig(
input_state=StateBuilder().with_stream_state(
input_state=StateBuilder()
.with_stream_state(
"stream1",
{
"history": {
@@ -2689,15 +2728,18 @@ multi_csv_sync_files_within_history_time_window_if_history_is_incomplete_differe
"d.csv": "2023-06-08T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-08T03:54:07.000000Z_d.csv",
}
).build(),
},
)
.build(),
)
)
).build()
multi_csv_sync_files_within_history_time_window_if_history_is_incomplete_different_timestamps_scenario_concurrent_cursor_is_older = (
TestScenarioBuilder()
.set_name("multi_csv_sync_files_within_history_time_window_if_history_is_incomplete_different_timestamps_scenario_concurrent_cursor_is_older")
.set_name(
"multi_csv_sync_files_within_history_time_window_if_history_is_incomplete_different_timestamps_scenario_concurrent_cursor_is_older"
)
.set_config(
{
"streams": [
@@ -2844,7 +2886,8 @@ multi_csv_sync_files_within_history_time_window_if_history_is_incomplete_differe
)
.set_incremental_scenario_config(
IncrementalScenarioConfig(
input_state=StateBuilder().with_stream_state(
input_state=StateBuilder()
.with_stream_state(
"stream1",
{
"history": {
@@ -2853,8 +2896,9 @@ multi_csv_sync_files_within_history_time_window_if_history_is_incomplete_differe
"d.csv": "2023-06-08T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-04T00:00:00.000000Z_very_old_file.csv",
}
).build(),
},
)
.build(),
)
)
).build()

View File

@@ -324,9 +324,7 @@ single_csv_scenario: TestScenario[InMemoryFilesSource] = (
"processing": {
"title": "Processing",
"description": "Processing configuration",
"default": {
"mode": "local"
},
"default": {"mode": "local"},
"type": "object",
"oneOf": [
{
@@ -337,16 +335,12 @@ single_csv_scenario: TestScenario[InMemoryFilesSource] = (
"title": "Mode",
"default": "local",
"const": "local",
"enum": [
"local"
],
"type": "string"
"enum": ["local"],
"type": "string",
}
},
"description": "Process files locally, supporting `fast` and `ocr` modes. This is the default option.",
"required": [
"mode"
]
"required": ["mode"],
},
{
"title": "via API",
@@ -356,10 +350,8 @@ single_csv_scenario: TestScenario[InMemoryFilesSource] = (
"title": "Mode",
"default": "api",
"const": "api",
"enum": [
"api"
],
"type": "string"
"enum": ["api"],
"type": "string",
},
"api_key": {
"title": "API Key",
@@ -367,17 +359,15 @@ single_csv_scenario: TestScenario[InMemoryFilesSource] = (
"default": "",
"always_show": True,
"airbyte_secret": True,
"type": "string"
"type": "string",
},
"api_url": {
"title": "API URL",
"description": "The URL of the unstructured API to use",
"default": "https://api.unstructured.io",
"always_show": True,
"examples": [
"https://api.unstructured.com"
],
"type": "string"
"examples": ["https://api.unstructured.com"],
"type": "string",
},
"parameters": {
"title": "Additional URL Parameters",
@@ -392,35 +382,24 @@ single_csv_scenario: TestScenario[InMemoryFilesSource] = (
"name": {
"title": "Parameter name",
"description": "The name of the unstructured API parameter to use",
"examples": [
"combine_under_n_chars",
"languages"
],
"type": "string"
"examples": ["combine_under_n_chars", "languages"],
"type": "string",
},
"value": {
"title": "Value",
"description": "The value of the parameter",
"examples": [
"true",
"hi_res"
],
"type": "string"
}
"examples": ["true", "hi_res"],
"type": "string",
},
},
"required": [
"name",
"value"
]
}
}
"required": ["name", "value"],
},
},
},
"description": "Process files via an API, using the `hi_res` mode. This option is useful for increased performance and accuracy, but requires an API key and a hosted instance of unstructured.",
"required": [
"mode"
]
}
]
"required": ["mode"],
},
],
},
},
"description": "Extract text from document formats (.pdf, .docx, .md, .pptx) and emit as one record per file.",

View File

@@ -42,9 +42,14 @@ single_csv_input_state_is_earlier_scenario = (
)
.set_incremental_scenario_config(
IncrementalScenarioConfig(
input_state=StateBuilder().with_stream_state("stream1", {
"history": {"some_old_file.csv": "2023-06-01T03:54:07.000000Z"},
}).build(),
input_state=StateBuilder()
.with_stream_state(
"stream1",
{
"history": {"some_old_file.csv": "2023-06-01T03:54:07.000000Z"},
},
)
.build(),
)
)
.set_expected_records(
@@ -136,9 +141,14 @@ single_csv_file_is_skipped_if_same_modified_at_as_in_history = (
)
.set_incremental_scenario_config(
IncrementalScenarioConfig(
input_state=StateBuilder().with_stream_state("stream1", {
"history": {"a.csv": "2023-06-05T03:54:07.000000Z"},
}).build(),
input_state=StateBuilder()
.with_stream_state(
"stream1",
{
"history": {"a.csv": "2023-06-05T03:54:07.000000Z"},
},
)
.build(),
)
)
.set_expected_records(
@@ -212,9 +222,14 @@ single_csv_file_is_synced_if_modified_at_is_more_recent_than_in_history = (
)
.set_incremental_scenario_config(
IncrementalScenarioConfig(
input_state=StateBuilder().with_stream_state("stream1", {
"history": {"a.csv": "2023-06-01T03:54:07.000000Z"},
}).build(),
input_state=StateBuilder()
.with_stream_state(
"stream1",
{
"history": {"a.csv": "2023-06-01T03:54:07.000000Z"},
},
)
.build(),
)
)
.set_expected_records(
@@ -577,9 +592,14 @@ single_csv_input_state_is_later_scenario = (
)
.set_incremental_scenario_config(
IncrementalScenarioConfig(
input_state=StateBuilder().with_stream_state("stream1", {
"history": {"recent_file.csv": "2023-07-15T23:59:59.000000Z"},
}).build(),
input_state=StateBuilder()
.with_stream_state(
"stream1",
{
"history": {"recent_file.csv": "2023-07-15T23:59:59.000000Z"},
},
)
.build(),
)
)
).build()
@@ -1016,9 +1036,14 @@ multi_csv_skip_file_if_already_in_history = (
)
.set_incremental_scenario_config(
IncrementalScenarioConfig(
input_state=StateBuilder().with_stream_state("stream1", {
"history": {"a.csv": "2023-06-05T03:54:07.000000Z"},
}).build(),
input_state=StateBuilder()
.with_stream_state(
"stream1",
{
"history": {"a.csv": "2023-06-05T03:54:07.000000Z"},
},
)
.build(),
)
)
).build()
@@ -1139,9 +1164,14 @@ multi_csv_include_missing_files_within_history_range = (
)
.set_incremental_scenario_config(
IncrementalScenarioConfig(
input_state=StateBuilder().with_stream_state("stream1", {
"history": {"a.csv": "2023-06-05T03:54:07.000000Z", "c.csv": "2023-06-06T03:54:07.000000Z"},
}).build(),
input_state=StateBuilder()
.with_stream_state(
"stream1",
{
"history": {"a.csv": "2023-06-05T03:54:07.000000Z", "c.csv": "2023-06-06T03:54:07.000000Z"},
},
)
.build(),
)
)
).build()
@@ -1316,13 +1346,18 @@ multi_csv_remove_old_files_if_history_is_full_scenario = (
)
.set_incremental_scenario_config(
IncrementalScenarioConfig(
input_state=StateBuilder().with_stream_state("stream1", {
"history": {
"very_very_old_file.csv": "2023-06-01T03:54:07.000000Z",
"very_old_file.csv": "2023-06-02T03:54:07.000000Z",
"old_file_same_timestamp_as_a.csv": "2023-06-06T03:54:07.000000Z",
input_state=StateBuilder()
.with_stream_state(
"stream1",
{
"history": {
"very_very_old_file.csv": "2023-06-01T03:54:07.000000Z",
"very_old_file.csv": "2023-06-02T03:54:07.000000Z",
"old_file_same_timestamp_as_a.csv": "2023-06-06T03:54:07.000000Z",
},
},
}).build(),
)
.build(),
)
)
).build()
@@ -1612,13 +1647,18 @@ multi_csv_sync_recent_files_if_history_is_incomplete_scenario = (
)
.set_incremental_scenario_config(
IncrementalScenarioConfig(
input_state=StateBuilder().with_stream_state("stream1", {
"history": {
"b.csv": "2023-06-05T03:54:07.000000Z",
"c.csv": "2023-06-05T03:54:07.000000Z",
"d.csv": "2023-06-05T03:54:07.000000Z",
input_state=StateBuilder()
.with_stream_state(
"stream1",
{
"history": {
"b.csv": "2023-06-05T03:54:07.000000Z",
"c.csv": "2023-06-05T03:54:07.000000Z",
"d.csv": "2023-06-05T03:54:07.000000Z",
},
},
}).build(),
)
.build(),
)
)
).build()
@@ -1746,13 +1786,18 @@ multi_csv_sync_files_within_time_window_if_history_is_incomplete__different_time
)
.set_incremental_scenario_config(
IncrementalScenarioConfig(
input_state=StateBuilder().with_stream_state("stream1", {
"history": {
"c.csv": "2023-06-07T03:54:07.000000Z",
"d.csv": "2023-06-08T03:54:07.000000Z",
"e.csv": "2023-06-08T03:54:07.000000Z",
input_state=StateBuilder()
.with_stream_state(
"stream1",
{
"history": {
"c.csv": "2023-06-07T03:54:07.000000Z",
"d.csv": "2023-06-08T03:54:07.000000Z",
"e.csv": "2023-06-08T03:54:07.000000Z",
},
},
}).build(),
)
.build(),
)
)
).build()
@@ -1906,13 +1951,18 @@ multi_csv_sync_files_within_history_time_window_if_history_is_incomplete_differe
)
.set_incremental_scenario_config(
IncrementalScenarioConfig(
input_state=StateBuilder().with_stream_state("stream1", {
"history": {
"old_file.csv": "2023-06-05T00:00:00.000000Z",
"c.csv": "2023-06-07T03:54:07.000000Z",
"d.csv": "2023-06-08T03:54:07.000000Z",
input_state=StateBuilder()
.with_stream_state(
"stream1",
{
"history": {
"old_file.csv": "2023-06-05T00:00:00.000000Z",
"c.csv": "2023-06-07T03:54:07.000000Z",
"d.csv": "2023-06-08T03:54:07.000000Z",
},
},
}).build(),
)
.build(),
)
)
).build()
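All of the reformatted hunks above build the same kind of per-stream input state through the fluent StateBuilder; a minimal standalone sketch of that pattern follows. The import path is an assumption about the CDK test helpers; the method chain mirrors the hunks verbatim.

# Minimal sketch of the StateBuilder pattern used by the scenarios above.
from airbyte_cdk.test.state_builder import StateBuilder  # assumed import path

input_state = (
    StateBuilder()
    .with_stream_state(
        "stream1",
        {"history": {"some_old_file.csv": "2023-06-01T03:54:07.000000Z"}},
    )
    .build()
)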


@@ -484,9 +484,7 @@ invalid_jsonl_scenario = (
]
}
)
.set_expected_records(
[]
)
.set_expected_records([])
.set_expected_discover_error(AirbyteTracedException, FileBasedSourceError.SCHEMA_INFERENCE_ERROR.value)
.set_expected_read_error(AirbyteTracedException, "Please check the logged errors for more information.")
.set_expected_logs(


@@ -15,12 +15,18 @@ nltk.download("averaged_perceptron_tagger")
json_schema = {
"type": "object",
"properties": {
"content": {"type": ["null", "string"], "description": "Content of the file as markdown. Might be null if the file could not be parsed"},
"content": {
"type": ["null", "string"],
"description": "Content of the file as markdown. Might be null if the file could not be parsed",
},
"document_key": {"type": ["null", "string"], "description": "Unique identifier of the document, e.g. the file path"},
"_ab_source_file_parse_error": {"type": ["null", "string"], "description": "Error message if the file could not be parsed even though the file is supported"},
"_ab_source_file_parse_error": {
"type": ["null", "string"],
"description": "Error message if the file could not be parsed even though the file is supported",
},
"_ab_source_file_last_modified": {"type": "string"},
"_ab_source_file_url": {"type": "string"},
}
},
}
simple_markdown_scenario = (
@@ -69,7 +75,7 @@ simple_markdown_scenario = (
"json_schema": json_schema,
"name": "stream1",
"source_defined_cursor": True,
'source_defined_primary_key': [["document_key"]],
"source_defined_primary_key": [["document_key"]],
"supported_sync_modes": ["full_refresh", "incremental"],
}
]
@@ -104,7 +110,6 @@ simple_markdown_scenario = (
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z",
"_ab_source_file_url": "c",
"_ab_source_file_parse_error": None,
},
"stream": "stream1",
},
@@ -132,9 +137,7 @@ simple_txt_scenario = (
.set_files(
{
"a.txt": {
"contents": bytes(
"Just some raw text", "UTF-8"
),
"contents": bytes("Just some raw text", "UTF-8"),
"last_modified": "2023-06-05T03:54:07.000Z",
},
"b": {
@@ -154,7 +157,7 @@ simple_txt_scenario = (
"json_schema": json_schema,
"name": "stream1",
"source_defined_cursor": True,
'source_defined_primary_key': [["document_key"]],
"source_defined_primary_key": [["document_key"]],
"supported_sync_modes": ["full_refresh", "incremental"],
}
]
@@ -179,7 +182,6 @@ simple_txt_scenario = (
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z",
"_ab_source_file_url": "b",
"_ab_source_file_parse_error": None,
},
"stream": "stream1",
},
@@ -223,7 +225,7 @@ unstructured_invalid_file_type_discover_scenario_no_skip = (
"json_schema": json_schema,
"name": "stream1",
"source_defined_cursor": True,
'source_defined_primary_key': [["document_key"]],
"source_defined_primary_key": [["document_key"]],
"supported_sync_modes": ["full_refresh", "incremental"],
}
]
@@ -273,7 +275,7 @@ unstructured_invalid_file_type_discover_scenario_skip = (
"json_schema": json_schema,
"name": "stream1",
"source_defined_cursor": True,
'source_defined_primary_key': [["document_key"]],
"source_defined_primary_key": [["document_key"]],
"supported_sync_modes": ["full_refresh", "incremental"],
}
]
@@ -337,7 +339,7 @@ unstructured_invalid_file_type_read_scenario = (
"json_schema": json_schema,
"name": "stream1",
"source_defined_cursor": True,
'source_defined_primary_key': [["document_key"]],
"source_defined_primary_key": [["document_key"]],
"supported_sync_modes": ["full_refresh", "incremental"],
}
]
@@ -417,7 +419,7 @@ simple_unstructured_scenario = (
"json_schema": json_schema,
"name": "stream1",
"source_defined_cursor": True,
'source_defined_primary_key': [["document_key"]],
"source_defined_primary_key": [["document_key"]],
"supported_sync_modes": ["full_refresh", "incremental"],
}
]
@@ -495,7 +497,7 @@ corrupted_file_scenario = (
"json_schema": json_schema,
"name": "stream1",
"source_defined_cursor": True,
'source_defined_primary_key': [["document_key"]],
"source_defined_primary_key": [["document_key"]],
"supported_sync_modes": ["full_refresh", "incremental"],
}
]
@@ -563,7 +565,7 @@ no_file_extension_unstructured_scenario = (
"json_schema": json_schema,
"name": "stream1",
"source_defined_cursor": True,
'source_defined_primary_key': [["document_key"]],
"source_defined_primary_key": [["document_key"]],
"supported_sync_modes": ["full_refresh", "incremental"],
}
]
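A record conforming to the json_schema declared at the top of this file looks like the expected records in these scenarios; below is a sketch assembled only from fields shown above, with illustrative values.

# Sketch of a record matching the document-file json_schema above.
document_record = {
    "content": "Just some raw text",             # markdown content, None if parsing failed
    "document_key": "a.txt",                     # unique identifier, e.g. the file path
    "_ab_source_file_parse_error": None,         # set only when a supported file fails to parse
    "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z",
    "_ab_source_file_url": "a.txt",
}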


@@ -442,7 +442,13 @@ multi_stream_user_input_schema_scenario_schema_is_invalid = (
]
}
)
.set_catalog(CatalogBuilder().with_stream("stream1", SyncMode.full_refresh).with_stream("stream2", SyncMode.full_refresh).with_stream("stream3", SyncMode.full_refresh).build())
.set_catalog(
CatalogBuilder()
.with_stream("stream1", SyncMode.full_refresh)
.with_stream("stream2", SyncMode.full_refresh)
.with_stream("stream3", SyncMode.full_refresh)
.build()
)
.set_expected_check_status("FAILED")
.set_expected_check_error(None, FileBasedSourceError.ERROR_PARSING_USER_PROVIDED_SCHEMA.value)
.set_expected_discover_error(ConfigValidationError, FileBasedSourceError.ERROR_PARSING_USER_PROVIDED_SCHEMA.value)
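The reformatted call above is the standard fluent CatalogBuilder usage; a self-contained sketch is below. Import paths are assumptions about the CDK test helpers; the builder methods mirror the hunk.

# Sketch of the fluent CatalogBuilder usage shown above.
from airbyte_cdk.models import SyncMode                      # assumed import path
from airbyte_cdk.test.catalog_builder import CatalogBuilder  # assumed import path

catalog = (
    CatalogBuilder()
    .with_stream("stream1", SyncMode.full_refresh)
    .with_stream("stream2", SyncMode.full_refresh)
    .with_stream("stream3", SyncMode.full_refresh)
    .build()
)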


@@ -49,11 +49,15 @@ _ANY_CURSOR = Mock(spec=FileBasedNoopCursor)
def test_file_based_stream_partition_generator(sync_mode):
stream = Mock()
message_repository = Mock()
stream_slices = [{"files": [RemoteFile(uri="1", last_modified=datetime.now())]},
{"files": [RemoteFile(uri="2", last_modified=datetime.now())]}]
stream_slices = [
{"files": [RemoteFile(uri="1", last_modified=datetime.now())]},
{"files": [RemoteFile(uri="2", last_modified=datetime.now())]},
]
stream.stream_slices.return_value = stream_slices
partition_generator = FileBasedStreamPartitionGenerator(stream, message_repository, _ANY_SYNC_MODE, _ANY_CURSOR_FIELD, _ANY_STATE, _ANY_CURSOR)
partition_generator = FileBasedStreamPartitionGenerator(
stream, message_repository, _ANY_SYNC_MODE, _ANY_CURSOR_FIELD, _ANY_STATE, _ANY_CURSOR
)
partitions = list(partition_generator.generate())
slices = [partition.to_slice() for partition in partitions]
@@ -134,7 +138,11 @@ def test_file_based_stream_partition_raising_exception(exception_type, expected_
@pytest.mark.parametrize(
"_slice, expected_hash",
[
pytest.param({"files": [RemoteFile(uri="1", last_modified=datetime.strptime("2023-06-09T00:00:00Z", "%Y-%m-%dT%H:%M:%SZ"))]}, hash(("stream", "2023-06-09T00:00:00.000000Z_1")), id="test_hash_with_slice"),
pytest.param(
{"files": [RemoteFile(uri="1", last_modified=datetime.strptime("2023-06-09T00:00:00Z", "%Y-%m-%dT%H:%M:%SZ"))]},
hash(("stream", "2023-06-09T00:00:00.000000Z_1")),
id="test_hash_with_slice",
),
pytest.param(None, hash("stream"), id="test_hash_no_slice"),
],
)
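The "test_hash_with_slice" case above implies that a partition hashes to the stream name plus a "<last_modified>_<uri>" key for its file; the sketch below reproduces that derivation under that assumption.

# Sketch of the hash key implied by the expectation above:
# hash(("stream", "2023-06-09T00:00:00.000000Z_1")).
from datetime import datetime

last_modified = datetime.strptime("2023-06-09T00:00:00Z", "%Y-%m-%dT%H:%M:%SZ")
uri = "1"
slice_key = f"{last_modified.strftime('%Y-%m-%dT%H:%M:%S.%fZ')}_{uri}"
assert hash(("stream", slice_key)) == hash(("stream", "2023-06-09T00:00:00.000000Z_1"))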


@@ -45,26 +45,17 @@ def _make_cursor(input_state: Optional[MutableMapping[str, Any]]) -> FileBasedCo
pytest.param({}, (datetime.min, ""), id="no-state-gives-min-cursor"),
pytest.param({"history": {}}, (datetime.min, ""), id="missing-cursor-field-gives-min-cursor"),
pytest.param(
{
"history": {"a.csv": "2021-01-01T00:00:00.000000Z"},
"_ab_source_file_last_modified": "2021-01-01T00:00:00.000000Z_a.csv"
},
{"history": {"a.csv": "2021-01-01T00:00:00.000000Z"}, "_ab_source_file_last_modified": "2021-01-01T00:00:00.000000Z_a.csv"},
(datetime.strptime("2021-01-01T00:00:00.000000Z", DATE_TIME_FORMAT), "a.csv"),
id="cursor-value-matches-earliest-file",
),
pytest.param(
{
"history": {"a.csv": "2021-01-01T00:00:00.000000Z"},
"_ab_source_file_last_modified": "2020-01-01T00:00:00.000000Z_a.csv"
},
{"history": {"a.csv": "2021-01-01T00:00:00.000000Z"}, "_ab_source_file_last_modified": "2020-01-01T00:00:00.000000Z_a.csv"},
(datetime.strptime("2020-01-01T00:00:00.000000Z", DATE_TIME_FORMAT), "a.csv"),
id="cursor-value-is-earlier",
),
pytest.param(
{
"history": {"a.csv": "2022-01-01T00:00:00.000000Z"},
"_ab_source_file_last_modified": "2021-01-01T00:00:00.000000Z_a.csv"
},
{"history": {"a.csv": "2022-01-01T00:00:00.000000Z"}, "_ab_source_file_last_modified": "2021-01-01T00:00:00.000000Z_a.csv"},
(datetime.strptime("2021-01-01T00:00:00.000000Z", DATE_TIME_FORMAT), "a.csv"),
id="cursor-value-is-later",
),
@@ -73,30 +64,24 @@ def _make_cursor(input_state: Optional[MutableMapping[str, Any]]) -> FileBasedCo
"history": {
"a.csv": "2021-01-01T00:00:00.000000Z",
"b.csv": "2021-01-02T00:00:00.000000Z",
"c.csv": "2021-01-03T00:00:00.000000Z"
"c.csv": "2021-01-03T00:00:00.000000Z",
},
"_ab_source_file_last_modified": "2021-01-04T00:00:00.000000Z_d.csv"
"_ab_source_file_last_modified": "2021-01-04T00:00:00.000000Z_d.csv",
},
(datetime.strptime("2021-01-01T00:00:00.000000Z", DATE_TIME_FORMAT), "a.csv"),
id="cursor-not-earliest",
),
pytest.param(
{
"history": {"b.csv": "2020-12-31T00:00:00.000000Z"},
"_ab_source_file_last_modified": "2021-01-01T00:00:00.000000Z_a.csv"
},
{"history": {"b.csv": "2020-12-31T00:00:00.000000Z"}, "_ab_source_file_last_modified": "2021-01-01T00:00:00.000000Z_a.csv"},
(datetime.strptime("2020-12-31T00:00:00.000000Z", DATE_TIME_FORMAT), "b.csv"),
id="state-with-cursor-and-earlier-history"
id="state-with-cursor-and-earlier-history",
),
pytest.param(
{
"history": {"b.csv": "2021-01-02T00:00:00.000000Z"},
"_ab_source_file_last_modified": "2021-01-01T00:00:00.000000Z_a.csv"
},
{"history": {"b.csv": "2021-01-02T00:00:00.000000Z"}, "_ab_source_file_last_modified": "2021-01-01T00:00:00.000000Z_a.csv"},
(datetime.strptime("2021-01-01T00:00:00.000000Z", DATE_TIME_FORMAT), "a.csv"),
id="state-with-cursor-and-later-history"
id="state-with-cursor-and-later-history",
),
]
],
)
def test_compute_prev_sync_cursor(input_state: MutableMapping[str, Any], expected_cursor_value: Tuple[datetime, str]):
cursor = _make_cursor(input_state)
@@ -160,7 +145,7 @@ def test_compute_prev_sync_cursor(input_state: MutableMapping[str, Any], expecte
"2022-01-05T00:00:00.000000Z_pending.csv",
id="add-to-nonempty-history-pending-file-is-newer",
),
]
],
)
def test_add_file(
initial_state: MutableMapping[str, Any],
@@ -175,23 +160,31 @@ def test_add_file(
cursor._message_repository = mock_message_repository
stream = MagicMock()
cursor.set_pending_partitions([
FileBasedStreamPartition(
stream,
{"files": [RemoteFile(uri=uri, last_modified=datetime.strptime(timestamp, DATE_TIME_FORMAT))]},
mock_message_repository,
SyncMode.full_refresh,
FileBasedConcurrentCursor.CURSOR_FIELD,
initial_state,
cursor
) for uri, timestamp in pending_files
])
cursor.set_pending_partitions(
[
FileBasedStreamPartition(
stream,
{"files": [RemoteFile(uri=uri, last_modified=datetime.strptime(timestamp, DATE_TIME_FORMAT))]},
mock_message_repository,
SyncMode.full_refresh,
FileBasedConcurrentCursor.CURSOR_FIELD,
initial_state,
cursor,
)
for uri, timestamp in pending_files
]
)
uri, timestamp = file_to_add
cursor.add_file(RemoteFile(uri=uri, last_modified=datetime.strptime(timestamp, DATE_TIME_FORMAT)))
assert cursor._file_to_datetime_history == expected_history
assert cursor._pending_files == {uri: RemoteFile(uri=uri, last_modified=datetime.strptime(timestamp, DATE_TIME_FORMAT)) for uri, timestamp in expected_pending_files}
assert mock_message_repository.emit_message.call_args_list[0].args[0].state.data["test"]["_ab_source_file_last_modified"] == expected_cursor_value
assert cursor._pending_files == {
uri: RemoteFile(uri=uri, last_modified=datetime.strptime(timestamp, DATE_TIME_FORMAT)) for uri, timestamp in expected_pending_files
}
assert (
mock_message_repository.emit_message.call_args_list[0].args[0].state.data["test"]["_ab_source_file_last_modified"]
== expected_cursor_value
)
@pytest.mark.parametrize(
@@ -215,7 +208,7 @@ def test_add_file(
"2021-01-05T00:00:00.000000Z_pending.csv",
id="add-to-empty-history-file-not-in-pending-files",
),
]
],
)
def test_add_file_invalid(
initial_state: MutableMapping[str, Any],
@@ -226,16 +219,23 @@ def test_add_file_invalid(
expected_cursor_value: str,
):
cursor = _make_cursor(initial_state)
cursor._pending_files = {uri: RemoteFile(uri=uri, last_modified=datetime.strptime(timestamp, DATE_TIME_FORMAT)) for uri, timestamp in pending_files}
cursor._pending_files = {
uri: RemoteFile(uri=uri, last_modified=datetime.strptime(timestamp, DATE_TIME_FORMAT)) for uri, timestamp in pending_files
}
mock_message_repository = MagicMock()
cursor._message_repository = mock_message_repository
uri, timestamp = file_to_add
cursor.add_file(RemoteFile(uri=uri, last_modified=datetime.strptime(timestamp, DATE_TIME_FORMAT)))
assert cursor._file_to_datetime_history == expected_history
assert cursor._pending_files == {uri: RemoteFile(uri=uri, last_modified=datetime.strptime(timestamp, DATE_TIME_FORMAT)) for uri, timestamp in expected_pending_files}
assert cursor._pending_files == {
uri: RemoteFile(uri=uri, last_modified=datetime.strptime(timestamp, DATE_TIME_FORMAT)) for uri, timestamp in expected_pending_files
}
assert mock_message_repository.emit_message.call_args_list[0].args[0].log.level.value == "WARN"
assert mock_message_repository.emit_message.call_args_list[1].args[0].state.data["test"]["_ab_source_file_last_modified"] == expected_cursor_value
assert (
mock_message_repository.emit_message.call_args_list[1].args[0].state.data["test"]["_ab_source_file_last_modified"]
== expected_cursor_value
)
@pytest.mark.parametrize(
@@ -243,37 +243,33 @@ def test_add_file_invalid(
[
pytest.param({}, [], f"{datetime.min.strftime('%Y-%m-%dT%H:%M:%S.%fZ')}_", id="no-state-no-pending"),
pytest.param(
{"history": {"a.csv": "2021-01-01T00:00:00.000000Z"}},
[],
"2021-01-01T00:00:00.000000Z_a.csv",
id="no-pending-with-history"
{"history": {"a.csv": "2021-01-01T00:00:00.000000Z"}}, [], "2021-01-01T00:00:00.000000Z_a.csv", id="no-pending-with-history"
),
pytest.param(
{"history": {}},
[("b.csv", "2021-01-02T00:00:00.000000Z")],
"2021-01-02T00:00:00.000000Z_b.csv",
id="pending-no-history"
{"history": {}}, [("b.csv", "2021-01-02T00:00:00.000000Z")], "2021-01-02T00:00:00.000000Z_b.csv", id="pending-no-history"
),
pytest.param(
{"history": {"a.csv": "2022-01-01T00:00:00.000000Z"}},
[("b.csv", "2021-01-02T00:00:00.000000Z")],
"2021-01-01T00:00:00.000000Z_a.csv",
id="with-pending-before-history"
id="with-pending-before-history",
),
pytest.param(
{"history": {"a.csv": "2021-01-01T00:00:00.000000Z"}},
[("b.csv", "2022-01-02T00:00:00.000000Z")],
"2022-01-01T00:00:00.000000Z_a.csv",
id="with-pending-after-history"
id="with-pending-after-history",
),
]
],
)
def test_get_new_cursor_value(input_state: MutableMapping[str, Any], pending_files: List[Tuple[str, str]], expected_cursor_value: str):
cursor = _make_cursor(input_state)
pending_partitions = []
for url, timestamp in pending_files:
partition = MagicMock()
partition.to_slice = lambda *args, **kwargs: {"files": [RemoteFile(uri=url, last_modified=datetime.strptime(timestamp, DATE_TIME_FORMAT))]}
partition.to_slice = lambda *args, **kwargs: {
"files": [RemoteFile(uri=url, last_modified=datetime.strptime(timestamp, DATE_TIME_FORMAT))]
}
pending_partitions.append(partition)
cursor.set_pending_partitions(pending_partitions)
@@ -288,7 +284,7 @@ def test_get_new_cursor_value(input_state: MutableMapping[str, Any], pending_fil
False,
(datetime.min, ""),
["new.csv"],
id="empty-history-one-new-file"
id="empty-history-one-new-file",
),
pytest.param(
[RemoteFile(uri="a.csv", last_modified=datetime.strptime("2021-01-02T00:00:00.000000Z", "%Y-%m-%dT%H:%M:%S.%fZ"))],
@@ -296,7 +292,7 @@ def test_get_new_cursor_value(input_state: MutableMapping[str, Any], pending_fil
False,
(datetime.min, ""),
["a.csv"],
id="non-empty-history-file-in-history-modified"
id="non-empty-history-file-in-history-modified",
),
pytest.param(
[RemoteFile(uri="a.csv", last_modified=datetime.strptime("2021-01-01T00:00:00.000000Z", "%Y-%m-%dT%H:%M:%S.%fZ"))],
@@ -304,9 +300,9 @@ def test_get_new_cursor_value(input_state: MutableMapping[str, Any], pending_fil
False,
(datetime.min, ""),
[],
id="non-empty-history-file-in-history-not-modified"
id="non-empty-history-file-in-history-not-modified",
),
]
],
)
def test_get_files_to_sync(all_files, history, is_history_full, prev_cursor_value, expected_files_to_sync):
cursor = _make_cursor({})
@@ -328,7 +324,7 @@ def test_get_files_to_sync(all_files, history, is_history_full, prev_cursor_valu
(datetime.min, ""),
datetime.min,
True,
id="file-not-in-history-not-full-old-cursor"
id="file-not-in-history-not-full-old-cursor",
),
pytest.param(
RemoteFile(uri="new.csv", last_modified=datetime.strptime("2021-01-03T00:00:00.000000Z", DATE_TIME_FORMAT)),
@@ -337,7 +333,7 @@ def test_get_files_to_sync(all_files, history, is_history_full, prev_cursor_valu
(datetime.strptime("2024-01-02T00:00:00.000000Z", DATE_TIME_FORMAT), ""),
datetime.min,
True,
id="file-not-in-history-not-full-new-cursor"
id="file-not-in-history-not-full-new-cursor",
),
pytest.param(
RemoteFile(uri="a.csv", last_modified=datetime.strptime("2021-01-01T00:00:00.000000Z", DATE_TIME_FORMAT)),
@@ -346,7 +342,7 @@ def test_get_files_to_sync(all_files, history, is_history_full, prev_cursor_valu
(datetime.min, ""),
datetime.min,
False,
id="file-in-history-not-modified"
id="file-in-history-not-modified",
),
pytest.param(
RemoteFile(uri="a.csv", last_modified=datetime.strptime("2020-01-01T00:00:00.000000Z", DATE_TIME_FORMAT)),
@@ -355,7 +351,7 @@ def test_get_files_to_sync(all_files, history, is_history_full, prev_cursor_valu
(datetime.min, ""),
datetime.min,
False,
id="file-in-history-modified-before"
id="file-in-history-modified-before",
),
pytest.param(
RemoteFile(uri="a.csv", last_modified=datetime.strptime("2022-01-01T00:00:00.000000Z", DATE_TIME_FORMAT)),
@@ -364,7 +360,7 @@ def test_get_files_to_sync(all_files, history, is_history_full, prev_cursor_valu
(datetime.min, ""),
datetime.min,
True,
id="file-in-history-modified-after"
id="file-in-history-modified-after",
),
pytest.param(
RemoteFile(uri="new.csv", last_modified=datetime.strptime("2022-01-01T00:00:00.000000Z", DATE_TIME_FORMAT)),
@@ -373,7 +369,7 @@ def test_get_files_to_sync(all_files, history, is_history_full, prev_cursor_valu
(datetime.strptime("2021-01-02T00:00:00.000000Z", DATE_TIME_FORMAT), "a.csv"),
datetime.min,
True,
id="history-full-file-modified-after-cursor"
id="history-full-file-modified-after-cursor",
),
pytest.param(
RemoteFile(uri="new1.csv", last_modified=datetime.strptime("2021-01-01T00:00:00.000000Z", DATE_TIME_FORMAT)),
@@ -382,7 +378,7 @@ def test_get_files_to_sync(all_files, history, is_history_full, prev_cursor_valu
(datetime.strptime("2021-01-01T00:00:00.000000Z", DATE_TIME_FORMAT), "new0.csv"),
datetime.min,
True,
id="history-full-modified-eq-cursor-uri-gt"
id="history-full-modified-eq-cursor-uri-gt",
),
pytest.param(
RemoteFile(uri="new0.csv", last_modified=datetime.strptime("2021-01-01T00:00:00.000000Z", DATE_TIME_FORMAT)),
@@ -391,7 +387,7 @@ def test_get_files_to_sync(all_files, history, is_history_full, prev_cursor_valu
(datetime.strptime("2021-01-01T00:00:00.000000Z", DATE_TIME_FORMAT), "new1.csv"),
datetime.min,
False,
id="history-full-modified-eq-cursor-uri-lt"
id="history-full-modified-eq-cursor-uri-lt",
),
pytest.param(
RemoteFile(uri="new.csv", last_modified=datetime.strptime("2020-01-01T00:00:00.000000Z", DATE_TIME_FORMAT)),
@@ -400,7 +396,7 @@ def test_get_files_to_sync(all_files, history, is_history_full, prev_cursor_valu
(datetime.strptime("2021-01-01T00:00:00.000000Z", DATE_TIME_FORMAT), "a.csv"),
datetime.min,
True,
id="history-full-modified-before-cursor-and-after-sync-start"
id="history-full-modified-before-cursor-and-after-sync-start",
),
pytest.param(
RemoteFile(uri="new.csv", last_modified=datetime.strptime("2021-01-01T00:00:00.000000Z", DATE_TIME_FORMAT)),
@@ -409,9 +405,9 @@ def test_get_files_to_sync(all_files, history, is_history_full, prev_cursor_valu
(datetime.strptime("2022-01-01T00:00:00.000000Z", DATE_TIME_FORMAT), "a.csv"),
datetime.strptime("2024-01-01T00:00:00.000000Z", DATE_TIME_FORMAT),
False,
id="history-full-modified-before-cursor-and-before-sync-start"
id="history-full-modified-before-cursor-and-before-sync-start",
),
]
],
)
def test_should_sync_file(
file_to_check: RemoteFile,
@@ -439,21 +435,21 @@ def test_should_sync_file(
{"a.csv": "2021-01-01T00:00:00.000000Z"},
False,
datetime.strptime("2021-01-01T00:00:00.000000Z", DATE_TIME_FORMAT),
id="non-full-history"
id="non-full-history",
),
pytest.param(
{f"file{i}.csv": f"2021-01-0{i}T00:00:00.000000Z" for i in range(1, 4)}, # all before the time window
True,
datetime.strptime("2021-01-01T00:00:00.000000Z", DATE_TIME_FORMAT), # Time window start time
id="full-history-earliest-before-window"
id="full-history-earliest-before-window",
),
pytest.param(
{f"file{i}.csv": f"2024-01-0{i}T00:00:00.000000Z" for i in range(1, 4)}, # all after the time window
True,
datetime.strptime("2023-06-13T00:00:00.000000Z", DATE_TIME_FORMAT), # Earliest file time
id="full-history-earliest-after-window"
id="full-history-earliest-after-window",
),
]
],
)
def test_compute_start_time(input_history, is_history_full, expected_start_time, monkeypatch):
cursor = _make_cursor({"history": input_history})


@@ -66,9 +66,7 @@ class MockFormat:
),
],
)
def test_fill_nulls(
input_schema: Mapping[str, Any], expected_output: Mapping[str, Any]
) -> None:
def test_fill_nulls(input_schema: Mapping[str, Any], expected_output: Mapping[str, Any]) -> None:
assert DefaultFileBasedStream._fill_nulls(input_schema) == expected_output
@@ -103,14 +101,8 @@ class DefaultFileBasedStreamTest(unittest.TestCase):
def test_when_read_records_from_slice_then_return_records(self) -> None:
self._parser.parse_records.return_value = [self._A_RECORD]
messages = list(
self._stream.read_records_from_slice(
{"files": [RemoteFile(uri="uri", last_modified=self._NOW)]}
)
)
assert list(map(lambda message: message.record.data["data"], messages)) == [
self._A_RECORD
]
messages = list(self._stream.read_records_from_slice({"files": [RemoteFile(uri="uri", last_modified=self._NOW)]}))
assert list(map(lambda message: message.record.data["data"], messages)) == [self._A_RECORD]
def test_given_exception_when_read_records_from_slice_then_do_process_other_files(
self,
@@ -165,9 +157,7 @@ class DefaultFileBasedStreamTest(unittest.TestCase):
) -> None:
self._stream_config.schemaless = False
self._validation_policy.record_passes_validation_policy.return_value = False
self._parser.parse_records.side_effect = [
self._iter([self._A_RECORD, ValueError("An error")])
]
self._parser.parse_records.side_effect = [self._iter([self._A_RECORD, ValueError("An error")])]
messages = list(
self._stream.read_records_from_slice(
@@ -225,9 +215,7 @@ class TestFileBasedErrorCollector:
"Multiple errors",
],
)
def test_collect_parsing_error(
self, stream, file, line_no, n_skipped, collector_expected_len
) -> None:
def test_collect_parsing_error(self, stream, file, line_no, n_skipped, collector_expected_len) -> None:
test_error_pattern = "Error parsing record."
# format the error body
test_error = (
@@ -252,11 +240,5 @@ class TestFileBasedErrorCollector:
# we expect the following method will raise the AirbyteTracedException
with pytest.raises(AirbyteTracedException) as parse_error:
list(self.test_error_collector.yield_and_raise_collected())
assert (
parse_error.value.message
== "Some errors occured while reading from the source."
)
assert (
parse_error.value.internal_message
== "Please check the logged errors for more information."
)
assert parse_error.value.message == "Some errors occured while reading from the source."
assert parse_error.value.internal_message == "Please check the logged errors for more information."


@@ -258,7 +258,6 @@ discover_scenarios = [
single_csv_input_state_is_earlier_scenario_concurrent,
single_csv_input_state_is_later_scenario_concurrent,
single_csv_no_input_state_scenario_concurrent,
]
read_scenarios = discover_scenarios + [


@@ -80,11 +80,15 @@ def _verify_read_output(output: EntrypointOutput, scenario: TestScenario[Abstrac
sorted_expected_records = sorted(
filter(lambda e: "data" in e, expected_records),
key=lambda record: ",".join(f"{k}={v}" for k, v in sorted(record["data"].items(), key=lambda items: (items[0], items[1])) if k != "emitted_at"),
key=lambda record: ",".join(
f"{k}={v}" for k, v in sorted(record["data"].items(), key=lambda items: (items[0], items[1])) if k != "emitted_at"
),
)
sorted_records = sorted(
filter(lambda r: r.record, records),
key=lambda record: ",".join(f"{k}={v}" for k, v in sorted(record.record.data.items(), key=lambda items: (items[0], items[1])) if k != "emitted_at"),
key=lambda record: ",".join(
f"{k}={v}" for k, v in sorted(record.record.data.items(), key=lambda items: (items[0], items[1])) if k != "emitted_at"
),
)
assert len(sorted_records) == len(sorted_expected_records)
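The sort key in the hunk above flattens each record's data into a comma-joined "k=v" string while skipping emitted_at; an equivalent standalone sketch:

# Sketch of the record sort key built in _verify_read_output above.
def record_sort_key(data: dict) -> str:
    # Join "k=v" pairs in sorted order, ignoring the volatile emitted_at field.
    return ",".join(f"{k}={v}" for k, v in sorted(data.items()) if k != "emitted_at")

assert record_sort_key({"id": 1, "emitted_at": 123, "name": "a"}) == "id=1,name=a"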


@@ -163,10 +163,17 @@ test_incremental_stream_with_slice_boundaries_with_legacy_state = (
)
CONCURRENT_STATE = StateBuilder().with_stream_state("stream1", {
"slices": [{"start": 0, "end": 0}],
"state_type": ConcurrencyCompatibleStateType.date_range.value,
}).build()
CONCURRENT_STATE = (
StateBuilder()
.with_stream_state(
"stream1",
{
"slices": [{"start": 0, "end": 0}],
"state_type": ConcurrencyCompatibleStateType.date_range.value,
},
)
.build()
)
test_incremental_stream_without_slice_boundaries_with_concurrent_state = (
TestScenarioBuilder()
.set_name("test_incremental_stream_without_slice_boundaries_with_concurrent_state")


@@ -357,7 +357,7 @@ test_incremental_stream_with_slice_boundaries = (
[
{"data": {"id": "1", "cursor_field": 0}, "stream": "stream1"},
{"data": {"id": "2", "cursor_field": 1}, "stream": "stream1"},
{"stream1": {'cursor_field': 1}},
{"stream1": {"cursor_field": 1}},
{"data": {"id": "3", "cursor_field": 2}, "stream": "stream1"},
{"data": {"id": "4", "cursor_field": 3}, "stream": "stream1"},
{"stream1": {"cursor_field": 2}},


@@ -113,7 +113,7 @@ class TestConcurrentReadProcessor(unittest.TestCase):
def test_handle_last_stream_partition_done(self):
in_order_validation_mock = Mock()
in_order_validation_mock.attach_mock(self._another_stream, "_another_stream")
in_order_validation_mock.attach_mock(self._message_repository, '_message_repository')
in_order_validation_mock.attach_mock(self._message_repository, "_message_repository")
self._message_repository.consume_queue.return_value = iter([_ANY_AIRBYTE_MESSAGE])
stream_instances_to_read_from = [self._another_stream]
@@ -143,10 +143,12 @@ class TestConcurrentReadProcessor(unittest.TestCase):
status=AirbyteStreamStatus(AirbyteStreamStatus.COMPLETE),
),
),
)
),
]
assert expected_messages == messages
assert in_order_validation_mock.mock_calls.index(call._another_stream.cursor.ensure_at_least_one_state_emitted) < in_order_validation_mock.mock_calls.index(call._message_repository.consume_queue)
assert in_order_validation_mock.mock_calls.index(
call._another_stream.cursor.ensure_at_least_one_state_emitted
) < in_order_validation_mock.mock_calls.index(call._message_repository.consume_queue)
def test_handle_partition(self):
stream_instances_to_read_from = [self._another_stream]
@@ -525,10 +527,11 @@ class TestConcurrentReadProcessor(unittest.TestCase):
type=TraceType.STREAM_STATUS,
emitted_at=1577836800000.0,
stream_status=AirbyteStreamStatusTraceMessage(
stream_descriptor=StreamDescriptor(name=_ANOTHER_STREAM_NAME), status=AirbyteStreamStatus(AirbyteStreamStatus.INCOMPLETE)
stream_descriptor=StreamDescriptor(name=_ANOTHER_STREAM_NAME),
status=AirbyteStreamStatus(AirbyteStreamStatus.INCOMPLETE),
),
),
)
),
]
assert messages == expected_message


@@ -112,7 +112,7 @@ class ConcurrentCursorTest(TestCase):
self._state_manager.update_state_for_stream.assert_called_once_with(
_A_STREAM_NAME,
_A_STREAM_NAMESPACE,
{'a_cursor_field_key': 10},
{"a_cursor_field_key": 10},
)
def test_given_no_boundary_fields_when_close_multiple_partitions_then_raise_exception(self) -> None:


@@ -94,72 +94,72 @@ def test_concurrent_stream_state_converter_is_state_message_compatible(converter
None,
{},
EpochValueConcurrentStreamStateConverter().zero_value,
id="epoch-converter-no-state-no-start-start-is-zero-value"
id="epoch-converter-no-state-no-start-start-is-zero-value",
),
pytest.param(
EpochValueConcurrentStreamStateConverter(),
1617030403,
{},
datetime(2021, 3, 29, 15, 6, 43, tzinfo=timezone.utc),
id="epoch-converter-no-state-with-start-start-is-start"
id="epoch-converter-no-state-with-start-start-is-start",
),
pytest.param(
EpochValueConcurrentStreamStateConverter(),
None,
{"created_at": 1617030404},
datetime(2021, 3, 29, 15, 6, 44, tzinfo=timezone.utc),
id="epoch-converter-state-without-start-start-is-from-state"
id="epoch-converter-state-without-start-start-is-from-state",
),
pytest.param(
EpochValueConcurrentStreamStateConverter(),
1617030404,
{"created_at": 1617030403},
datetime(2021, 3, 29, 15, 6, 44, tzinfo=timezone.utc),
id="epoch-converter-state-before-start-start-is-start"
id="epoch-converter-state-before-start-start-is-start",
),
pytest.param(
EpochValueConcurrentStreamStateConverter(),
1617030403,
{"created_at": 1617030404},
datetime(2021, 3, 29, 15, 6, 44, tzinfo=timezone.utc),
id="epoch-converter-state-after-start-start-is-from-state"
id="epoch-converter-state-after-start-start-is-from-state",
),
pytest.param(
IsoMillisConcurrentStreamStateConverter(),
None,
{},
IsoMillisConcurrentStreamStateConverter().zero_value,
id="isomillis-converter-no-state-no-start-start-is-zero-value"
id="isomillis-converter-no-state-no-start-start-is-zero-value",
),
pytest.param(
IsoMillisConcurrentStreamStateConverter(),
"2021-08-22T05:03:27.000Z",
{},
datetime(2021, 8, 22, 5, 3, 27, tzinfo=timezone.utc),
id="isomillis-converter-no-state-with-start-start-is-start"
id="isomillis-converter-no-state-with-start-start-is-start",
),
pytest.param(
IsoMillisConcurrentStreamStateConverter(),
None,
{"created_at": "2021-08-22T05:03:27.000Z"},
datetime(2021, 8, 22, 5, 3, 27, tzinfo=timezone.utc),
id="isomillis-converter-state-without-start-start-is-from-state"
id="isomillis-converter-state-without-start-start-is-from-state",
),
pytest.param(
IsoMillisConcurrentStreamStateConverter(),
"2022-08-22T05:03:27.000Z",
{"created_at": "2021-08-22T05:03:27.000Z"},
datetime(2022, 8, 22, 5, 3, 27, tzinfo=timezone.utc),
id="isomillis-converter-state-before-start-start-is-start"
id="isomillis-converter-state-before-start-start-is-start",
),
pytest.param(
IsoMillisConcurrentStreamStateConverter(),
"2022-08-22T05:03:27.000Z",
{"created_at": "2023-08-22T05:03:27.000Z"},
datetime(2023, 8, 22, 5, 3, 27, tzinfo=timezone.utc),
id="isomillis-converter-state-after-start-start-is-from-state"
id="isomillis-converter-state-after-start-start-is-from-state",
),
]
],
)
def test_get_sync_start(converter, start, state, expected_start):
assert converter._get_sync_start(CursorField("created_at"), state, start) == expected_start
@@ -174,8 +174,12 @@ def test_get_sync_start(converter, start, state, expected_start):
{},
{
"legacy": {},
"slices": [{"start": EpochValueConcurrentStreamStateConverter().zero_value,
"end": EpochValueConcurrentStreamStateConverter().zero_value}],
"slices": [
{
"start": EpochValueConcurrentStreamStateConverter().zero_value,
"end": EpochValueConcurrentStreamStateConverter().zero_value,
}
],
"state_type": "date-range",
},
id="empty-input-state-epoch",
@@ -186,8 +190,12 @@ def test_get_sync_start(converter, start, state, expected_start):
{"created": 1617030403},
{
"state_type": "date-range",
"slices": [{"start": datetime(2021, 3, 29, 15, 6, 43, tzinfo=timezone.utc),
"end": datetime(2021, 3, 29, 15, 6, 43, tzinfo=timezone.utc)}],
"slices": [
{
"start": datetime(2021, 3, 29, 15, 6, 43, tzinfo=timezone.utc),
"end": datetime(2021, 3, 29, 15, 6, 43, tzinfo=timezone.utc),
}
],
"legacy": {"created": 1617030403},
},
id="with-input-state-epoch",
@@ -198,8 +206,12 @@ def test_get_sync_start(converter, start, state, expected_start):
{"created": "2021-08-22T05:03:27.000Z"},
{
"state_type": "date-range",
"slices": [{"start": datetime(2021, 8, 22, 5, 3, 27, tzinfo=timezone.utc),
"end": datetime(2021, 8, 22, 5, 3, 27, tzinfo=timezone.utc)}],
"slices": [
{
"start": datetime(2021, 8, 22, 5, 3, 27, tzinfo=timezone.utc),
"end": datetime(2021, 8, 22, 5, 3, 27, tzinfo=timezone.utc),
}
],
"legacy": {"created": "2021-08-22T05:03:27.000Z"},
},
id="with-input-state-isomillis",
@@ -227,8 +239,12 @@ def test_convert_from_sequential_state(converter, start, sequential_state, expec
EpochValueConcurrentStreamStateConverter(),
{
"state_type": "date-range",
"slices": [{"start": datetime(1970, 1, 3, 0, 0, 0, tzinfo=timezone.utc),
"end": datetime(2021, 3, 29, 15, 6, 43, tzinfo=timezone.utc)}],
"slices": [
{
"start": datetime(1970, 1, 3, 0, 0, 0, tzinfo=timezone.utc),
"end": datetime(2021, 3, 29, 15, 6, 43, tzinfo=timezone.utc),
}
],
},
{"created": 1617030403},
id="epoch-single-slice",
@@ -237,10 +253,16 @@ def test_convert_from_sequential_state(converter, start, sequential_state, expec
EpochValueConcurrentStreamStateConverter(),
{
"state_type": "date-range",
"slices": [{"start": datetime(1970, 1, 1, 0, 0, 0, tzinfo=timezone.utc),
"end": datetime(2021, 3, 29, 15, 6, 43, tzinfo=timezone.utc)},
{"start": datetime(2020, 1, 1, 0, 0, 0, tzinfo=timezone.utc),
"end": datetime(2022, 3, 29, 15, 6, 43, tzinfo=timezone.utc)}],
"slices": [
{
"start": datetime(1970, 1, 1, 0, 0, 0, tzinfo=timezone.utc),
"end": datetime(2021, 3, 29, 15, 6, 43, tzinfo=timezone.utc),
},
{
"start": datetime(2020, 1, 1, 0, 0, 0, tzinfo=timezone.utc),
"end": datetime(2022, 3, 29, 15, 6, 43, tzinfo=timezone.utc),
},
],
},
{"created": 1648566403},
id="epoch-overlapping-slices",
@@ -249,10 +271,16 @@ def test_convert_from_sequential_state(converter, start, sequential_state, expec
EpochValueConcurrentStreamStateConverter(),
{
"state_type": "date-range",
"slices": [{"start": datetime(1970, 1, 1, 0, 0, 0, tzinfo=timezone.utc),
"end": datetime(2021, 3, 29, 15, 6, 43, tzinfo=timezone.utc)},
{"start": datetime(2022, 1, 1, 0, 0, 0, tzinfo=timezone.utc),
"end": datetime(2023, 3, 29, 15, 6, 43, tzinfo=timezone.utc)}],
"slices": [
{
"start": datetime(1970, 1, 1, 0, 0, 0, tzinfo=timezone.utc),
"end": datetime(2021, 3, 29, 15, 6, 43, tzinfo=timezone.utc),
},
{
"start": datetime(2022, 1, 1, 0, 0, 0, tzinfo=timezone.utc),
"end": datetime(2023, 3, 29, 15, 6, 43, tzinfo=timezone.utc),
},
],
},
{"created": 1617030403},
id="epoch-multiple-slices",
@@ -261,8 +289,12 @@ def test_convert_from_sequential_state(converter, start, sequential_state, expec
IsoMillisConcurrentStreamStateConverter(),
{
"state_type": "date-range",
"slices": [{"start": datetime(1970, 1, 3, 0, 0, 0, tzinfo=timezone.utc),
"end": datetime(2021, 3, 29, 15, 6, 43, tzinfo=timezone.utc)}],
"slices": [
{
"start": datetime(1970, 1, 3, 0, 0, 0, tzinfo=timezone.utc),
"end": datetime(2021, 3, 29, 15, 6, 43, tzinfo=timezone.utc),
}
],
},
{"created": "2021-03-29T15:06:43.000Z"},
id="isomillis-single-slice",
@@ -271,10 +303,16 @@ def test_convert_from_sequential_state(converter, start, sequential_state, expec
IsoMillisConcurrentStreamStateConverter(),
{
"state_type": "date-range",
"slices": [{"start": datetime(1970, 1, 1, 0, 0, 0, tzinfo=timezone.utc),
"end": datetime(2021, 3, 29, 15, 6, 43, tzinfo=timezone.utc)},
{"start": datetime(2020, 1, 1, 0, 0, 0, tzinfo=timezone.utc),
"end": datetime(2022, 3, 29, 15, 6, 43, tzinfo=timezone.utc)}],
"slices": [
{
"start": datetime(1970, 1, 1, 0, 0, 0, tzinfo=timezone.utc),
"end": datetime(2021, 3, 29, 15, 6, 43, tzinfo=timezone.utc),
},
{
"start": datetime(2020, 1, 1, 0, 0, 0, tzinfo=timezone.utc),
"end": datetime(2022, 3, 29, 15, 6, 43, tzinfo=timezone.utc),
},
],
},
{"created": "2022-03-29T15:06:43.000Z"},
id="isomillis-overlapping-slices",
@@ -283,10 +321,16 @@ def test_convert_from_sequential_state(converter, start, sequential_state, expec
IsoMillisConcurrentStreamStateConverter(),
{
"state_type": "date-range",
"slices": [{"start": datetime(1970, 1, 1, 0, 0, 0, tzinfo=timezone.utc),
"end": datetime(2021, 3, 29, 15, 6, 43, tzinfo=timezone.utc)},
{"start": datetime(2022, 1, 1, 0, 0, 0, tzinfo=timezone.utc),
"end": datetime(2023, 3, 29, 15, 6, 43, tzinfo=timezone.utc)}],
"slices": [
{
"start": datetime(1970, 1, 1, 0, 0, 0, tzinfo=timezone.utc),
"end": datetime(2021, 3, 29, 15, 6, 43, tzinfo=timezone.utc),
},
{
"start": datetime(2022, 1, 1, 0, 0, 0, tzinfo=timezone.utc),
"end": datetime(2023, 3, 29, 15, 6, 43, tzinfo=timezone.utc),
},
],
},
{"created": "2021-03-29T15:06:43.000Z"},
id="isomillis-multiple-slices",


@@ -71,6 +71,7 @@ class PartitionEnqueuerTest(unittest.TestCase):
for partition in partitions:
yield partition
raise exception
return inner_function
@staticmethod


@@ -63,4 +63,5 @@ class PartitionReaderTest(unittest.TestCase):
def mocked_function() -> Iterable[Record]:
yield from records
raise exception
return mocked_function


@@ -372,17 +372,19 @@ def _as_state(state_data: Dict[str, Any], stream_name: str = "", per_stream_stat
return AirbyteMessage(type=Type.STATE, state=AirbyteStateMessage(data=state_data))
def _as_error_trace(stream: str, error_message: str, internal_message: Optional[str], failure_type: Optional[FailureType], stack_trace: Optional[str]) -> AirbyteMessage:
def _as_error_trace(
stream: str, error_message: str, internal_message: Optional[str], failure_type: Optional[FailureType], stack_trace: Optional[str]
) -> AirbyteMessage:
trace_message = AirbyteTraceMessage(
emitted_at=datetime.datetime.now().timestamp() * 1000.0,
type=TraceType.ERROR,
error=AirbyteErrorTraceMessage(
stream_descriptor=StreamDescriptor(name=stream),
message=error_message,
internal_message=internal_message,
failure_type=failure_type,
stack_trace=stack_trace,
),
stream_descriptor=StreamDescriptor(name=stream),
message=error_message,
internal_message=internal_message,
failure_type=failure_type,
stack_trace=stack_trace,
),
)
return AirbyteMessage(type=MessageType.TRACE, trace=trace_message)
@@ -1186,8 +1188,12 @@ def test_checkpoint_state_from_stream_instance():
managers_stream = StreamNoStateMethod()
state_manager = ConnectorStateManager(
{
"teams": AirbyteStream(name="teams", namespace="", json_schema={}, supported_sync_modes=[SyncMode.full_refresh, SyncMode.incremental]),
"managers": AirbyteStream(name="managers", namespace="", json_schema={}, supported_sync_modes=[SyncMode.full_refresh, SyncMode.incremental])
"teams": AirbyteStream(
name="teams", namespace="", json_schema={}, supported_sync_modes=[SyncMode.full_refresh, SyncMode.incremental]
),
"managers": AirbyteStream(
name="managers", namespace="", json_schema={}, supported_sync_modes=[SyncMode.full_refresh, SyncMode.incremental]
),
},
[],
)
@@ -1207,9 +1213,19 @@ def test_checkpoint_state_from_stream_instance():
@pytest.mark.parametrize(
"exception_to_raise,expected_error_message,expected_internal_message",
[
pytest.param(AirbyteTracedException(message="I was born only to crash like Icarus"), "I was born only to crash like Icarus", None, id="test_raises_traced_exception"),
pytest.param(Exception("Generic connector error message"), "Something went wrong in the connector. See the logs for more details.", "Generic connector error message", id="test_raises_generic_exception"),
]
pytest.param(
AirbyteTracedException(message="I was born only to crash like Icarus"),
"I was born only to crash like Icarus",
None,
id="test_raises_traced_exception",
),
pytest.param(
Exception("Generic connector error message"),
"Something went wrong in the connector. See the logs for more details.",
"Generic connector error message",
id="test_raises_generic_exception",
),
],
)
def test_continue_sync_with_failed_streams(mocker, exception_to_raise, expected_error_message, expected_internal_message):
"""
@@ -1317,7 +1333,9 @@ def test_sync_error_trace_messages_obfuscate_secrets(mocker):
stream_output = [{"k1": "v1"}, {"k2": "v2"}]
s1 = MockStream([({"sync_mode": SyncMode.full_refresh}, stream_output)], name="s1")
s2 = StreamRaisesException(exception_to_raise=AirbyteTracedException(message="My api_key value API_KEY_VALUE flew too close to the sun."))
s2 = StreamRaisesException(
exception_to_raise=AirbyteTracedException(message="My api_key value API_KEY_VALUE flew too close to the sun.")
)
s3 = MockStream([({"sync_mode": SyncMode.full_refresh}, stream_output)], name="s3")
mocker.patch.object(MockStream, "get_json_schema", return_value={})


@@ -158,7 +158,10 @@ from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager, H
def test_initialize_state_manager(input_stream_state, expected_stream_state, expected_error):
stream_to_instance_map = {
"actors": AirbyteStream(
name="actors", namespace="public", json_schema={}, supported_sync_modes=[SyncMode.full_refresh, SyncMode.incremental],
name="actors",
namespace="public",
json_schema={},
supported_sync_modes=[SyncMode.full_refresh, SyncMode.incremental],
)
}
@@ -268,7 +271,8 @@ def test_initialize_state_manager(input_stream_state, expected_stream_state, exp
],
)
def test_get_stream_state(input_state, stream_name, namespace, expected_state):
stream_to_instance_map = {stream_name: AirbyteStream(
stream_to_instance_map = {
stream_name: AirbyteStream(
name=stream_name, namespace=namespace, json_schema={}, supported_sync_modes=[SyncMode.full_refresh, SyncMode.incremental]
)
}


@@ -29,10 +29,7 @@ from airbyte_protocol.models import (
def _a_state_message(state: Any) -> AirbyteMessage:
return AirbyteMessage(
type=Type.STATE,
state=AirbyteStateMessage(data=state)
)
return AirbyteMessage(type=Type.STATE, state=AirbyteStateMessage(data=state))
def _a_status_message(stream_name: str, status: AirbyteStreamStatus) -> AirbyteMessage:
@@ -50,14 +47,10 @@ def _a_status_message(stream_name: str, status: AirbyteStreamStatus) -> AirbyteM
_A_RECORD = AirbyteMessage(
type=Type.RECORD,
record=AirbyteRecordMessage(stream="stream", data={"record key": "record value"}, emitted_at=0)
type=Type.RECORD, record=AirbyteRecordMessage(stream="stream", data={"record key": "record value"}, emitted_at=0)
)
_A_STATE_MESSAGE = _a_state_message({"state key": "state value for _A_STATE_MESSAGE"})
_A_LOG = AirbyteMessage(
type=Type.LOG,
log=AirbyteLogMessage(level=Level.INFO, message="This is an Airbyte log message")
)
_A_LOG = AirbyteMessage(type=Type.LOG, log=AirbyteLogMessage(level=Level.INFO, message="This is an Airbyte log message"))
_AN_ERROR_MESSAGE = AirbyteMessage(
type=Type.TRACE,
trace=AirbyteTraceMessage(
@@ -121,6 +114,7 @@ def _create_tmp_file_validation(entrypoint, expected_config, expected_catalog, e
_validate_tmp_catalog(expected_catalog, entrypoint.return_value.parse_args.call_args.args[0][4])
_validate_tmp_json_file(expected_state, entrypoint.return_value.parse_args.call_args.args[0][6])
return entrypoint.return_value.run.return_value
return _validate_tmp_files
@@ -154,6 +148,7 @@ class EntrypointWrapperTest(TestCase):
def _do_some_logging(self):
logging.getLogger("any logger").info(_A_LOG_MESSAGE)
return entrypoint.return_value.run.return_value
entrypoint.return_value.run.side_effect = _do_some_logging
output = read(self._a_source, _A_CONFIG, _A_CATALOG, _A_STATE)
@@ -204,7 +199,7 @@ class EntrypointWrapperTest(TestCase):
def test_given_stream_statuses_when_read_then_return_statuses(self, entrypoint):
status_messages = [
_a_status_message(_A_STREAM_NAME, AirbyteStreamStatus.STARTED),
_a_status_message(_A_STREAM_NAME, AirbyteStreamStatus.COMPLETE)
_a_status_message(_A_STREAM_NAME, AirbyteStreamStatus.COMPLETE),
]
entrypoint.return_value.run.return_value = _to_entrypoint_output(status_messages)
output = read(self._a_source, _A_CONFIG, _A_CATALOG, _A_STATE)
@@ -215,20 +210,20 @@ class EntrypointWrapperTest(TestCase):
status_messages = [
_a_status_message(_A_STREAM_NAME, AirbyteStreamStatus.STARTED),
_a_status_message("another stream name", AirbyteStreamStatus.INCOMPLETE),
_a_status_message(_A_STREAM_NAME, AirbyteStreamStatus.COMPLETE)
_a_status_message(_A_STREAM_NAME, AirbyteStreamStatus.COMPLETE),
]
entrypoint.return_value.run.return_value = _to_entrypoint_output(status_messages)
output = read(self._a_source, _A_CONFIG, _A_CATALOG, _A_STATE)
assert len(output.get_stream_statuses(_A_STREAM_NAME)) == 2
@patch('airbyte_cdk.test.entrypoint_wrapper.print', create=True)
@patch("airbyte_cdk.test.entrypoint_wrapper.print", create=True)
@patch("airbyte_cdk.test.entrypoint_wrapper.AirbyteEntrypoint")
def test_given_unexpected_exception_when_read_then_print(self, entrypoint, print_mock):
entrypoint.return_value.run.side_effect = ValueError("This error should be printed")
read(self._a_source, _A_CONFIG, _A_CATALOG, _A_STATE)
assert print_mock.call_count > 0
@patch('airbyte_cdk.test.entrypoint_wrapper.print', create=True)
@patch("airbyte_cdk.test.entrypoint_wrapper.print", create=True)
@patch("airbyte_cdk.test.entrypoint_wrapper.AirbyteEntrypoint")
def test_given_expected_exception_when_read_then_do_not_print(self, entrypoint, print_mock):
entrypoint.return_value.run.side_effect = ValueError("This error should be printed")


@@ -247,7 +247,9 @@ def test_run_read(entrypoint: AirbyteEntrypoint, mocker, spec_mock, config_mock)
assert spec_mock.called
def test_given_message_emitted_during_config_when_read_then_emit_message_before_next_steps(entrypoint: AirbyteEntrypoint, mocker, spec_mock, config_mock):
def test_given_message_emitted_during_config_when_read_then_emit_message_before_next_steps(
entrypoint: AirbyteEntrypoint, mocker, spec_mock, config_mock
):
parsed_args = Namespace(command="read", config="config_path", state="statepath", catalog="catalogpath")
mocker.patch.object(MockSource, "read_catalog", side_effect=ValueError)