1
0
mirror of synced 2025-12-23 21:03:15 -05:00

Source Plaid: Migrate Python CDK to No-code CDK (#29127)

Co-authored-by: Sajarin <sajarindider@gmail.com>
Co-authored-by: sh4sh <6833405+sh4sh@users.noreply.github.com>
This commit is contained in:
Mikhail Masyagin
2023-08-17 20:33:23 +03:00
committed by GitHub
parent 861abce20a
commit cc43bd7405
14 changed files with 526 additions and 277 deletions

View File

@@ -34,5 +34,5 @@ COPY source_plaid ./source_plaid
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
LABEL io.airbyte.version=0.3.2
LABEL io.airbyte.version=0.4.0
LABEL io.airbyte.name=airbyte/source-plaid

View File

@@ -1,24 +1,31 @@
# See [Connector Acceptance Tests](https://docs.airbyte.com/connector-development/testing-connectors/connector-acceptance-tests-reference)
# for more information about how to configure these tests
connector_image: airbyte/source-plaid:dev
tests:
acceptance_tests:
spec:
- spec_path: "source_plaid/spec.json"
tests:
- spec_path: "source_plaid/spec.yaml"
connection:
- config_path: "secrets/config.json"
status: "succeed"
- config_path: "integration_tests/invalid_config.json"
status: "failed"
tests:
- config_path: "secrets/config.json"
status: "succeed"
- config_path: "integration_tests/invalid_config.json"
status: "failed"
discovery:
- config_path: "secrets/config.json"
tests:
- config_path: "secrets/config.json"
basic_read:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
empty_streams: []
tests:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
empty_streams: []
incremental:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
future_state_path: "integration_tests/abnormal_state.json"
tests:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog_incremental.json"
future_state:
future_state_path: "integration_tests/abnormal_state.json"
full_refresh:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
tests:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"

View File

@@ -1,2 +1,3 @@
#!/usr/bin/env sh
source "$(git rev-parse --show-toplevel)/airbyte-integrations/bases/connector-acceptance-test/acceptance-test-docker.sh"

View File

@@ -0,0 +1,107 @@
{
"streams": [
{
"stream": {
"name": "balance",
"supported_sync_modes": ["full_refresh"],
"json_schema": {
"required": ["account_id", "current"],
"type": "object",
"properties": {
"account_id": {
"type": "string"
},
"available": {
"type": ["number", "null"]
},
"current": {
"type": "number"
},
"iso_currency_code": {
"type": ["string", "null"]
},
"limit": {
"type": ["number", "null"]
},
"unofficial_currency_code": {
"type": ["string", "null"]
}
}
}
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "transaction",
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"json_schema": {
"type": "object",
"required": [
"account_id",
"amount",
"iso_currency_code",
"name",
"transaction_id",
"category",
"date",
"transaction_type"
],
"properties": {
"account_id": { "type": "string" },
"amount": { "type": "number" },
"category": { "type": "array", "items": { "type": "string" } },
"category_id": { "type": ["string", "null"] },
"date": { "type": "string" },
"iso_currency_code": { "type": "string" },
"name": { "type": "string" },
"payment_channel": { "type": ["string", "null"] },
"pending": { "type": ["boolean", "null"] },
"transaction_id": { "type": "string" },
"transaction_type": { "type": "string" },
"location": {
"type": ["object", "null"],
"properties": {
"address": { "type": ["string", "null"] },
"city": { "type": ["string", "null"] },
"country": { "type": ["string", "null"] },
"lat": { "type": ["string", "null"] },
"lon": { "type": ["string", "null"] },
"postal_code": { "type": ["string", "null"] },
"region": { "type": ["string", "null"] },
"store_number": { "type": ["string", "null"] }
}
},
"payment_meta": {
"type": ["object", "null"],
"properties": {
"by_order_of": { "type": ["string", "null"] },
"payee": { "type": ["string", "null"] },
"payer": { "type": ["string", "null"] },
"payment_method": { "type": ["string", "null"] },
"payment_processor": { "type": ["string", "null"] },
"ppd_id": { "type": ["string", "null"] },
"reason": { "type": ["string", "null"] },
"reference_number": { "type": ["string", "null"] }
}
},
"account_owner": { "type": ["string", "null"] },
"authorized_date": { "type": ["string", "null"] },
"authorized_datetime": { "type": ["string", "null"] },
"check_number": { "type": ["string", "null"] },
"datetime": { "type": ["string", "null"] },
"merchant_name": { "type": ["string", "null"] },
"pending_transaction_id": { "type": ["string", "null"] },
"personal_finance_category": { "type": ["string", "null"] },
"transaction_code": { "type": ["string", "null"] },
"unofficial_currency_code": { "type": ["string", "null"] }
}
}
},
"sync_mode": "incremental",
"destination_sync_mode": "append"
}
]
}

View File

@@ -2,7 +2,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: ed799e2b-2158-4c66-8da4-b40fe63bc72a
dockerImageTag: 0.3.2
dockerImageTag: 0.4.0
dockerRepository: airbyte/source-plaid
githubIssueLabel: source-plaid
icon: plaid.svg

View File

@@ -1,2 +1,2 @@
# This file is autogenerated -- only edit if you know what you are doing. Use setup.py for declaring dependencies.
-e .
-e ../../bases/connector-acceptance-test

View File

@@ -5,12 +5,15 @@
from setuptools import find_packages, setup
MAIN_REQUIREMENTS = ["airbyte-cdk", "plaid-python"]
MAIN_REQUIREMENTS = [
"airbyte-cdk~=0.1",
]
TEST_REQUIREMENTS = [
"requests-mock~=1.9.3",
"pytest~=6.2",
"pytest-mock~=3.6.1",
"pytest~=6.1",
"requests-mock~=1.9.3",
"connector-acceptance-test",
]
setup(
@@ -20,7 +23,7 @@ setup(
author_email="contact@airbyte.io",
packages=find_packages(),
install_requires=MAIN_REQUIREMENTS,
package_data={"": ["*.json"]},
package_data={"": ["*.json", "*.yaml", "schemas/*.json", "schemas/shared/*.json"]},
extras_require={
"tests": TEST_REQUIREMENTS,
},

View File

@@ -1,5 +1,5 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

View File

@@ -0,0 +1,374 @@
# Declarative (low-code) manifest for the Plaid source connector.
version: "0.50.0"
type: DeclarativeSource

# The connection check is configured against the `balance` stream.
check:
  type: CheckStream
  stream_names:
  - balance
streams:
# Stream `balance`: one record per entry of `accounts` in the
# POST /accounts/balance/get response. The nested `balances` object is
# flattened into top-level fields and descriptive account fields are dropped.
- type: DeclarativeStream
  name: balance
  primary_key:
  - account_id
  schema_loader:
    type: InlineSchemaLoader
    schema:
      $schema: "http://json-schema.org/schema#"
      type: object
      required:
      - account_id
      - current
      properties:
        account_id:
          type: string
        available:
          type: ["null", number]
        current:
          type: number
        iso_currency_code:
          type: ["null", string]
        limit:
          type: ["null", number]
        unofficial_currency_code:
          type: ["null", string]
  retriever:
    type: SimpleRetriever
    requester:
      type: HttpRequester
      url_base: "https://{{config['plaid_env']}}.plaid.com"
      path: /accounts/balance/get
      http_method: POST
      request_parameters: {}
      request_headers: {}
      # Credentials are sent in the JSON body below, so no HTTP-level
      # authenticator is configured.
      authenticator:
        type: NoAuth
      request_body_json:
        client_id: "{{config['client_id']}}"
        secret: "{{config['api_key']}}"
        access_token: "{{config['access_token']}}"
        options:
          # `start_date` is optional in the spec. Because this stream also
          # backs the connection check, fall back to the Unix epoch instead
          # of failing to render when the user did not provide it.
          min_last_updated_datetime: "{{format_datetime(config.get('start_date') or '1970-01-01', '%Y-%m-%dT%H:%M:%SZ')}}"
    record_selector:
      type: RecordSelector
      extractor:
        type: DpathExtractor
        field_path:
        - accounts
        - '*'
    paginator:
      type: NoPagination
  transformations:
  # Lift every field of the nested `balances` object to the top level ...
  - type: AddFields
    fields:
    - path:
      - available
      value: "{{record['balances']['available']}}"
    - path:
      - current
      value: "{{record['balances']['current']}}"
    - path:
      - iso_currency_code
      value: "{{record['balances']['iso_currency_code']}}"
    - path:
      - limit
      value: "{{record['balances']['limit']}}"
    - path:
      - unofficial_currency_code
      value: "{{record['balances']['unofficial_currency_code']}}"
  # ... then drop the now-redundant `balances` object together with the
  # account attributes that are not part of the stream schema.
  - type: RemoveFields
    field_pointers:
    - - balances
    - - mask
    - - name
    - - official_name
    - - subtype
    - - type
# Stream `transaction`: records from `transactions` in the
# POST /transactions/get response, synced incrementally on `date` and paged
# with an offset sent in `options.offset`.
- type: DeclarativeStream
  name: transaction
  primary_key:
  - transaction_id
  schema_loader:
    type: InlineSchemaLoader
    schema:
      $schema: "http://json-schema.org/schema#"
      type: object
      required:
      - account_id
      - amount
      - iso_currency_code
      - name
      - transaction_id
      - category
      - date
      - transaction_type
      properties:
        account_id:
          type: string
        account_owner:
          type: [string, "null"]
        amount:
          type: number
        authorized_date:
          type: [string, "null"]
        authorized_datetime:
          type: [string, "null"]
        category:
          type: array
          items:
            type: string
        category_id:
          type: [string, "null"]
        check_number:
          type: [string, "null"]
        date:
          type: string
        datetime:
          type: [string, "null"]
        iso_currency_code:
          type: string
        location:
          type: [object, "null"]
          properties:
            address:
              type: [string, "null"]
            city:
              type: [string, "null"]
            country:
              type: [string, "null"]
            lat:
              type: [string, "null"]
            lon:
              type: [string, "null"]
            postal_code:
              type: [string, "null"]
            region:
              type: [string, "null"]
            store_number:
              type: [string, "null"]
        merchant_name:
          type: [string, "null"]
        name:
          type: string
        payment_channel:
          type: [string, "null"]
        payment_meta:
          type: [object, "null"]
          properties:
            by_order_of:
              type: [string, "null"]
            payee:
              type: [string, "null"]
            payer:
              type: [string, "null"]
            payment_method:
              type: [string, "null"]
            payment_processor:
              type: [string, "null"]
            ppd_id:
              type: [string, "null"]
            reason:
              type: [string, "null"]
            reference_number:
              type: [string, "null"]
        pending:
          type: [boolean, "null"]
        pending_transaction_id:
          type: [string, "null"]
        personal_finance_category:
          type: [string, "null"]
        transaction_code:
          type: [string, "null"]
        transaction_id:
          type: string
        transaction_type:
          type: string
        unofficial_currency_code:
          type: [string, "null"]
  retriever:
    type: SimpleRetriever
    requester:
      type: HttpRequester
      url_base: "https://{{config['plaid_env']}}.plaid.com"
      path: /transactions/get
      http_method: POST
      request_parameters: {}
      request_headers: {}
      # Credentials are sent in the JSON body below, so no HTTP-level
      # authenticator is configured.
      authenticator:
        type: NoAuth
      request_body_json:
        client_id: "{{config['client_id']}}"
        secret: "{{config['api_key']}}"
        access_token: "{{config['access_token']}}"
        options:
          # The paginator has no page_token_option; the offset it produces is
          # injected here by hand.
          offset: "{{ next_page_token['next_page_token'] }}"
    record_selector:
      type: RecordSelector
      extractor:
        type: DpathExtractor
        field_path:
        - transactions
        - '*'
    paginator:
      type: DefaultPaginator
      pagination_strategy:
        type: OffsetIncrement
  incremental_sync:
    type: DatetimeBasedCursor
    cursor_field: date
    datetime_format: '%Y-%m-%d'
    start_datetime:
      type: MinMaxDatetime
      # `start_date` is optional in the spec. When it is absent, start from
      # the Unix epoch, which is what the previous Python implementation did.
      datetime: "{{config.get('start_date') or '1970-01-01'}}"
      datetime_format: '%Y-%m-%d'
    end_datetime:
      type: MinMaxDatetime
      datetime: "{{ now_utc().strftime('%Y-%m-%dT%H:%M:%SZ') }}"
      datetime_format: '%Y-%m-%dT%H:%M:%SZ'
    # The slice boundaries are sent as `start_date` / `end_date` in the body.
    start_time_option:
      type: RequestOption
      field_name: start_date
      inject_into: body_json
    end_time_option:
      type: RequestOption
      field_name: end_date
      inject_into: body_json
# Connector specification: the configuration fields a user must or may provide.
spec:
  type: Spec
  # Restored from the previous spec.json; the generated manifest carried the
  # placeholder https://example.org here.
  documentation_url: https://plaid.com/docs/api/
  connection_specification:
    $schema: "http://json-schema.org/draft-07/schema#"
    type: object
    required:
    - access_token
    - api_key
    - client_id
    - plaid_env
    properties:
      access_token:
        type: string
        order: 0
        title: Access Token
        description: The end-user's Link access token.
        airbyte_secret: true
      api_key:
        type: string
        order: 1
        title: API Key
        description: The Plaid API key to use to hit the API.
        airbyte_secret: true
      client_id:
        type: string
        order: 2
        title: Client ID
        description: The Plaid client id.
      plaid_env:
        type: string
        order: 3
        title: Plaid Environment
        description: The Plaid environment.
        enum:
        - sandbox
        - development
        - production
      # Optional: not listed under `required`.
      start_date:
        type: string
        order: 4
        title: Start Date
        format: date
        pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}$'
        # Quoted so the example stays a string rather than a YAML date.
        examples:
        - "2021-03-01"
        description: >-
          The date from which you'd like to replicate data for Plaid in the
          format YYYY-MM-DD. All data generated after this date will be
          replicated.
    additionalProperties: true
# NOTE(review): presumably tooling metadata with one flag per stream name;
# confirm which tool consumes `autoImportSchema`.
metadata:
  autoImportSchema:
    balance: false
    transaction: false

View File

@@ -1,13 +0,0 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"required": ["account_id", "current"],
"properties": {
"account_id": { "type": "string" },
"available": { "type": ["number", "null"] },
"current": { "type": "number" },
"iso_currency_code": { "type": ["string", "null"] },
"limit": { "type": ["number", "null"] },
"unofficial_currency_code": { "type": ["string", "null"] }
}
}

View File

@@ -1,63 +0,0 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"required": [
"account_id",
"amount",
"iso_currency_code",
"name",
"transaction_id",
"category",
"date",
"transaction_type"
],
"properties": {
"account_id": { "type": "string" },
"amount": { "type": "number" },
"category": { "type": "array", "items": { "type": "string" } },
"category_id": { "type": ["string", "null"] },
"date": { "type": "string" },
"iso_currency_code": { "type": "string" },
"name": { "type": "string" },
"payment_channel": { "type": ["string", "null"] },
"pending": { "type": ["boolean", "null"] },
"transaction_id": { "type": "string" },
"transaction_type": { "type": "string" },
"location": {
"type": ["object", "null"],
"properties": {
"address": { "type": ["string", "null"] },
"city": { "type": ["string", "null"] },
"country": { "type": ["string", "null"] },
"lat": { "type": ["string", "null"] },
"lon": { "type": ["string", "null"] },
"postal_code": { "type": ["string", "null"] },
"region": { "type": ["string", "null"] },
"store_number": { "type": ["string", "null"] }
}
},
"payment_meta": {
"type": ["object", "null"],
"properties": {
"by_order_of": { "type": ["string", "null"] },
"payee": { "type": ["string", "null"] },
"payer": { "type": ["string", "null"] },
"payment_method": { "type": ["string", "null"] },
"payment_processor": { "type": ["string", "null"] },
"ppd_id": { "type": ["string", "null"] },
"reason": { "type": ["string", "null"] },
"reference_number": { "type": ["string", "null"] }
}
},
"account_owner": { "type": ["string", "null"] },
"authorized_date": { "type": ["string", "null"] },
"authorized_datetime": { "type": ["string", "null"] },
"check_number": { "type": ["string", "null"] },
"datetime": { "type": ["string", "null"] },
"merchant_name": { "type": ["string", "null"] },
"pending_transaction_id": { "type": ["string", "null"] },
"personal_finance_category": { "type": ["string", "null"] },
"transaction_code": { "type": ["string", "null"] },
"unofficial_currency_code": { "type": ["string", "null"] }
}
}

