1
0
mirror of synced 2026-01-07 18:06:03 -05:00

connector builder: Emit message at start of slice (#25180)

* Move condition for yielding the slice message to an overwritable method

* Automated Commit - Formatting Changes

* yield the slice log messages

* same for incremental

* refactor

* Revert "refactor"

This reverts commit c594365bd8.

* move flag from factory to source

* set the flag

* remove debug print

* halfmock

* clean up

* Add a test for a single page

* Add another test

* Pass the flag

* rename

---------

Co-authored-by: girarda <girarda@users.noreply.github.com>
This commit is contained in:
Alexandre Girard
2023-04-14 10:23:59 -07:00
committed by GitHub
parent 6f064c04a0
commit f3799280f2
4 changed files with 173 additions and 28 deletions

View File

@@ -9,6 +9,7 @@ from unittest import mock
from unittest.mock import patch
import pytest
import requests
from airbyte_cdk import connector_builder
from airbyte_cdk.connector_builder.connector_builder_handler import (
DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE,
@@ -22,10 +23,22 @@ from airbyte_cdk.connector_builder.connector_builder_handler import (
)
from airbyte_cdk.connector_builder.main import handle_connector_builder_request, handle_request, read_stream
from airbyte_cdk.connector_builder.models import LogMessage, StreamRead, StreamReadSlicesInner, StreamReadSlicesInnerPagesInner
from airbyte_cdk.models import AirbyteMessage, AirbyteRecordMessage, ConfiguredAirbyteCatalog
from airbyte_cdk.models import (
AirbyteLogMessage,
AirbyteMessage,
AirbyteRecordMessage,
AirbyteStream,
ConfiguredAirbyteCatalog,
ConfiguredAirbyteStream,
DestinationSyncMode,
Level,
SyncMode,
)
from airbyte_cdk.models import Type
from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
from airbyte_cdk.sources.declarative.retrievers import SimpleRetrieverTestReadDecorator
from airbyte_cdk.sources.streams.core import Stream
from airbyte_cdk.sources.streams.http import HttpStream
from unit_tests.connector_builder.utils import create_configured_catalog
@@ -34,23 +47,29 @@ _stream_name = "stream_with_custom_requester"
_stream_primary_key = "id"
_stream_url_base = "https://api.sendgrid.com"
_stream_options = {"name": _stream_name, "primary_key": _stream_primary_key, "url_base": _stream_url_base}
_page_size = 2
MANIFEST = {
"version": "0.30.3",
"definitions": {
"schema_loader": {"name": "{{ options.stream_name }}", "file_path": "./source_sendgrid/schemas/{{ options.name }}.yaml"},
"retriever": {
"paginator": {
"type": "DefaultPaginator",
"page_size": 10,
"page_size": _page_size,
"page_size_option": {"inject_into": "request_parameter", "field_name": "page_size"},
"page_token_option": {"inject_into": "path", "type": "RequestPath"},
"pagination_strategy": {"type": "CursorPagination", "cursor_value": "{{ response._metadata.next }}", "page_size": 10},
"pagination_strategy": {"type": "CursorPagination", "cursor_value": "{{ response._metadata.next }}", "page_size": _page_size},
},
"partition_router": {
"type": "ListPartitionRouter",
"values": ["0", "1", "2", "3", "4", "5", "6", "7"],
"cursor_field": "item_id"
},
""
"requester": {
"path": "/v3/marketing/lists",
"authenticator": {"type": "BearerAuthenticator", "api_token": "{{ config.apikey }}"},
"request_parameters": {"page_size": "10"},
"request_parameters": {"a_param": "10"},
},
"record_selector": {"extractor": {"field_path": ["result"]}},
},
@@ -59,7 +78,6 @@ MANIFEST = {
{
"type": "DeclarativeStream",
"$parameters": _stream_options,
"schema_loader": {"$ref": "#/definitions/schema_loader"},
"retriever": "#/definitions/retriever",
},
],
@@ -169,19 +187,23 @@ def test_resolve_manifest(valid_resolve_manifest_config_file):
"type": "DeclarativeSource",
"version": "0.30.3",
"definitions": {
"schema_loader": {"name": "{{ options.stream_name }}", "file_path": "./source_sendgrid/schemas/{{ options.name }}.yaml"},
"retriever": {
"paginator": {
"type": "DefaultPaginator",
"page_size": 10,
"page_size": _page_size,
"page_size_option": {"inject_into": "request_parameter", "field_name": "page_size"},
"page_token_option": {"inject_into": "path", "type": "RequestPath"},
"pagination_strategy": {"type": "CursorPagination", "cursor_value": "{{ response._metadata.next }}", "page_size": 10},
"pagination_strategy": {"type": "CursorPagination", "cursor_value": "{{ response._metadata.next }}", "page_size": _page_size},
},
"partition_router": {
"type": "ListPartitionRouter",
"values": ["0", "1", "2", "3", "4", "5", "6", "7"],
"cursor_field": "item_id",
},
"requester": {
"path": "/v3/marketing/lists",
"authenticator": {"type": "BearerAuthenticator", "api_token": "{{ config.apikey }}"},
"request_parameters": {"page_size": "10"},
"request_parameters": {"a_param": "10"},
},
"record_selector": {"extractor": {"field_path": ["result"]}},
},
@@ -189,19 +211,11 @@ def test_resolve_manifest(valid_resolve_manifest_config_file):
"streams": [
{
"type": "DeclarativeStream",
"schema_loader": {
"type": "JsonFileSchemaLoader",
"name": "{{ options.stream_name }}",
"file_path": "./source_sendgrid/schemas/{{ options.name }}.yaml",
"primary_key": _stream_primary_key,
"url_base": _stream_url_base,
"$parameters": _stream_options,
},
"retriever": {
"type": "SimpleRetriever",
"paginator": {
"type": "DefaultPaginator",
"page_size": 10,
"page_size": _page_size,
"page_size_option": {
"type": "RequestOption",
"inject_into": "request_parameter",
@@ -226,7 +240,7 @@ def test_resolve_manifest(valid_resolve_manifest_config_file):
"primary_key": _stream_primary_key,
"url_base": _stream_url_base,
"$parameters": _stream_options,
"page_size": 10,
"page_size": _page_size,
},
"name": _stream_name,
"primary_key": _stream_primary_key,
@@ -244,7 +258,16 @@ def test_resolve_manifest(valid_resolve_manifest_config_file):
"url_base": _stream_url_base,
"$parameters": _stream_options,
},
"request_parameters": {"page_size": "10"},
"request_parameters": {"a_param": "10"},
"name": _stream_name,
"primary_key": _stream_primary_key,
"url_base": _stream_url_base,
"$parameters": _stream_options,
},
"partition_router": {
"type": "ListPartitionRouter",
"values": ["0", "1", "2", "3", "4", "5", "6", "7"],
"cursor_field": "item_id",
"name": _stream_name,
"primary_key": _stream_primary_key,
"url_base": _stream_url_base,
@@ -512,3 +535,101 @@ def test_create_source():
assert isinstance(source, ManifestDeclarativeSource)
assert source._constructor._limit_pages_fetched_per_slice == limits.max_pages_per_slice
assert source._constructor._limit_slices_fetched == limits.max_slices
def request_log_message(request: dict) -> AirbyteMessage:
return AirbyteMessage(type=Type.LOG, log=AirbyteLogMessage(level=Level.INFO, message=f"request:{json.dumps(request)}"))
def response_log_message(response: dict) -> AirbyteMessage:
return AirbyteMessage(type=Type.LOG, log=AirbyteLogMessage(level=Level.INFO, message=f"response:{json.dumps(response)}"))
def _create_request():
url = "https://example.com/api"
headers = {'Content-Type': 'application/json'}
return requests.Request('POST', url, headers=headers, json={"key": "value"}).prepare()
def _create_response(body):
response = requests.Response()
response.status_code = 200
response._content = bytes(json.dumps(body), "utf-8")
response.headers["Content-Type"] = "application/json"
return response
def _create_page(response_body):
return _create_request(), _create_response(response_body)
@patch.object(HttpStream, "_fetch_next_page", side_effect=(_create_page({"result": [{"id": 0}, {"id": 1}],"_metadata": {"next": "next"}}), _create_page({"result": [{"id": 2}],"_metadata": {"next": "next"}})) * 10)
def test_read_source(mock_http_stream):
"""
This test sort of acts as an integration test for the connector builder.
Each slice has two pages
The first page has two records
The second page one record
The response._metadata.next field in the first page tells the paginator to fetch the next page.
"""
max_records = 100
max_pages_per_slice = 2
max_slices = 3
limits = TestReadLimits(max_records, max_pages_per_slice, max_slices)
catalog = ConfiguredAirbyteCatalog(streams=[
ConfiguredAirbyteStream(stream=AirbyteStream(name=_stream_name, json_schema={}, supported_sync_modes=[SyncMode.full_refresh]), sync_mode=SyncMode.full_refresh, destination_sync_mode=DestinationSyncMode.append)
])
config = {"__injected_declarative_manifest": MANIFEST}
source = create_source(config, limits)
output_data = read_stream(source, config, catalog, limits).record.data
slices = output_data["slices"]
assert len(slices) == max_slices
for s in slices:
pages = s["pages"]
assert len(pages) == max_pages_per_slice
first_page, second_page = pages[0], pages[1]
assert len(first_page["records"]) == _page_size
assert len(second_page["records"]) == 1
streams = source.streams(config)
for s in streams:
assert isinstance(s.retriever, SimpleRetrieverTestReadDecorator)
@patch.object(HttpStream, "_fetch_next_page", side_effect=(_create_page({"result": [{"id": 0}, {"id": 1}],"_metadata": {"next": "next"}}), _create_page({"result": [{"id": 2}],"_metadata": {"next": "next"}})))
def test_read_source_single_page_single_slice(mock_http_stream):
max_records = 100
max_pages_per_slice = 1
max_slices = 1
limits = TestReadLimits(max_records, max_pages_per_slice, max_slices)
catalog = ConfiguredAirbyteCatalog(streams=[
ConfiguredAirbyteStream(stream=AirbyteStream(name=_stream_name, json_schema={}, supported_sync_modes=[SyncMode.full_refresh]), sync_mode=SyncMode.full_refresh, destination_sync_mode=DestinationSyncMode.append)
])
config = {"__injected_declarative_manifest": MANIFEST}
source = create_source(config, limits)
output_data = read_stream(source, config, catalog, limits).record.data
slices = output_data["slices"]
assert len(slices) == max_slices
for s in slices:
pages = s["pages"]
assert len(pages) == max_pages_per_slice
first_page = pages[0]
assert len(first_page["records"]) == _page_size
streams = source.streams(config)
for s in streams:
assert isinstance(s.retriever, SimpleRetrieverTestReadDecorator)