🎉 Python CDK: Allow setting network adapter args on outgoing HTTP requests (#4493)
This commit is contained in:
2
.github/workflows/publish-cdk-command.yml
vendored
2
.github/workflows/publish-cdk-command.yml
vendored
@@ -27,7 +27,7 @@ jobs:
|
||||
- name: Checkout Airbyte
|
||||
uses: actions/checkout@v2
|
||||
- name: Build CDK Package
|
||||
run: ./gradlew --no-daemon :airbyte-cdk:python:build
|
||||
run: ./gradlew --no-daemon --no-build-cache :airbyte-cdk:python:build
|
||||
- name: Add Failure Comment
|
||||
if: github.event.inputs.comment-id && !success()
|
||||
uses: peter-evans/create-or-update-comment@v1
|
||||
|
||||
@@ -1,5 +1,8 @@
|
||||
# Changelog
|
||||
|
||||
## 0.1.5
|
||||
Allow specifying keyword arguments to be sent on a request made by an HTTP stream: https://github.com/airbytehq/airbyte/pull/4493
|
||||
|
||||
## 0.1.4
|
||||
Allow to use Python 3.7.0: https://github.com/airbytehq/airbyte/pull/3566
|
||||
|
||||
|
||||
@@ -118,6 +118,19 @@ class HttpStream(Stream, ABC):
|
||||
"""
|
||||
return None
|
||||
|
||||
def request_kwargs(
|
||||
self,
|
||||
stream_state: Mapping[str, Any],
|
||||
stream_slice: Mapping[str, Any] = None,
|
||||
next_page_token: Mapping[str, Any] = None,
|
||||
) -> Mapping[str, Any]:
|
||||
"""
|
||||
Override to return a mapping of keyword arguments to be used when creating the HTTP request.
|
||||
Any option listed in https://docs.python-requests.org/en/latest/api/#requests.adapters.BaseAdapter.send can be returned from
|
||||
this method. Note that these options do not conflict with request-level options such as headers, request params, etc.
|
||||
"""
|
||||
return {}
|
||||
|
||||
@abstractmethod
|
||||
def parse_response(
|
||||
self,
|
||||
@@ -166,13 +179,13 @@ class HttpStream(Stream, ABC):
|
||||
# TODO support non-json bodies
|
||||
args["json"] = json
|
||||
|
||||
return requests.Request(**args).prepare()
|
||||
return self._session.prepare_request(requests.Request(**args))
|
||||
|
||||
# TODO allow configuring these parameters. If we can get this into the requests library, then we can do it without the ugly exception hacks
|
||||
# see https://github.com/litl/backoff/pull/122
|
||||
@default_backoff_handler(max_tries=5, factor=5)
|
||||
@user_defined_backoff_handler(max_tries=5)
|
||||
def _send_request(self, request: requests.PreparedRequest) -> requests.Response:
|
||||
def _send_request(self, request: requests.PreparedRequest, request_kwargs: Mapping[str, Any]) -> requests.Response:
|
||||
"""
|
||||
Wraps sending the request in rate limit and error handlers.
|
||||
|
||||
@@ -190,9 +203,8 @@ class HttpStream(Stream, ABC):
|
||||
Unexpected transient exceptions use the default backoff parameters.
|
||||
Unexpected persistent exceptions are not handled and will cause the sync to fail.
|
||||
"""
|
||||
response: requests.Response = self._session.send(request)
|
||||
response: requests.Response = self._session.send(request, **request_kwargs)
|
||||
if self.should_retry(response):
|
||||
|
||||
custom_backoff_time = self.backoff_time(response)
|
||||
if custom_backoff_time:
|
||||
raise UserDefinedBackoffException(backoff=custom_backoff_time, request=request, response=response)
|
||||
@@ -224,8 +236,8 @@ class HttpStream(Stream, ABC):
|
||||
params=self.request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
|
||||
json=self.request_body_json(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
|
||||
)
|
||||
|
||||
response = self._send_request(request)
|
||||
request_kwargs = self.request_kwargs(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)
|
||||
response = self._send_request(request, request_kwargs)
|
||||
yield from self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice)
|
||||
|
||||
next_page_token = self.next_page_token(response)
|
||||
|
||||
@@ -71,3 +71,8 @@ errors. It is not currently possible to specify a rate limit Airbyte should adhe
|
||||
### Stream Slicing
|
||||
|
||||
When implementing [stream slicing](incremental-stream.md#streamstream_slices) in an `HttpStream`, each slice is equivalent to an HTTP request; the stream will make one request per element returned by the `stream_slices` function. The current slice being read is passed into every other method in `HttpStream` (e.g. `request_params`, `request_headers`, `path`, etc.) to be injected into a request. This allows you to dynamically determine the output of `request_params`, `path`, and other functions by reading the input slice and returning the appropriate value.
|
||||
|
||||
### Network Adapter Keyword arguments
|
||||
If you need to set any network-adapter keyword args on the outgoing HTTP requests, such as `allow_redirects`, `stream`, `verify`, `cert`, etc.,
|
||||
override the `request_kwargs` method. Any option listed in [BaseAdapter.send](https://docs.python-requests.org/en/latest/api/#requests.adapters.BaseAdapter.send) can
|
||||
be returned as a keyword argument.
|
||||
|
||||
@@ -35,7 +35,7 @@ README = (HERE / "README.md").read_text()
|
||||
|
||||
setup(
|
||||
name="airbyte-cdk",
|
||||
version="0.1.4",
|
||||
version="0.1.5",
|
||||
description="A framework for writing Airbyte Connectors.",
|
||||
long_description=README,
|
||||
long_description_content_type="text/markdown",
|
||||
@@ -73,14 +73,7 @@ setup(
|
||||
"requests",
|
||||
],
|
||||
python_requires=">=3.7.0",
|
||||
extras_require={
|
||||
"dev": [
|
||||
"MyPy==0.812",
|
||||
"pytest",
|
||||
"pytest-cov",
|
||||
"pytest-mock",
|
||||
]
|
||||
},
|
||||
extras_require={"dev": ["MyPy==0.812", "pytest", "pytest-cov", "pytest-mock", "requests-mock"]},
|
||||
entry_points={
|
||||
"console_scripts": ["base-python=base_python.entrypoint:main"],
|
||||
},
|
||||
|
||||
@@ -24,6 +24,7 @@
|
||||
|
||||
|
||||
from typing import Any, Iterable, Mapping, Optional
|
||||
from unittest.mock import ANY
|
||||
|
||||
import pytest
|
||||
import requests
|
||||
@@ -60,6 +61,18 @@ class StubBasicReadHttpStream(HttpStream):
|
||||
yield stubResp
|
||||
|
||||
|
||||
def test_request_kwargs_used(mocker, requests_mock):
|
||||
stream = StubBasicReadHttpStream()
|
||||
request_kwargs = {"cert": None, "proxies": "google.com"}
|
||||
mocker.patch.object(stream, "request_kwargs", return_value=request_kwargs)
|
||||
mocker.patch.object(stream._session, "send", wraps=stream._session.send)
|
||||
requests_mock.register_uri("GET", stream.url_base)
|
||||
|
||||
list(stream.read_records(sync_mode=SyncMode.full_refresh))
|
||||
|
||||
stream._session.send.assert_any_call(ANY, **request_kwargs)
|
||||
|
||||
|
||||
def test_stub_basic_read_http_stream_read_records(mocker):
|
||||
stream = StubBasicReadHttpStream()
|
||||
blank_response = {} # Send a blank response is fine as we ignore the response in `parse_response anyway.
|
||||
|
||||
Reference in New Issue
Block a user