View File

@@ -2,145 +2,17 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import datetime
import json
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union
from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource
import plaid
from airbyte_cdk.logger import AirbyteLogger
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from plaid.api import plaid_api
from plaid.model.accounts_balance_get_request import AccountsBalanceGetRequest
from plaid.model.accounts_balance_get_request_options import AccountsBalanceGetRequestOptions
from plaid.model.transactions_get_request import TransactionsGetRequest
from plaid.model.transactions_get_request_options import TransactionsGetRequestOptions
"""
This file provides the necessary constructs to interpret a provided declarative YAML configuration file into
source connector.
SPEC_ENV_TO_PLAID_ENV = {
"production": plaid.Environment.Production,
"development": plaid.Environment.Development,
"sandbox": plaid.Environment.Sandbox,
}
WARNING: Do not modify this file.
"""
class PlaidStream(Stream):
def __init__(self, config: Mapping[str, Any]):
plaid_config = plaid.Configuration(
host=SPEC_ENV_TO_PLAID_ENV[config["plaid_env"]], api_key={"clientId": config["client_id"], "secret": config["api_key"]}
)
api_client = plaid.ApiClient(plaid_config)
self.client = plaid_api.PlaidApi(api_client)
self.access_token = config["access_token"]
self.start_date = datetime.datetime.strptime(config.get("start_date"), "%Y-%m-%d").date() if config.get("start_date") else None
class BalanceStream(PlaidStream):
@property
def name(self):
return "balance"
@property
def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
return "account_id"
def read_records(
self,
sync_mode: SyncMode,
cursor_field: List[str] = None,
stream_slice: Mapping[str, Any] = None,
stream_state: Mapping[str, Any] = None,
) -> Iterable[Mapping[str, Any]]:
min_last_updated_datetime = datetime.datetime.strptime(
datetime.datetime.strftime(self.start_date, "%y-%m-%dT%H:%M:%SZ"),
"%y-%m-%dT%H:%M:%S%z",
)
options = AccountsBalanceGetRequestOptions(min_last_updated_datetime=min_last_updated_datetime)
getRequest = AccountsBalanceGetRequest(access_token=self.access_token, options=options)
balance_response = self.client.accounts_balance_get(getRequest)
for balance in balance_response["accounts"]:
message_dict = balance["balances"].to_dict()
message_dict["account_id"] = balance["account_id"]
yield message_dict
class IncrementalTransactionStream(PlaidStream):
@property
def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
return "transaction_id"
@property
def name(self):
return "transaction"
@property
def source_defined_cursor(self) -> bool:
return True
@property
def cursor_field(self) -> Union[str, List[str]]:
return "date"
def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]):
return {"date": latest_record.get("date")}
def _get_transactions_response(self, start_date, end_date=datetime.datetime.utcnow().date(), offset=0):
options = TransactionsGetRequestOptions()
options.offset = offset
return self.client.transactions_get(
TransactionsGetRequest(access_token=self.access_token, start_date=start_date, end_date=end_date, options=options)
)
def read_records(
self,
sync_mode: SyncMode,
cursor_field: List[str] = None,
stream_slice: Mapping[str, Any] = None,
stream_state: Mapping[str, Any] = None,
) -> Iterable[Mapping[str, Any]]:
stream_state = stream_state or {}
date = stream_state.get("date")
all_transactions = []
if not date:
date = datetime.date.fromtimestamp(0)
else:
date = datetime.date.fromisoformat(date)
if date >= datetime.datetime.utcnow().date():
return
if self.start_date:
date = max(self.start_date, date)
response = self._get_transactions_response(date)
all_transactions.extend(response.transactions)
num_total_transactions = response.total_transactions
while len(all_transactions) < num_total_transactions:
response = self._get_transactions_response(date, offset=len(all_transactions))
all_transactions.extend(response.transactions)
yield from map(lambda x: x.to_dict(), sorted(all_transactions, key=lambda t: t["date"]))
class SourcePlaid(AbstractSource):
def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]:
try:
plaid_config = plaid.Configuration(
host=SPEC_ENV_TO_PLAID_ENV[config["plaid_env"]], api_key={"clientId": config["client_id"], "secret": config["api_key"]}
)
api_client = plaid.ApiClient(plaid_config)
client = plaid_api.PlaidApi(api_client)
try:
request = AccountsBalanceGetRequest(access_token=config["access_token"])
client.accounts_balance_get(request)
return True, None
except plaid.ApiException as e:
response = json.loads(e.body)
return False, response
except Exception as error:
return False, error
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
return [BalanceStream(config), IncrementalTransactionStream(config)]
# Declarative Source
class SourcePlaid(YamlDeclarativeSource):
    """Plaid source configured entirely by the declarative manifest ``manifest.yaml``."""

    def __init__(self):
        # The class carries no logic of its own; it only hands the manifest path to the base class.
        super().__init__(path_to_yaml="manifest.yaml")

