1
0
mirror of synced 2025-12-25 02:09:19 -05:00

Salesforce make start_date optional and change format to YYYY-MM-DD (#8191)

* Make `start_date` optional

* start_date can take format `2021-07-25`

* added title to all spec input properties and changed version in Dockerfile

* added a new item into salesforce.md changelogs

* fixed case when start_date is None

* formatted code

* changed comments in unit test fixtures

* changed spec title fields

* updated source and spec yaml

Co-authored-by: Auganbay <auganenu@gmail.com>
This commit is contained in:
augan-rymkhan
2021-12-03 14:38:47 +06:00
committed by GitHub
parent 298c8b2498
commit 4e44166d0f
8 changed files with 123 additions and 23 deletions

View File

@@ -568,7 +568,7 @@
- name: Salesforce
sourceDefinitionId: b117307c-14b6-41aa-9422-947e34922962
dockerRepository: airbyte/source-salesforce
dockerImageTag: 0.1.7
dockerImageTag: 0.1.8
documentationUrl: https://docs.airbyte.io/integrations/sources/salesforce
icon: salesforce.svg
sourceType: api

View File

@@ -5428,7 +5428,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-salesforce:0.1.7"
- dockerImage: "airbyte/source-salesforce:0.1.8"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/salesforce"
connectionSpecification:
@@ -5439,35 +5439,40 @@
- "client_id"
- "client_secret"
- "refresh_token"
- "start_date"
- "api_type"
additionalProperties: false
properties:
client_id:
title: "Client ID"
description: "The Consumer Key that can be found when viewing your app in\
\ Salesforce"
type: "string"
client_secret:
title: "Client Secret"
description: "The Consumer Secret that can be found when viewing your app\
\ in Salesforce"
type: "string"
airbyte_secret: true
refresh_token:
title: "Refresh Token"
description: "Salesforce Refresh Token used for Airbyte to access your Salesforce\
\ account. If you don't know what this is, follow this <a href=\"https://medium.com/@bpmmendis94/obtain-access-refresh-tokens-from-salesforce-rest-api-a324fe4ccd9b\"\
>guide</a> to retrieve it."
type: "string"
airbyte_secret: true
start_date:
description: "UTC date and time in the format 2017-01-25T00:00:00Z. Any\
\ data before this date will not be replicated. This field uses the \"\
updated\" field if available, otherwise the \"created\" fields if they\
\ are available for a stream."
title: "Replication Start Date"
description: "Date in the format 2017-01-25. Any data before this date will\
\ not be replicated. This field uses the \"updated\" field if available,\
\ otherwise the \"created\" fields if they are available for a stream.\
\ If not set, then by default all your data is replicated."
type: "string"
pattern: "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$"
pattern: "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z|[0-9]{4}-[0-9]{2}-[0-9]{2}$"
examples:
- "2021-07-25"
- "2021-07-25T00:00:00Z"
is_sandbox:
title: "Sandbox"
description: "Whether or not the the app is in a Salesforce sandbox. If\
\ you do not know what this, assume it is false. We provide more info\
\ on this field in the <a href=\"https://docs.airbyte.io/integrations/destinations/salesforce#is_sandbox\"\
@@ -5475,6 +5480,7 @@
type: "boolean"
default: false
api_type:
title: "API Type"
description: "Unless you know that you are transferring a very small amount\
\ of data, prefer using the BULK API. This will help avoid using up all\
\ of your API call quota with Salesforce. Valid values are BULK or REST."

View File

@@ -25,5 +25,5 @@ COPY source_salesforce ./source_salesforce
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
LABEL io.airbyte.version=0.1.7
LABEL io.airbyte.version=0.1.8
LABEL io.airbyte.name=airbyte/source-salesforce

View File

@@ -44,7 +44,7 @@ class SourceSalesforce(AbstractSource):
pk, replication_key = sf_object.get_pk_and_replication_key(json_schema)
streams_kwargs.update(dict(sf_api=sf_object, pk=pk, stream_name=stream_name, schema=json_schema, authenticator=authenticator))
if replication_key and stream_name not in UNSUPPORTED_FILTERING_STREAMS:
streams.append(incremental(**streams_kwargs, replication_key=replication_key, start_date=config["start_date"]))
streams.append(incremental(**streams_kwargs, replication_key=replication_key, start_date=config.get("start_date")))
else:
streams.append(full_refresh(**streams_kwargs))

View File

@@ -4,41 +4,41 @@
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Salesforce Source Spec",
"type": "object",
"required": [
"client_id",
"client_secret",
"refresh_token",
"start_date",
"api_type"
],
"required": ["client_id", "client_secret", "refresh_token", "api_type"],
"additionalProperties": false,
"properties": {
"client_id": {
"title": "Client ID",
"description": "The Consumer Key that can be found when viewing your app in Salesforce",
"type": "string"
},
"client_secret": {
"title": "Client Secret",
"description": "The Consumer Secret that can be found when viewing your app in Salesforce",
"type": "string",
"airbyte_secret": true
},
"refresh_token": {
"title": "Refresh Token",
"description": "Salesforce Refresh Token used for Airbyte to access your Salesforce account. If you don't know what this is, follow this <a href=\"https://medium.com/@bpmmendis94/obtain-access-refresh-tokens-from-salesforce-rest-api-a324fe4ccd9b\">guide</a> to retrieve it.",
"type": "string",
"airbyte_secret": true
},
"start_date": {
"description": "UTC date and time in the format 2017-01-25T00:00:00Z. Any data before this date will not be replicated. This field uses the \"updated\" field if available, otherwise the \"created\" fields if they are available for a stream.",
"title": "Replication Start Date",
"description": "Date in the format 2017-01-25. Any data before this date will not be replicated. This field uses the \"updated\" field if available, otherwise the \"created\" fields if they are available for a stream. If not set, then by default all your data is replicated.",
"type": "string",
"pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$",
"examples": ["2021-07-25T00:00:00Z"]
"pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z|[0-9]{4}-[0-9]{2}-[0-9]{2}$",
"examples": ["2021-07-25", "2021-07-25T00:00:00Z"]
},
"is_sandbox": {
"title": "Sandbox",
"description": "Whether or not the the app is in a Salesforce sandbox. If you do not know what this, assume it is false. We provide more info on this field in the <a href=\"https://docs.airbyte.io/integrations/destinations/salesforce#is_sandbox\">docs</a>.",
"type": "boolean",
"default": false
},
"api_type": {
"title": "API Type",
"description": "Unless you know that you are transferring a very small amount of data, prefer using the BULK API. This will help avoid using up all of your API call quota with Salesforce. Valid values are BULK or REST.",
"type": "string",
"enum": ["BULK", "REST"],

View File

@@ -263,10 +263,16 @@ class BulkSalesforceStream(SalesforceStream):
class IncrementalSalesforceStream(SalesforceStream, ABC):
state_checkpoint_interval = 500
def __init__(self, replication_key: str, start_date: str, **kwargs):
def __init__(self, replication_key: str, start_date: Optional[str], **kwargs):
super().__init__(**kwargs)
self.replication_key = replication_key
self.start_date = start_date
self.start_date = self.format_start_date(start_date)
@staticmethod
def format_start_date(start_date: Optional[str]) -> Optional[str]:
"""Transform the format `2021-07-25` into the format `2021-07-25T00:00:00Z`"""
if start_date:
return pendulum.parse(start_date).strftime("%Y-%m-%dT%H:%M:%SZ")
def next_page_token(self, response: requests.Response) -> str:
response_data = response.json()
@@ -289,7 +295,9 @@ class IncrementalSalesforceStream(SalesforceStream, ABC):
stream_date = stream_state.get(self.cursor_field)
start_date = next_page_token or stream_date or self.start_date
query = f"SELECT {','.join(selected_properties.keys())} FROM {self.name} WHERE {self.cursor_field} >= {start_date} "
query = f"SELECT {','.join(selected_properties.keys())} FROM {self.name} "
if start_date:
query += f"WHERE {self.cursor_field} >= {start_date} "
if self.name not in UNSUPPORTED_FILTERING_STREAMS:
query += f"ORDER BY {self.cursor_field} ASC LIMIT {self.page_size}"
return {"q": query}

View File

@@ -27,6 +27,19 @@ def stream_bulk_config():
}
@pytest.fixture(scope="module")
def stream_bulk_config_without_start_date():
"""Generates streams settings for BULK logic without start_date"""
return {
"client_id": "fake_client_id",
"client_secret": "fake_client_secret",
"refresh_token": "fake_refresh_token",
"is_sandbox": False,
"wait_timeout": 15,
"api_type": "BULK",
}
@pytest.fixture(scope="module")
def stream_rest_config():
"""Generates streams settings for BULK logic"""
@@ -41,6 +54,33 @@ def stream_rest_config():
}
@pytest.fixture(scope="module")
def stream_rest_config_date_format():
"""Generates streams settings with `start_date` in format YYYY-MM-DD"""
return {
"client_id": "fake_client_id",
"client_secret": "fake_client_secret",
"refresh_token": "fake_refresh_token",
"start_date": "2010-01-18",
"is_sandbox": False,
"wait_timeout": 15,
"api_type": "REST",
}
@pytest.fixture(scope="module")
def stream_rest_config_without_start_date():
"""Generates streams settings for REST logic without start_date"""
return {
"client_id": "fake_client_id",
"client_secret": "fake_client_secret",
"refresh_token": "fake_refresh_token",
"is_sandbox": False,
"wait_timeout": 15,
"api_type": "REST",
}
def _stream_api(stream_config):
sf_object = Salesforce(**stream_config)
sf_object.login = Mock()
@@ -199,3 +239,48 @@ def test_bulk_sync_failed_retry(stream_bulk_config, stream_bulk_api):
with pytest.raises(Exception) as err:
next(stream.read_records(sync_mode=SyncMode.full_refresh))
assert "stream using BULK API was failed" in str(err.value)
@pytest.mark.parametrize(
"api_type,start_date_provided,stream_name,expected_start_date",
[
("BULK", True, "Account", "2010-01-18T21:18:20Z"),
("BULK", False, "Account", None),
("REST", True, "ActiveFeatureLicenseMetric", "2010-01-18T21:18:20Z"),
("REST", False, "ActiveFeatureLicenseMetric", None),
],
)
def test_stream_start_date(
api_type,
start_date_provided,
stream_name,
expected_start_date,
stream_bulk_config,
stream_bulk_api,
stream_rest_config,
stream_rest_api,
stream_rest_config_without_start_date,
stream_bulk_config_without_start_date,
):
if start_date_provided:
stream_config, stream_api = (stream_rest_config, stream_rest_api) if api_type == "REST" else (stream_bulk_config, stream_bulk_api)
stream = _generate_stream(stream_name, stream_config, stream_api)
else:
stream_config, stream_api = (
(stream_rest_config_without_start_date, stream_rest_api)
if api_type == "REST"
else (stream_bulk_config_without_start_date, stream_bulk_api)
)
stream = _generate_stream(stream_name, stream_config, stream_api)
assert stream.start_date == expected_start_date
def test_stream_start_date_should_be_converted_to_datetime_format(stream_rest_config_date_format, stream_rest_api):
stream: IncrementalSalesforceStream = _generate_stream("ActiveFeatureLicenseMetric", stream_rest_config_date_format, stream_rest_api)
assert stream.start_date == "2010-01-18T00:00:00Z"
def test_stream_start_datetime_format_should_not_changed(stream_rest_config, stream_rest_api):
stream: IncrementalSalesforceStream = _generate_stream("ActiveFeatureLicenseMetric", stream_rest_config, stream_rest_api)
assert stream.start_date == "2010-01-18T21:18:20Z"

View File

@@ -735,6 +735,7 @@ List of available streams:
| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.1.8 | 2021-11-30 | [8191](https://github.com/airbytehq/airbyte/pull/8191) | Make `start_date` optional and change its format to `YYYY-MM-DD` |
| 0.1.7 | 2021-11-24 | [8206](https://github.com/airbytehq/airbyte/pull/8206) | Handling 400 error when trying to create a job for sync using Bulk API. |
| 0.1.6 | 2021-11-16 | [8009](https://github.com/airbytehq/airbyte/pull/8009) | Fix retring of BULK jobs |
| 0.1.5 | 2021-11-15 | [7885](https://github.com/airbytehq/airbyte/pull/7885) | Add `Transform` for output records |