🎉 CDK: Added support for efficient parent/child streams using cache (#6057)
* Add caching * Upd cache file handling * Upd slices, sync mode, docs * Bump version * Use SyncMode.full_refresh for parent stream_slices * Refactor
This commit is contained in:
@@ -1,5 +1,8 @@
|
||||
# Changelog
|
||||
|
||||
## 0.1.23
|
||||
Added the ability to use caching for efficient synchronization of nested streams.
|
||||
|
||||
## 0.1.22
|
||||
Allow passing custom headers to request in `OAuth2Authenticator.refresh_access_token()`: https://github.com/airbytehq/airbyte/pull/6219
|
||||
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
# Initialize Streams Package
|
||||
from .exceptions import UserDefinedBackoffException
|
||||
from .http import HttpStream
|
||||
from .http import HttpStream, HttpSubStream
|
||||
|
||||
__all__ = ["HttpStream", "UserDefinedBackoffException"]
|
||||
__all__ = ["HttpStream", "HttpSubStream", "UserDefinedBackoffException"]
|
||||
|
||||
@@ -23,10 +23,14 @@
|
||||
#
|
||||
|
||||
|
||||
import os
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union
|
||||
|
||||
import requests
|
||||
import vcr
|
||||
import vcr.cassette as Cassette
|
||||
|
||||
from airbyte_cdk.models import SyncMode
|
||||
from airbyte_cdk.sources.streams.core import Stream
|
||||
from requests.auth import AuthBase
|
||||
@@ -57,6 +61,39 @@ class HttpStream(Stream, ABC):
|
||||
elif authenticator:
|
||||
self._authenticator = authenticator
|
||||
|
||||
if self.use_cache:
|
||||
self.cache_file = self.request_cache()
|
||||
# we need this attr to get metadata about cassettes, such as record play count, all records played, etc.
|
||||
self.cassete = None
|
||||
|
||||
@property
def cache_filename(self):
    """
    Name of the file this stream uses as its request cache.

    Defaults to the stream name with a ``.yml`` suffix; override to store
    the cache under a different name.
    """
    stream_name = self.name
    return f"{stream_name}.yml"
|
||||
|
||||
@property
def use_cache(self):
    """
    Whether requests made by this stream go through the cache file.

    Caching is off by default; override and return True to have responses
    recorded to, and replayed from, the file named by ``cache_filename``.
    """
    return False
|
||||
|
||||
def request_cache(self) -> Cassette:
    """
    Build the VCR cassette context used to cache this stream's requests.

    Any cache file left under ``cache_filename`` is removed first, so the
    cassette always starts from a clean file; this is meant to be called
    once per stream. A NamedTemporaryFile is not used because the yaml
    serializer does not cope well with empty files.

    :return: the object produced by ``vcr.use_cassette``, to be entered as
             a context manager around each request
    """
    try:
        os.remove(self.cache_filename)
    except FileNotFoundError:
        # no previous cache file, nothing to clean up
        pass

    # "new_episodes": replay requests already recorded, record the ones that are new
    cassette_options = {"record_mode": "new_episodes", "serializer": "yaml"}
    return vcr.use_cassette(self.cache_filename, **cassette_options)
|
||||
|
||||
@property
|
||||
@abstractmethod
|
||||
def url_base(self) -> str:
|
||||
@@ -322,7 +359,18 @@ class HttpStream(Stream, ABC):
|
||||
data=self.request_body_data(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
|
||||
)
|
||||
request_kwargs = self.request_kwargs(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)
|
||||
response = self._send_request(request, request_kwargs)
|
||||
|
||||
if self.use_cache:
|
||||
# use context manager to handle and store cassette metadata
|
||||
with self.cache_file as cass:
|
||||
self.cassete = cass
|
||||
# vcr tries to find records based on the request, if such records exist, return from cache file
|
||||
# else make a request and save record in cache file
|
||||
response = self._send_request(request, request_kwargs)
|
||||
|
||||
else:
|
||||
response = self._send_request(request, request_kwargs)
|
||||
|
||||
yield from self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice)
|
||||
|
||||
next_page_token = self.next_page_token(response)
|
||||
@@ -331,3 +379,35 @@ class HttpStream(Stream, ABC):
|
||||
|
||||
# Always return an empty generator just in case no records were ever yielded
|
||||
yield from []
|
||||
|
||||
|
||||
class HttpSubStream(HttpStream, ABC):
    """
    Base class for a stream whose slices are the records of a parent stream.
    """

    def __init__(self, parent: HttpStream, **kwargs):
        """
        :param parent: the HttpStream instance whose records drive this stream's slices
        :param kwargs: forwarded unchanged to HttpStream.__init__
        """
        super().__init__(**kwargs)
        self.parent = parent

    def stream_slices(
        self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
    ) -> Iterable[Optional[Mapping[str, Any]]]:
        """
        Yield one slice of the form {"parent": <record>} for every record of the parent stream.

        The parent is always read with SyncMode.full_refresh, whatever
        `sync_mode` this stream was asked for; `cursor_field` and
        `stream_state` are passed through to the parent as given.
        """
        # keyword arguments shared by the parent's stream_slices() and read_records() calls
        parent_args = dict(
            sync_mode=SyncMode.full_refresh,
            cursor_field=cursor_field,
            stream_state=stream_state,
        )

        # walk every parent slice, then every parent record inside that slice
        for parent_slice in self.parent.stream_slices(**parent_args):
            for parent_record in self.parent.read_records(stream_slice=parent_slice, **parent_args):
                yield {"parent": parent_record}