View File

@@ -1,40 +0,0 @@
{
"documentationUrl": "https://plaid.com/docs/api/",
"connectionSpecification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"required": ["access_token", "api_key", "client_id", "plaid_env"],
"additionalProperties": true,
"properties": {
"access_token": {
"type": "string",
"title": "Access Token",
"description": "The end-user's Link access token."
},
"api_key": {
"title": "API Key",
"type": "string",
"description": "The Plaid API key to use to hit the API.",
"airbyte_secret": true
},
"client_id": {
"title": "Client ID",
"type": "string",
"description": "The Plaid client id"
},
"plaid_env": {
"title": "Plaid Environment",
"type": "string",
"enum": ["sandbox", "development", "production"],
"description": "The Plaid environment"
},
"start_date": {
"title": "Start Date",
"type": "string",
"description": "The date from which you'd like to replicate data for Plaid in the format YYYY-MM-DD. All data generated after this date will be replicated.",
"examples": ["2021-03-01"],
"pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}$"
}
}
}
}

View File

@@ -70,6 +70,7 @@ This guide will walk through how to create the credentials you need to run this
| Version | Date | Pull Request | Subject |
| :------ | :--------- | :------------------------------------------------------- | :------------------------------------------------------------ |
| 0.4.0 | 2023-08-17 | [29127](https://github.com/airbytehq/airbyte/pull/29127) | Rewrote connector to no-code CDK |
| 0.3.2 | 2022-08-02 | [15231](https://github.com/airbytehq/airbyte/pull/15231) | Added min_last_updated_datetime support for Capital One items |
| 0.3.1 | 2022-03-31 | [11104](https://github.com/airbytehq/airbyte/pull/11104) | Fix 100 record limit and added start_date |
| 0.3.0 | 2022-01-05 | [7977](https://github.com/airbytehq/airbyte/pull/7977) | Migrate to Python CDK + add transaction stream |