diff --git a/airbyte-config-oss/init-oss/src/main/resources/seed/oss_catalog.json b/airbyte-config-oss/init-oss/src/main/resources/seed/oss_catalog.json
index 908b491e504..71ed5f20d35 100644
--- a/airbyte-config-oss/init-oss/src/main/resources/seed/oss_catalog.json
+++ b/airbyte-config-oss/init-oss/src/main/resources/seed/oss_catalog.json
@@ -23087,7 +23087,7 @@
"sourceDefinitionId": "b117307c-14b6-41aa-9422-947e34922962",
"name": "Salesforce",
"dockerRepository": "airbyte/source-salesforce",
- "dockerImageTag": "2.0.9",
+ "dockerImageTag": "2.0.10",
"documentationUrl": "https://docs.airbyte.com/integrations/sources/salesforce",
"icon": "salesforce.svg",
"sourceType": "api",
diff --git a/airbyte-config-oss/init-oss/src/main/resources/seed/source_definitions.yaml b/airbyte-config-oss/init-oss/src/main/resources/seed/source_definitions.yaml
index 6c6e7e8ce4b..0a15bca6539 100644
--- a/airbyte-config-oss/init-oss/src/main/resources/seed/source_definitions.yaml
+++ b/airbyte-config-oss/init-oss/src/main/resources/seed/source_definitions.yaml
@@ -1830,7 +1830,7 @@
- name: Salesforce
sourceDefinitionId: b117307c-14b6-41aa-9422-947e34922962
dockerRepository: airbyte/source-salesforce
- dockerImageTag: 2.0.9
+ dockerImageTag: 2.0.10
documentationUrl: https://docs.airbyte.com/integrations/sources/salesforce
icon: salesforce.svg
sourceType: api
diff --git a/airbyte-config-oss/init-oss/src/main/resources/seed/source_specs.yaml b/airbyte-config-oss/init-oss/src/main/resources/seed/source_specs.yaml
index fddc8ed84ec..a1a22153004 100644
--- a/airbyte-config-oss/init-oss/src/main/resources/seed/source_specs.yaml
+++ b/airbyte-config-oss/init-oss/src/main/resources/seed/source_specs.yaml
@@ -13749,7 +13749,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
-- dockerImage: "airbyte/source-salesforce:2.0.9"
+- dockerImage: "airbyte/source-salesforce:2.0.10"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/salesforce"
connectionSpecification:
diff --git a/airbyte-integrations/connectors/source-salesforce/Dockerfile b/airbyte-integrations/connectors/source-salesforce/Dockerfile
index 3e63ba26795..45eac2f9393 100644
--- a/airbyte-integrations/connectors/source-salesforce/Dockerfile
+++ b/airbyte-integrations/connectors/source-salesforce/Dockerfile
@@ -13,5 +13,5 @@ RUN pip install .
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
-LABEL io.airbyte.version=2.0.9
+LABEL io.airbyte.version=2.0.10
LABEL io.airbyte.name=airbyte/source-salesforce
diff --git a/airbyte-integrations/connectors/source-salesforce/acceptance-test-config.yml b/airbyte-integrations/connectors/source-salesforce/acceptance-test-config.yml
index 32935fdc9f3..a16a948ebec 100644
--- a/airbyte-integrations/connectors/source-salesforce/acceptance-test-config.yml
+++ b/airbyte-integrations/connectors/source-salesforce/acceptance-test-config.yml
@@ -33,6 +33,7 @@ acceptance_tests:
bypass_reason: "impossible to fill the stream with data because it is an organic traffic"
- name: "Describe"
bypass_reason: "Data is not permanent"
+ timeout_seconds: 3600
fail_on_extra_columns: false
incremental:
tests:
@@ -40,7 +41,9 @@ acceptance_tests:
configured_catalog_path: "integration_tests/incremental_catalog.json"
future_state:
future_state_path: "integration_tests/future_state.json"
+ timeout_seconds: 7200
full_refresh:
tests:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
+ timeout_seconds: 3600
diff --git a/airbyte-integrations/connectors/source-salesforce/integration_tests/bulk_error_test.py b/airbyte-integrations/connectors/source-salesforce/integration_tests/bulk_error_test.py
index 98533df3cd5..64c16cddca0 100644
--- a/airbyte-integrations/connectors/source-salesforce/integration_tests/bulk_error_test.py
+++ b/airbyte-integrations/connectors/source-salesforce/integration_tests/bulk_error_test.py
@@ -72,7 +72,11 @@ def test_not_queryable_stream(caplog, input_config):
)
def test_failed_jobs_with_successful_switching(caplog, input_sandbox_config, stream_name, log_messages):
stream = get_stream(input_sandbox_config, stream_name)
- expected_record_ids = set(record["Id"] for record in stream.read_records(sync_mode=SyncMode.full_refresh))
+ stream_slice = {
+ "start_date": "2023-01-01T00:00:00.000+0000",
+ "end_date": "2023-02-01T00:00:00.000+0000"
+ }
+ expected_record_ids = set(record["Id"] for record in stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice))
create_query_matcher = re.compile(r"jobs/query$")
job_matcher = re.compile(r"jobs/query/fake_id$")
@@ -88,7 +92,7 @@ def test_failed_jobs_with_successful_switching(caplog, input_sandbox_config, str
m.register_uri("GET", job_matcher, json={"state": "Failed", "errorMessage": "unknown error"})
m.register_uri("DELETE", job_matcher, json={})
with caplog.at_level(logging.WARNING):
- loaded_record_ids = set(record["Id"] for record in stream.read_records(sync_mode=SyncMode.full_refresh))
+ loaded_record_ids = set(record["Id"] for record in stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice))
caplog_rec_counter = len(caplog.records) - 1
for log_message in log_messages:
diff --git a/airbyte-integrations/connectors/source-salesforce/integration_tests/integration_test.py b/airbyte-integrations/connectors/source-salesforce/integration_tests/integration_test.py
index d4f06be215f..c0daa413049 100644
--- a/airbyte-integrations/connectors/source-salesforce/integration_tests/integration_test.py
+++ b/airbyte-integrations/connectors/source-salesforce/integration_tests/integration_test.py
@@ -8,6 +8,7 @@ import time
from datetime import datetime
from pathlib import Path
+import pendulum
import pytest
import requests
from airbyte_cdk.models import SyncMode
@@ -66,8 +67,7 @@ def update_note(stream, note_id, headers):
def get_stream_state():
- state_date = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")
- return {"LastModifiedDate": state_date}
+ return {"LastModifiedDate": pendulum.now(tz="UTC").add(days=-1).isoformat(timespec="milliseconds")}
def test_update_for_deleted_record(stream):
@@ -79,21 +79,54 @@ def test_update_for_deleted_record(stream):
created_note_id = response.json()["id"]
- notes = set(record["Id"] for record in stream.read_records(sync_mode=None))
- assert created_note_id in notes, "The stream didn't return the note we created"
+    # A record may not be accessible right after creation. This workaround makes a few attempts to retrieve the latest record.
+ notes = []
+ attempts = 10
+ while created_note_id not in notes:
+ now = pendulum.now(tz="UTC")
+ stream_slice = {
+ "start_date": now.add(days=-1).isoformat(timespec="milliseconds"),
+ "end_date": now.isoformat(timespec="milliseconds")
+ }
+ notes = set(record["Id"] for record in stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice))
+ try:
+ assert created_note_id in notes, "The stream didn't return the note we created"
+ break
+ except Exception as e:
+ if attempts:
+ time.sleep(2)
+ else:
+ raise e
+ attempts = attempts - 1
response = delete_note(stream, created_note_id, headers)
assert response.status_code == 204, "Note was not deleted"
- is_note_updated = False
- is_deleted = False
- for record in stream.read_records(sync_mode=SyncMode.incremental, stream_state=stream_state):
- if created_note_id == record["Id"]:
- is_note_updated = True
- is_deleted = record["IsDeleted"]
+    # A record may still be accessible for some time right after deletion
+ attempts = 10
+ while True:
+ is_note_updated = False
+ is_deleted = False
+ now = pendulum.now(tz="UTC")
+ stream_slice = {
+ "start_date": now.add(days=-1).isoformat(timespec="milliseconds"),
+ "end_date": now.isoformat(timespec="milliseconds")
+ }
+ for record in stream.read_records(sync_mode=SyncMode.incremental, stream_state=stream_state, stream_slice=stream_slice):
+ if created_note_id == record["Id"]:
+ is_note_updated = True
+ is_deleted = record["IsDeleted"]
+ break
+ try:
+ assert is_note_updated, "No deleted note during the sync"
+ assert is_deleted, "Wrong field value for deleted note during the sync"
break
- assert is_note_updated, "No deleted note during the sync"
- assert is_deleted, "Wrong field value for deleted note during the sync"
+ except Exception as e:
+ if attempts:
+ time.sleep(2)
+ else:
+ raise e
+ attempts = attempts - 1
time.sleep(1)
response = update_note(stream, created_note_id, headers)
@@ -107,8 +140,25 @@ def test_deleted_record(stream):
created_note_id = response.json()["id"]
- notes = set(record["Id"] for record in stream.read_records(sync_mode=None))
- assert created_note_id in notes, "No created note during the sync"
+    # A record may not be accessible right after creation. This workaround makes a few attempts to retrieve the latest record.
+ notes = []
+ attempts = 10
+ while created_note_id not in notes:
+ now = pendulum.now(tz="UTC")
+ stream_slice = {
+ "start_date": now.add(days=-1).isoformat(timespec="milliseconds"),
+ "end_date": now.isoformat(timespec="milliseconds")
+ }
+ notes = set(record["Id"] for record in stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice))
+ try:
+ assert created_note_id in notes, "No created note during the sync"
+ break
+ except Exception as e:
+ if attempts:
+ time.sleep(2)
+ else:
+ raise e
+ attempts = attempts - 1
response = update_note(stream, created_note_id, headers)
assert response.status_code == 204, "Note was not updated"
@@ -117,14 +167,29 @@ def test_deleted_record(stream):
response = delete_note(stream, created_note_id, headers)
assert response.status_code == 204, "Note was not deleted"
- record = None
- for record in stream.read_records(sync_mode=SyncMode.incremental, stream_state=stream_state):
- if created_note_id == record["Id"]:
+    # Record updates may take some time to become accessible
+ attempts = 10
+    while True:
+ now = pendulum.now(tz="UTC")
+ stream_slice = {
+ "start_date": now.add(days=-1).isoformat(timespec="milliseconds"),
+ "end_date": now.isoformat(timespec="milliseconds")
+ }
+ record = None
+ for record in stream.read_records(sync_mode=SyncMode.incremental, stream_state=stream_state, stream_slice=stream_slice):
+ if created_note_id == record["Id"]:
+ break
+ try:
+ assert record, "No updated note during the sync"
+ assert record["IsDeleted"], "Wrong field value for deleted note during the sync"
+ assert record["TextPreview"] == UPDATED_NOTE_CONTENT and record["TextPreview"] != NOTE_CONTENT, "Note Content was not updated"
break
-
- assert record, "No updated note during the sync"
- assert record["IsDeleted"], "Wrong field value for deleted note during the sync"
- assert record["TextPreview"] == UPDATED_NOTE_CONTENT and record["TextPreview"] != NOTE_CONTENT, "Note Content was not updated"
+ except Exception as e:
+ if attempts:
+ time.sleep(2)
+ else:
+ raise e
+ attempts = attempts - 1
def test_parallel_discover(input_sandbox_config):
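
The integration tests above inline the same poll-and-retry pattern several times (read a one-day slice, assert, sleep two seconds, decrement an attempt counter). A hypothetical helper sketching that pattern; the name `retry_assertion` and its defaults are illustrative and not part of the connector or its test suite:

```python
# Hypothetical retry helper mirroring the pattern used in these tests; illustrative only.
import time
from typing import Callable


def retry_assertion(check: Callable[[], None], attempts: int = 10, delay_seconds: float = 2.0) -> None:
    """Re-run `check` until its assertions pass, sleeping between attempts; re-raise on the final failure."""
    for attempt in range(attempts):
        try:
            check()
            return
        except AssertionError:
            if attempt == attempts - 1:
                raise
            time.sleep(delay_seconds)
```

With such a helper, each block above would reduce to wrapping the read-and-assert step in `retry_assertion(lambda: ...)`.
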
diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py
index fb5444dc8e0..77243ea2b57 100644
--- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py
+++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py
@@ -147,7 +147,8 @@ class SourceSalesforce(AbstractSource):
except exceptions.HTTPError as error:
error_data = error.response.json()[0]
error_code = error_data.get("errorCode")
+ url = error.response.url
if error.response.status_code == codes.FORBIDDEN and error_code == "REQUEST_LIMIT_EXCEEDED":
- logger.warn(f"API Call limit is exceeded. Error message: '{error_data.get('message')}'")
+ logger.warning(f"API Call {url} limit is exceeded. Error message: '{error_data.get('message')}'")
raise AirbyteStopSync() # if got 403 rate limit response, finish the sync with success.
raise error
diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py
index c2d001f53ed..0de5289bd9b 100644
--- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py
+++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py
@@ -579,6 +579,7 @@ def transform_empty_string_to_none(instance: Any, schema: Any):
class IncrementalRestSalesforceStream(RestSalesforceStream, ABC):
state_checkpoint_interval = 500
+ STREAM_SLICE_STEP = 120
def __init__(self, replication_key: str, start_date: Optional[str], **kwargs):
super().__init__(**kwargs)
@@ -592,6 +593,20 @@ class IncrementalRestSalesforceStream(RestSalesforceStream, ABC):
return pendulum.parse(start_date).strftime("%Y-%m-%dT%H:%M:%SZ") # type: ignore[attr-defined,no-any-return]
return None
+ def stream_slices(
+ self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
+ ) -> Iterable[Optional[Mapping[str, Any]]]:
+ start, end = (None, None)
+ now = pendulum.now(tz="UTC")
+ initial_date = pendulum.parse((stream_state or {}).get(self.cursor_field, self.start_date), tz="UTC")
+
+ slice_number = 1
+ while not end == now:
+ start = initial_date.add(days=(slice_number - 1) * self.STREAM_SLICE_STEP)
+ end = min(now, initial_date.add(days=slice_number * self.STREAM_SLICE_STEP))
+ yield {"start_date": start.isoformat(timespec="milliseconds"), "end_date": end.isoformat(timespec="milliseconds")}
+ slice_number = slice_number + 1
+
def request_params(
self,
stream_state: Mapping[str, Any],
@@ -607,14 +622,28 @@ class IncrementalRestSalesforceStream(RestSalesforceStream, ABC):
property_chunk = property_chunk or {}
- stream_date = stream_state.get(self.cursor_field)
- start_date = stream_date or self.start_date
+ start_date = max(
+ (stream_state or {}).get(self.cursor_field, self.start_date),
+ (stream_slice or {}).get("start_date", ""),
+ (next_page_token or {}).get("start_date", ""),
+ )
+ end_date = (stream_slice or {}).get("end_date", pendulum.now(tz="UTC").isoformat(timespec="milliseconds"))
+
+ select_fields = ",".join(property_chunk.keys())
+ table_name = self.name
+ where_conditions = []
+ order_by_clause = ""
- query = f"SELECT {','.join(property_chunk.keys())} FROM {self.name} "
if start_date:
- query += f"WHERE {self.cursor_field} >= {start_date} "
+ where_conditions.append(f"{self.cursor_field} >= {start_date}")
+ if end_date:
+ where_conditions.append(f"{self.cursor_field} < {end_date}")
if self.name not in UNSUPPORTED_FILTERING_STREAMS:
- query += f"ORDER BY {self.cursor_field} ASC"
+ order_by_clause = f"ORDER BY {self.cursor_field} ASC"
+
+ where_clause = f"WHERE {' AND '.join(where_conditions)}"
+ query = f"SELECT {select_fields} FROM {table_name} {where_clause} {order_by_clause}"
+
return {"q": query}
@property
@@ -635,34 +664,33 @@ class IncrementalRestSalesforceStream(RestSalesforceStream, ABC):
class BulkIncrementalSalesforceStream(BulkSalesforceStream, IncrementalRestSalesforceStream):
def next_page_token(self, last_record: Mapping[str, Any]) -> Optional[Mapping[str, Any]]:
if self.name not in UNSUPPORTED_FILTERING_STREAMS:
- page_token: str = last_record[self.cursor_field]
- res = {"next_token": page_token}
- # use primary key as additional filtering param, if cursor_field is not increased from previous page
- if self.primary_key and self.prev_start_date == page_token:
- res["primary_key"] = last_record[self.primary_key]
- return res
+ return {"next_token": last_record[self.cursor_field], "primary_key": last_record.get(self.primary_key)}
return None
def request_params(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:
- selected_properties = self.get_json_schema().get("properties", {})
+ start_date = max(
+ (stream_state or {}).get(self.cursor_field, ""),
+ (stream_slice or {}).get("start_date", ""),
+ (next_page_token or {}).get("start_date", ""),
+ )
+ end_date = stream_slice["end_date"]
- stream_date = stream_state.get(self.cursor_field)
- next_token = (next_page_token or {}).get("next_token")
- primary_key = (next_page_token or {}).get("primary_key")
- start_date = next_token or stream_date or self.start_date
- self.prev_start_date = start_date
+ select_fields = ", ".join(self.get_json_schema().get("properties", {}).keys())
+ table_name = self.name
+ where_conditions = [f"{self.cursor_field} >= {start_date}", f"{self.cursor_field} < {end_date}"]
+ order_by_clause = ""
- query = f"SELECT {','.join(selected_properties.keys())} FROM {self.name} "
- if start_date:
- if primary_key and self.name not in UNSUPPORTED_FILTERING_STREAMS:
- query += f"WHERE ({self.cursor_field} = {start_date} AND {self.primary_key} > '{primary_key}') OR ({self.cursor_field} > {start_date}) "
- else:
- query += f"WHERE {self.cursor_field} >= {start_date} "
if self.name not in UNSUPPORTED_FILTERING_STREAMS:
- order_by_fields = [self.cursor_field, self.primary_key] if self.primary_key else [self.cursor_field]
- query += f"ORDER BY {','.join(order_by_fields)} ASC LIMIT {self.page_size}"
+ last_primary_key = (next_page_token or {}).get("primary_key", "")
+ if last_primary_key:
+ where_conditions.append(f"{self.primary_key} > '{last_primary_key}'")
+ order_by_fields = ", ".join([self.cursor_field, self.primary_key] if self.primary_key else [self.cursor_field])
+ order_by_clause = f"ORDER BY {order_by_fields} ASC LIMIT {self.page_size}"
+
+ where_clause = f"WHERE {' AND '.join(where_conditions)}"
+ query = f"SELECT {select_fields} FROM {table_name} {where_clause} {order_by_clause}"
return {"q": query}
diff --git a/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py b/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py
index 770c3fcc5c1..f4ed5c14b64 100644
--- a/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py
+++ b/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py
@@ -31,7 +31,8 @@ def test_bulk_sync_creation_failed(stream_config, stream_api):
with requests_mock.Mocker() as m:
m.register_uri("POST", stream.path(), status_code=400, json=[{"message": "test_error"}])
with pytest.raises(HTTPError) as err:
- next(stream.read_records(sync_mode=SyncMode.full_refresh))
+ stream_slices = next(iter(stream.stream_slices(sync_mode=SyncMode.incremental)))
+ next(stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slices))
assert err.value.response.json()[0]["message"] == "test_error"
@@ -56,7 +57,8 @@ def test_bulk_stream_fallback_to_rest(mocker, requests_mock, stream_config, stre
# mock REST API
mocker.patch("source_salesforce.source.RestSalesforceStream.read_records", lambda *args, **kwargs: iter(rest_stream_records))
assert type(stream) is BulkIncrementalSalesforceStream
- assert list(stream.read_records(sync_mode=SyncMode.full_refresh)) == rest_stream_records
+ stream_slices = next(iter(stream.stream_slices(sync_mode=SyncMode.incremental)))
+ assert list(stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slices)) == rest_stream_records
def test_stream_unsupported_by_bulk(stream_config, stream_api, caplog):
@@ -97,7 +99,8 @@ def test_bulk_sync_pagination(item_number, stream_config, stream_api):
m.register_uri("DELETE", stream.path() + f"/{job_id}")
m.register_uri("POST", stream.path(), creation_responses)
- loaded_ids = [int(record["ID"]) for record in stream.read_records(sync_mode=SyncMode.full_refresh)]
+ stream_slices = next(iter(stream.stream_slices(sync_mode=SyncMode.incremental)))
+ loaded_ids = [int(record["ID"]) for record in stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slices)]
assert not set(test_ids).symmetric_difference(set(loaded_ids))
post_request_count = len([r for r in m.request_history if r.method == "POST"])
assert post_request_count == len(pages)
@@ -113,7 +116,8 @@ def _prepare_mock(m, stream):
def _get_result_id(stream):
- return int(list(stream.read_records(sync_mode=SyncMode.full_refresh))[0]["ID"])
+ stream_slices = next(iter(stream.stream_slices(sync_mode=SyncMode.incremental)))
+ return int(list(stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slices))[0]["ID"])
def test_bulk_sync_successful(stream_config, stream_api):
@@ -165,7 +169,8 @@ def test_bulk_sync_failed_retry(stream_config, stream_api):
job_id = _prepare_mock(m, stream)
m.register_uri("GET", stream.path() + f"/{job_id}", json={"state": "InProgress", "id": job_id})
with pytest.raises(Exception) as err:
- next(stream.read_records(sync_mode=SyncMode.full_refresh))
+ stream_slices = next(iter(stream.stream_slices(sync_mode=SyncMode.incremental)))
+ next(stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slices))
assert "stream using BULK API was failed" in str(err.value)
@@ -300,7 +305,7 @@ def test_rate_limit_bulk(stream_config, stream_api, bulk_catalog, state):
m.register_uri("GET", stream.path() + f"/{job_id}", json={"state": "JobComplete"})
- resp = ["Field1,LastModifiedDate,ID"] + [f"test,2021-11-0{i},{i}" for i in range(1, 7)] # 6 records per page
+ resp = ["Field1,LastModifiedDate,Id"] + [f"test,2021-11-0{i},{i}" for i in range(1, 7)] # 6 records per page
if page == 1:
# Read the first page successfully
@@ -561,8 +566,8 @@ def test_convert_to_standard_instance(stream_config, stream_api):
def test_bulk_stream_paging(stream_config, stream_api_pk):
- last_modified_date1 = "2022-10-01T00:00:00Z"
- last_modified_date2 = "2022-10-02T00:00:00Z"
+ last_modified_date1 = "2022-10-01T00:00:00.000+00:00"
+ last_modified_date2 = "2022-10-02T00:00:00.000+00:00"
assert last_modified_date1 < last_modified_date2
stream_config["start_date"] = last_modified_date1
@@ -571,10 +576,10 @@ def test_bulk_stream_paging(stream_config, stream_api_pk):
csv_header = "Field1,LastModifiedDate,Id"
pages = [
- [f"test,{last_modified_date1},1", f"test,{last_modified_date1},3"],
- [f"test,{last_modified_date1},5", f"test,{last_modified_date2},2"],
- [f"test,{last_modified_date2},2", f"test,{last_modified_date2},4"],
- [f"test,{last_modified_date2},6"],
+ [f"test,{last_modified_date1},1", f"test,{last_modified_date1},2"],
+ [f"test,{last_modified_date1},3", f"test,{last_modified_date2},4"],
+ [f"test,{last_modified_date2},5", f"test,{last_modified_date2},6"],
+ [f"test,{last_modified_date2},7"],
]
with requests_mock.Mocker() as mocked_requests:
@@ -587,32 +592,34 @@ def test_bulk_stream_paging(stream_config, stream_api_pk):
mocked_requests.register_uri("DELETE", stream.path() + f"/{job_id}")
mocked_requests.register_uri("POST", stream.path(), post_responses)
- records = list(stream.read_records(sync_mode=SyncMode.full_refresh))
+ stream_slices = next(iter(stream.stream_slices(sync_mode=SyncMode.incremental)))
+ records = list(stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slices))
assert records == [
{"Field1": "test", "Id": 1, "LastModifiedDate": last_modified_date1},
+ {"Field1": "test", "Id": 2, "LastModifiedDate": last_modified_date1},
{"Field1": "test", "Id": 3, "LastModifiedDate": last_modified_date1},
- {"Field1": "test", "Id": 5, "LastModifiedDate": last_modified_date1},
- {"Field1": "test", "Id": 2, "LastModifiedDate": last_modified_date2},
- {"Field1": "test", "Id": 2, "LastModifiedDate": last_modified_date2}, # duplicate record
{"Field1": "test", "Id": 4, "LastModifiedDate": last_modified_date2},
+ {"Field1": "test", "Id": 5, "LastModifiedDate": last_modified_date2}, # duplicate record
{"Field1": "test", "Id": 6, "LastModifiedDate": last_modified_date2},
+ {"Field1": "test", "Id": 7, "LastModifiedDate": last_modified_date2},
]
def get_query(request_index):
return mocked_requests.request_history[request_index].json()["query"]
- SELECT = "SELECT LastModifiedDate,Id FROM Account"
- ORDER_BY = "ORDER BY LastModifiedDate,Id ASC LIMIT 2"
+ SELECT = "SELECT LastModifiedDate, Id FROM Account"
+ ORDER_BY = "ORDER BY LastModifiedDate, Id ASC LIMIT 2"
- assert get_query(0) == f"{SELECT} WHERE LastModifiedDate >= {last_modified_date1} {ORDER_BY}"
+ assert get_query(0) == f"{SELECT} WHERE LastModifiedDate >= {last_modified_date1} AND LastModifiedDate < {stream_slices['end_date']} {ORDER_BY}"
- q = f"{SELECT} WHERE (LastModifiedDate = {last_modified_date1} AND Id > '3') OR (LastModifiedDate > {last_modified_date1}) {ORDER_BY}"
+ q = f"{SELECT} WHERE LastModifiedDate >= {last_modified_date1} AND LastModifiedDate < {stream_slices['end_date']} AND Id > '2' {ORDER_BY}"
assert get_query(4) == q
- assert get_query(8) == f"{SELECT} WHERE LastModifiedDate >= {last_modified_date2} {ORDER_BY}"
+ q = f"{SELECT} WHERE LastModifiedDate >= {last_modified_date1} AND LastModifiedDate < {stream_slices['end_date']} AND Id > '4' {ORDER_BY}"
+ assert get_query(8) == q
- q = f"{SELECT} WHERE (LastModifiedDate = {last_modified_date2} AND Id > '4') OR (LastModifiedDate > {last_modified_date2}) {ORDER_BY}"
+ q = f"{SELECT} WHERE LastModifiedDate >= {last_modified_date1} AND LastModifiedDate < {stream_slices['end_date']} AND Id > '6' {ORDER_BY}"
assert get_query(12) == q
diff --git a/airbyte-integrations/connectors/source-salesforce/unit_tests/conftest.py b/airbyte-integrations/connectors/source-salesforce/unit_tests/conftest.py
index 8f27a9efd77..72be013d2d5 100644
--- a/airbyte-integrations/connectors/source-salesforce/unit_tests/conftest.py
+++ b/airbyte-integrations/connectors/source-salesforce/unit_tests/conftest.py
@@ -81,7 +81,7 @@ def _stream_api(stream_config, describe_response_data=None):
sf_object.access_token = Mock()
sf_object.instance_url = "https://fase-account.salesforce.com"
- response_data = {"fields": [{"name": "LastModifiedDate", "type": "string"}]}
+ response_data = {"fields": [{"name": "LastModifiedDate", "type": "string"}, {"name": "Id", "type": "string"}]}
if describe_response_data:
response_data = describe_response_data
sf_object.describe = Mock(return_value=response_data)
diff --git a/connectors.md b/connectors.md
index 44651f27bcb..b105ea534c1 100644
--- a/connectors.md
+++ b/connectors.md
@@ -210,7 +210,7 @@
| **SFTP** |
| Source | airbyte/source-sftp:0.1.2 | alpha | [docs](https://docs.airbyte.com/integrations/sources/sftp) | [connectors/source/sftp](https://github.com/airbytehq/airbyte/issues?q=is:open+is:issue+label:connectors/source/sftp) | [source-sftp](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-sftp) | `a827c52e-791c-4135-a245-e233c5255199` |
| **SFTP Bulk** |
| Source | airbyte/source-sftp-bulk:0.1.2 | alpha | [docs](https://docs.airbyte.com/integrations/sources/sftp-bulk) | [connectors/source/sftp-bulk](https://github.com/airbytehq/airbyte/issues?q=is:open+is:issue+label:connectors/source/sftp-bulk) | [source-sftp-bulk](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-sftp-bulk) | `31e3242f-dee7-4cdc-a4b8-8e06c5458517` |
| **SalesLoft** |
| Source | airbyte/source-salesloft:1.0.0 | beta | [docs](https://docs.airbyte.com/integrations/sources/salesloft) | [connectors/source/salesloft](https://github.com/airbytehq/airbyte/issues?q=is:open+is:issue+label:connectors/source/salesloft) | [source-salesloft](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-salesloft) | `41991d12-d4b5-439e-afd0-260a31d4c53f` |
| **Salesforce** |
-| Source | airbyte/source-salesforce:2.0.9 | generally_available | [docs](https://docs.airbyte.com/integrations/sources/salesforce) | [connectors/source/salesforce](https://github.com/airbytehq/airbyte/issues?q=is:open+is:issue+label:connectors/source/salesforce) | [source-salesforce](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-salesforce) | `b117307c-14b6-41aa-9422-947e34922962` |
+| Source | airbyte/source-salesforce:2.0.10 | generally_available | [docs](https://docs.airbyte.com/integrations/sources/salesforce) | [connectors/source/salesforce](https://github.com/airbytehq/airbyte/issues?q=is:open+is:issue+label:connectors/source/salesforce) | [source-salesforce](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-salesforce) | `b117307c-14b6-41aa-9422-947e34922962` |
| **Sample Data (Faker)** |
| Source | airbyte/source-faker:2.0.3 | beta | [docs](https://docs.airbyte.com/integrations/sources/faker) | [connectors/source/faker](https://github.com/airbytehq/airbyte/issues?q=is:open+is:issue+label:connectors/source/faker) | [source-faker](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-faker) | `dfd88b22-b603-4c3d-aad7-3701784586b1` |
| **SearchMetrics** |
| Source | airbyte/source-search-metrics:0.1.1 | alpha | [docs](https://docs.airbyte.com/integrations/sources/search-metrics) | [connectors/source/search-metrics](https://github.com/airbytehq/airbyte/issues?q=is:open+is:issue+label:connectors/source/search-metrics) | [source-search-metrics](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-search-metrics) | `8d7ef552-2c0f-11ec-8d3d-0242ac130003` |
| **Secoda** |
| Source | airbyte/source-secoda:0.1.0 | alpha | [docs](https://docs.airbyte.com/integrations/sources/secoda) | [connectors/source/secoda](https://github.com/airbytehq/airbyte/issues?q=is:open+is:issue+label:connectors/source/secoda) | [source-secoda](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-secoda) | `da9fc6b9-8059-4be0-b204-f56e22e4d52d` |
diff --git a/docs/integrations/sources/salesforce.md b/docs/integrations/sources/salesforce.md
index 098501555af..5a744ab8649 100644
--- a/docs/integrations/sources/salesforce.md
+++ b/docs/integrations/sources/salesforce.md
@@ -129,6 +129,7 @@ Now that you have set up the Salesforce source connector, check out the followin
| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------|
+| 2.0.10 | 2023-04-05 | [24888](https://github.com/airbytehq/airbyte/pull/24888) | Add more frequent checkpointing |
| 2.0.9 | 2023-03-29 | [24660](https://github.com/airbytehq/airbyte/pull/24660) | Set default start_date. Sync for last two years if start date is not present in config |
| 2.0.8 | 2023-03-30 | [24690](https://github.com/airbytehq/airbyte/pull/24690) | Handle rate limit for bulk operations |
| 2.0.7 | 2023-03-14 | [24071](https://github.com/airbytehq/airbyte/pull/24071) | Remove regex pattern for start_date, use format validation instead |