feat(airbyte-cdk) Async jobs - Limit memory usage (#46286)
This commit is contained in:
committed by
GitHub
parent
9dc9442cd6
commit
b0cac50b9e
@@ -15,7 +15,7 @@ from numpy import nan
|
||||
|
||||
EMPTY_STR: str = ""
|
||||
DEFAULT_ENCODING: str = "utf-8"
|
||||
DOWNLOAD_CHUNK_SIZE: int = 1024 * 1024 * 10
|
||||
DOWNLOAD_CHUNK_SIZE: int = 1024 * 10
|
||||
|
||||
|
||||
class ResponseToFileExtractor(RecordExtractor):
|
||||
|
||||
@@ -13,3 +13,21 @@ def mock_sleep(monkeypatch):
|
||||
with freezegun.freeze_time(datetime.datetime.now(), ignore=["_pytest.runner", "_pytest.terminal"]) as frozen_datetime:
|
||||
monkeypatch.setattr("time.sleep", lambda x: frozen_datetime.tick(x))
|
||||
yield
|
||||
|
||||
|
||||
def pytest_addoption(parser):
|
||||
parser.addoption(
|
||||
"--skipslow", action="store_true", default=False, help="skip slow tests"
|
||||
)
|
||||
|
||||
|
||||
def pytest_configure(config):
|
||||
config.addinivalue_line("markers", "slow: mark test as slow to run")
|
||||
|
||||
|
||||
def pytest_collection_modifyitems(config, items):
|
||||
if config.getoption("--skipslow"):
|
||||
skip_slow = pytest.mark.skip(reason="--skipslow option has been provided and this test is marked as slow")
|
||||
for item in items:
|
||||
if "slow" in item.keywords:
|
||||
item.add_marker(skip_slow)
|
||||
|
||||
@@ -52,6 +52,7 @@ def large_event_response_fixture():
|
||||
os.remove(file_path)
|
||||
|
||||
|
||||
@pytest.mark.slow
|
||||
@pytest.mark.limit_memory("20 MB")
|
||||
def test_jsonl_decoder_memory_usage(requests_mock, large_events_response):
|
||||
lines_in_response, file_path = large_events_response
|
||||
|
||||
@@ -1,8 +1,11 @@
|
||||
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
|
||||
import csv
|
||||
import os
|
||||
from io import BytesIO
|
||||
from pathlib import Path
|
||||
from unittest import TestCase
|
||||
|
||||
import pytest
|
||||
import requests
|
||||
import requests_mock
|
||||
from airbyte_cdk.sources.declarative.extractors import ResponseToFileExtractor
|
||||
@@ -52,3 +55,33 @@ class ResponseToFileExtractorTest(TestCase):
|
||||
any_url = "https://anyurl.com"
|
||||
self._http_mocker.register_uri("GET", any_url, [{"body": io, "status_code": 200}])
|
||||
return requests.get(any_url)
|
||||
|
||||
|
||||
@pytest.fixture(name="large_events_response")
|
||||
def large_event_response_fixture():
|
||||
lines_in_response = 2_000_000 # ≈ 62 MB of response
|
||||
dir_path = os.path.dirname(os.path.realpath(__file__))
|
||||
file_path = f"{dir_path}/test_response.csv"
|
||||
with open(file_path, "w") as csvfile:
|
||||
csv_writer = csv.writer(csvfile)
|
||||
csv_writer.writerow(["username", "email"]) # headers
|
||||
for _ in range(lines_in_response):
|
||||
csv_writer.writerow(["a_username","email1@example.com"])
|
||||
yield (lines_in_response, file_path)
|
||||
os.remove(file_path)
|
||||
|
||||
|
||||
@pytest.mark.slow
|
||||
@pytest.mark.limit_memory("20 MB")
|
||||
def test_response_to_file_extractor_memory_usage(requests_mock, large_events_response):
|
||||
lines_in_response, file_path = large_events_response
|
||||
extractor = ResponseToFileExtractor()
|
||||
|
||||
url = "https://for-all-mankind.nasa.com/api/v1/users/users1"
|
||||
requests_mock.get(url, body=open(file_path, "rb"))
|
||||
|
||||
counter = 0
|
||||
for _ in extractor.extract_records(requests.get(url, stream=True)):
|
||||
counter += 1
|
||||
|
||||
assert counter == lines_in_response
|
||||
|
||||
Reference in New Issue
Block a user