
🎉 Source Salesforce: add checkpointing (#24888)

* Source Salesforce: add checkpointing

* Source-Iterable: fix integration tests

* Source Salesforce: fix integration test slices

* Source Salesforce: wait for latest record to be accessible

* Source Salesforce: retry up to 10 times for everything

* Source Salesforce: refactoring; add checkpointing for all incremental streams

* Source Salesforce: small fixes

* auto-bump connector version

---------

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
Roman Yermilov [GL] authored on 2023-04-24 15:26:43 +04:00, committed by GitHub
parent 8c4db0bb29 · commit dd607dcdf0
13 changed files with 186 additions and 77 deletions
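
The thread running through every hunk below is the new slicing contract: bulk streams now read within explicit date-bounded slices, which is what makes per-slice checkpointing possible. A minimal sketch of that calling convention, assuming the Airbyte CDK's SyncMode enum and Stream interface (the driver function and the state handling are hypothetical, not the connector's actual code):

from airbyte_cdk.models import SyncMode

def read_with_checkpointing(stream, state=None):
    # Hypothetical driver loop: each slice bounds the query to
    # [start_date, end_date), so a failed sync can resume from the last
    # completed slice instead of restarting from the configured start_date.
    for stream_slice in stream.stream_slices(sync_mode=SyncMode.incremental, stream_state=state):
        for record in stream.read_records(sync_mode=SyncMode.incremental, stream_slice=stream_slice):
            yield record
        # Hypothetical checkpoint: advance the cursor to the slice boundary
        # (the tests below read this key via stream_slices["end_date"]).
        state = {"LastModifiedDate": stream_slice["end_date"]}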


@@ -31,7 +31,8 @@ def test_bulk_sync_creation_failed(stream_config, stream_api):
     with requests_mock.Mocker() as m:
         m.register_uri("POST", stream.path(), status_code=400, json=[{"message": "test_error"}])
         with pytest.raises(HTTPError) as err:
-            next(stream.read_records(sync_mode=SyncMode.full_refresh))
+            stream_slices = next(iter(stream.stream_slices(sync_mode=SyncMode.incremental)))
+            next(stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slices))
         assert err.value.response.json()[0]["message"] == "test_error"
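
This is the mechanical update that recurs through the rest of this file: any call that previously read a bulk stream without a slice now fetches the first incremental slice and passes it explicitly.

# before
next(stream.read_records(sync_mode=SyncMode.full_refresh))

# after
stream_slices = next(iter(stream.stream_slices(sync_mode=SyncMode.incremental)))
next(stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slices))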
@@ -56,7 +57,8 @@ def test_bulk_stream_fallback_to_rest(mocker, requests_mock, stream_config, stre
     # mock REST API
     mocker.patch("source_salesforce.source.RestSalesforceStream.read_records", lambda *args, **kwargs: iter(rest_stream_records))
     assert type(stream) is BulkIncrementalSalesforceStream
-    assert list(stream.read_records(sync_mode=SyncMode.full_refresh)) == rest_stream_records
+    stream_slices = next(iter(stream.stream_slices(sync_mode=SyncMode.incremental)))
+    assert list(stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slices)) == rest_stream_records


 def test_stream_unsupported_by_bulk(stream_config, stream_api, caplog):
@@ -97,7 +99,8 @@ def test_bulk_sync_pagination(item_number, stream_config, stream_api):
         m.register_uri("DELETE", stream.path() + f"/{job_id}")
         m.register_uri("POST", stream.path(), creation_responses)
-        loaded_ids = [int(record["ID"]) for record in stream.read_records(sync_mode=SyncMode.full_refresh)]
+        stream_slices = next(iter(stream.stream_slices(sync_mode=SyncMode.incremental)))
+        loaded_ids = [int(record["ID"]) for record in stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slices)]
         assert not set(test_ids).symmetric_difference(set(loaded_ids))
         post_request_count = len([r for r in m.request_history if r.method == "POST"])
         assert post_request_count == len(pages)
@@ -113,7 +116,8 @@ def _prepare_mock(m, stream):
 def _get_result_id(stream):
-    return int(list(stream.read_records(sync_mode=SyncMode.full_refresh))[0]["ID"])
+    stream_slices = next(iter(stream.stream_slices(sync_mode=SyncMode.incremental)))
+    return int(list(stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slices))[0]["ID"])


 def test_bulk_sync_successful(stream_config, stream_api):
@@ -165,7 +169,8 @@ def test_bulk_sync_failed_retry(stream_config, stream_api):
         job_id = _prepare_mock(m, stream)
         m.register_uri("GET", stream.path() + f"/{job_id}", json={"state": "InProgress", "id": job_id})
         with pytest.raises(Exception) as err:
-            next(stream.read_records(sync_mode=SyncMode.full_refresh))
+            stream_slices = next(iter(stream.stream_slices(sync_mode=SyncMode.incremental)))
+            next(stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slices))
         assert "stream using BULK API was failed" in str(err.value)
@@ -300,7 +305,7 @@ def test_rate_limit_bulk(stream_config, stream_api, bulk_catalog, state):
             m.register_uri("GET", stream.path() + f"/{job_id}", json={"state": "JobComplete"})
-            resp = ["Field1,LastModifiedDate,ID"] + [f"test,2021-11-0{i},{i}" for i in range(1, 7)]  # 6 records per page
+            resp = ["Field1,LastModifiedDate,Id"] + [f"test,2021-11-0{i},{i}" for i in range(1, 7)]  # 6 records per page
             if page == 1:
                 # Read the first page successfully
@@ -561,8 +566,8 @@ def test_convert_to_standard_instance(stream_config, stream_api):
 def test_bulk_stream_paging(stream_config, stream_api_pk):
-    last_modified_date1 = "2022-10-01T00:00:00Z"
-    last_modified_date2 = "2022-10-02T00:00:00Z"
+    last_modified_date1 = "2022-10-01T00:00:00.000+00:00"
+    last_modified_date2 = "2022-10-02T00:00:00.000+00:00"
     assert last_modified_date1 < last_modified_date2

     stream_config["start_date"] = last_modified_date1
@@ -571,10 +576,10 @@ def test_bulk_stream_paging(stream_config, stream_api_pk):
     csv_header = "Field1,LastModifiedDate,Id"
     pages = [
-        [f"test,{last_modified_date1},1", f"test,{last_modified_date1},3"],
-        [f"test,{last_modified_date1},5", f"test,{last_modified_date2},2"],
-        [f"test,{last_modified_date2},2", f"test,{last_modified_date2},4"],
-        [f"test,{last_modified_date2},6"],
+        [f"test,{last_modified_date1},1", f"test,{last_modified_date1},2"],
+        [f"test,{last_modified_date1},3", f"test,{last_modified_date2},4"],
+        [f"test,{last_modified_date2},5", f"test,{last_modified_date2},6"],
+        [f"test,{last_modified_date2},7"],
     ]

     with requests_mock.Mocker() as mocked_requests:
@@ -587,32 +592,34 @@ def test_bulk_stream_paging(stream_config, stream_api_pk):
         mocked_requests.register_uri("DELETE", stream.path() + f"/{job_id}")
         mocked_requests.register_uri("POST", stream.path(), post_responses)

-        records = list(stream.read_records(sync_mode=SyncMode.full_refresh))
+        stream_slices = next(iter(stream.stream_slices(sync_mode=SyncMode.incremental)))
+        records = list(stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slices))

         assert records == [
             {"Field1": "test", "Id": 1, "LastModifiedDate": last_modified_date1},
+            {"Field1": "test", "Id": 2, "LastModifiedDate": last_modified_date1},
             {"Field1": "test", "Id": 3, "LastModifiedDate": last_modified_date1},
-            {"Field1": "test", "Id": 5, "LastModifiedDate": last_modified_date1},
-            {"Field1": "test", "Id": 2, "LastModifiedDate": last_modified_date2},
-            {"Field1": "test", "Id": 2, "LastModifiedDate": last_modified_date2},  # duplicate record
             {"Field1": "test", "Id": 4, "LastModifiedDate": last_modified_date2},
+            {"Field1": "test", "Id": 5, "LastModifiedDate": last_modified_date2},  # duplicate record
             {"Field1": "test", "Id": 6, "LastModifiedDate": last_modified_date2},
+            {"Field1": "test", "Id": 7, "LastModifiedDate": last_modified_date2},
         ]

         def get_query(request_index):
             return mocked_requests.request_history[request_index].json()["query"]

-        SELECT = "SELECT LastModifiedDate,Id FROM Account"
-        ORDER_BY = "ORDER BY LastModifiedDate,Id ASC LIMIT 2"
+        SELECT = "SELECT LastModifiedDate, Id FROM Account"
+        ORDER_BY = "ORDER BY LastModifiedDate, Id ASC LIMIT 2"

-        assert get_query(0) == f"{SELECT} WHERE LastModifiedDate >= {last_modified_date1} {ORDER_BY}"
+        assert get_query(0) == f"{SELECT} WHERE LastModifiedDate >= {last_modified_date1} AND LastModifiedDate < {stream_slices['end_date']} {ORDER_BY}"

-        q = f"{SELECT} WHERE (LastModifiedDate = {last_modified_date1} AND Id > '3') OR (LastModifiedDate > {last_modified_date1}) {ORDER_BY}"
+        q = f"{SELECT} WHERE LastModifiedDate >= {last_modified_date1} AND LastModifiedDate < {stream_slices['end_date']} AND Id > '2' {ORDER_BY}"
         assert get_query(4) == q

-        assert get_query(8) == f"{SELECT} WHERE LastModifiedDate >= {last_modified_date2} {ORDER_BY}"
+        q = f"{SELECT} WHERE LastModifiedDate >= {last_modified_date1} AND LastModifiedDate < {stream_slices['end_date']} AND Id > '4' {ORDER_BY}"
+        assert get_query(8) == q

-        q = f"{SELECT} WHERE (LastModifiedDate = {last_modified_date2} AND Id > '4') OR (LastModifiedDate > {last_modified_date2}) {ORDER_BY}"
+        q = f"{SELECT} WHERE LastModifiedDate >= {last_modified_date1} AND LastModifiedDate < {stream_slices['end_date']} AND Id > '6' {ORDER_BY}"
         assert get_query(12) == q
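
The asserted queries spell out the new paging scheme: the old tuple-style predicate (LastModifiedDate = X AND Id > N) OR (LastModifiedDate > X) is gone, and every page now stays inside the slice's [start_date, end_date) window, advancing by primary key alone. A sketch of that query shape (hypothetical helper, not the connector's code):

def build_page_query(start_date, end_date, last_id=None, limit=2):
    # Every page is confined to the slice window, so checkpointing the
    # slice boundary never skips rows that later pages would have covered.
    where = f"LastModifiedDate >= {start_date} AND LastModifiedDate < {end_date}"
    if last_id is not None:
        # Keyset paging: continue past the last primary key already seen.
        where += f" AND Id > '{last_id}'"
    return f"SELECT LastModifiedDate, Id FROM Account WHERE {where} ORDER BY LastModifiedDate, Id ASC LIMIT {limit}"

As the "# duplicate record" annotation above notes, this scheme can re-deliver a record at a page boundary, which the expected records tolerate rather than deduplicating.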


@@ -81,7 +81,7 @@ def _stream_api(stream_config, describe_response_data=None):
     sf_object.access_token = Mock()
     sf_object.instance_url = "https://fase-account.salesforce.com"
-    response_data = {"fields": [{"name": "LastModifiedDate", "type": "string"}]}
+    response_data = {"fields": [{"name": "LastModifiedDate", "type": "string"}, {"name": "Id", "type": "string"}]}
     if describe_response_data:
         response_data = describe_response_data
     sf_object.describe = Mock(return_value=response_data)
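
Finally, the shared _stream_api fixture: because the bulk query now selects Id alongside the cursor field, the mocked describe() response has to advertise that field too. In isolation (illustrative only, using unittest.mock):

from unittest.mock import Mock

sf_object = Mock()
sf_object.describe = Mock(return_value={"fields": [
    {"name": "LastModifiedDate", "type": "string"},  # cursor field
    {"name": "Id", "type": "string"},                # primary key, now selected by bulk queries
]})

assert [f["name"] for f in sf_object.describe()["fields"]] == ["LastModifiedDate", "Id"]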