|
||||
|
||||
@@ -72,7 +72,12 @@ errors. It is not currently possible to specify a rate limit Airbyte should adhe
|
||||
|
||||
When implementing [stream slicing](incremental-stream.md#streamstream_slices) in an `HttpStream`, each slice is equivalent to an HTTP request; the stream will make one request per element returned by the `stream_slices` function. The current slice being read is passed into every other method in `HttpStream`, e.g. `request_params`, `request_headers`, `path`, etc., to be injected into a request. This allows you to dynamically determine the output of `request_params`, `path`, and other functions by reading the input slice and returning the appropriate value.
|
||||
|
||||
### Caching
|
||||
|
||||
When a stream depends on the results of another stream, caching can be used to write the parent stream's data to a file, so that this data is reused when the child stream synchronizes instead of performing the full HTTP requests again. To turn caching on, override the `use_cache` property so that it returns `True`, and use the `HttpSubStream` class as the base class of the child stream.
|
||||
|
||||
### Network Adapter Keyword arguments
|
||||
|
||||
If you need to set any network-adapter keyword args on the outgoing HTTP requests such as `allow_redirects`, `stream`, `verify`, `cert`, etc.,
|
||||
override the `request_kwargs` method. Any option listed in [BaseAdapter.send](https://docs.python-requests.org/en/latest/api/#requests.adapters.BaseAdapter.send) can
|
||||
be returned as a keyword argument.
|
||||
|
||||
@@ -35,7 +35,7 @@ README = (HERE / "README.md").read_text()
|
||||
|
||||
setup(
|
||||
name="airbyte-cdk",
|
||||
version="0.1.22",
|
||||
version="0.1.23",
|
||||
description="A framework for writing Airbyte Connectors.",
|
||||
long_description=README,
|
||||
long_description_content_type="text/markdown",
|
||||
@@ -72,6 +72,7 @@ setup(
|
||||
"pydantic~=1.6",
|
||||
"PyYAML~=5.4",
|
||||
"requests",
|
||||
"vcrpy",
|
||||
"Deprecated~=1.2",
|
||||
],
|
||||
python_requires=">=3.7.0",
|
||||
|
||||
@@ -31,7 +31,7 @@ from unittest.mock import ANY
|
||||
import pytest
|
||||
import requests
|
||||
from airbyte_cdk.models import SyncMode
|
||||
from airbyte_cdk.sources.streams.http import HttpStream
|
||||
from airbyte_cdk.sources.streams.http import HttpStream, HttpSubStream
|
||||
from airbyte_cdk.sources.streams.http.auth import NoAuth
|
||||
from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator as HttpTokenAuthenticator
|
||||
from airbyte_cdk.sources.streams.http.exceptions import DefaultBackoffException, RequestBodyException, UserDefinedBackoffException
|
||||
@@ -371,3 +371,81 @@ class TestRequestBody:
|
||||
assert response["body"] == self.data_body
|
||||
else:
|
||||
assert response["body"] is None
|
||||
|
||||
|
||||
class CacheHttpStream(StubBasicReadHttpStream):
    """Stub HTTP stream with request caching switched on."""

    use_cache = True
|
||||
|
||||
|
||||
class CacheHttpSubStream(HttpSubStream):
    """Minimal child stream used to exercise ``HttpSubStream`` on top of a cached parent."""

    url_base = "https://example.com"
    primary_key = ""

    def __init__(self, parent):
        """
        :param parent: the parent stream instance whose records become this stream's slices
        """
        super().__init__(parent=parent)

    # HttpStream.read_records passes the response positionally to parse_response and
    # next_page_token, so both accept it explicitly instead of relying on **kwargs only.
    def parse_response(self, response: requests.Response = None, **kwargs) -> Iterable[Mapping]:
        """Return no records regardless of the response."""
        return []

    def next_page_token(self, response: requests.Response = None, **kwargs) -> Optional[Mapping[str, Any]]:
        """Return None, i.e. there is never a next page."""
        return None

    def path(self, **kwargs) -> str:
        """Return an empty path, so requests target ``url_base`` itself."""
        return ""
|
||||
|
||||
|
||||
def test_caching_filename():
    """The default cache file name is the stream name with a .yml suffix."""
    cached_stream = CacheHttpStream()
    expected_name = f"{cached_stream.name}.yml"
    assert cached_stream.cache_filename == expected_name
|
||||
|
||||
|
||||
def test_caching_cassettes_are_different():
    """Two stream instances must not end up with the same cassette object."""
    first, second = CacheHttpStream(), CacheHttpStream()

    assert first.cache_file != second.cache_file
|
||||
|
||||
|
||||
def test_parent_attribute_exist():
    """A sub-stream exposes the parent it was constructed with."""
    parent = CacheHttpStream()
    child = CacheHttpSubStream(parent=parent)

    assert child.parent == parent
|
||||
|
||||
|
||||
def test_cache_response(mocker):
    """Reading a cached stream leaves a non-empty cache file behind."""
    cached_stream = CacheHttpStream()
    mocker.patch.object(cached_stream, "url_base", "https://google.com/")

    # drain the generator so the request is actually sent and recorded
    list(cached_stream.read_records(sync_mode=SyncMode.full_refresh))

    with open(cached_stream.cache_filename, 'r') as cache_fh:
        recorded = cache_fh.read()
    assert recorded
|
||||
|
||||
|
||||
class CacheHttpStreamWithSlices(CacheHttpStream):
    """Cached stub stream that issues one request per entry in ``paths``."""

    paths = ['', 'search']

    def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
        """Use the slice's "path" entry as the request path."""
        slice_path = stream_slice.get("path")
        return f'{slice_path}'

    def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
        """Yield one {"path": ...} slice per configured path."""
        yield from ({"path": p} for p in self.paths)

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """Yield the raw response object as the only record."""
        yield response
|
||||
|
||||
|
||||
def test_using_cache(mocker):
    """Slicing a child stream replays the parent's responses from the cassette."""
    parent = CacheHttpStreamWithSlices()
    mocker.patch.object(parent, "url_base", "https://google.com/")

    # first pass: read every parent slice so the responses get recorded
    for parent_slice in parent.stream_slices():
        list(parent.read_records(sync_mode=SyncMode.full_refresh, stream_slice=parent_slice))

    child = CacheHttpSubStream(parent=parent)

    # second pass: exhausting the child's slices reads the parent again
    for _ in child.stream_slices(sync_mode=SyncMode.full_refresh):
        pass

    assert parent.cassete.play_count != 0
|
||||
|
||||
Reference in New Issue
Block a user