1
0
mirror of synced 2025-12-25 02:09:19 -05:00

Source Plaid: port to Python CDK (#7977)

This commit is contained in:
firmbase-tal
2022-01-10 13:15:30 +02:00
committed by GitHub
parent 0c57100b55
commit e68c564e21
32 changed files with 678 additions and 429 deletions

View File

@@ -2,7 +2,7 @@
"sourceDefinitionId": "ed799e2b-2158-4c66-8da4-b40fe63bc72a",
"name": "Plaid",
"dockerRepository": "airbyte/source-plaid",
"dockerImageTag": "0.2.1",
"dockerImageTag": "0.3.0",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/plaid",
"icon": "plaid.svg"
}

View File

@@ -516,7 +516,7 @@
- name: Plaid
sourceDefinitionId: ed799e2b-2158-4c66-8da4-b40fe63bc72a
dockerRepository: airbyte/source-plaid
dockerImageTag: 0.2.1
dockerImageTag: 0.3.0
documentationUrl: https://docs.airbyte.io/integrations/sources/plaid
icon: plaid.svg
sourceType: api

View File

@@ -5412,7 +5412,7 @@
- - "client_secret"
oauthFlowOutputParameters:
- - "refresh_token"
- dockerImage: "airbyte/source-plaid:0.2.1"
- dockerImage: "airbyte/source-plaid:0.3.0"
spec:
documentationUrl: "https://plaid.com/docs/api/"
connectionSpecification:
@@ -5422,6 +5422,7 @@
- "access_token"
- "api_key"
- "client_id"
- "plaid_env"
additionalProperties: false
properties:
access_token:

View File

@@ -1,8 +1,7 @@
*
!Dockerfile
!Dockerfile.test
!package.json
!spec.json
!source.js
!main.py
!source_plaid
!setup.py
!secrets
!fullrefresh_configured_catalog.json

View File

@@ -1 +0,0 @@
node_modules

View File

@@ -1,4 +0,0 @@
{
"printWidth": 120,
"singleQuote": true
}

View File

@@ -1,16 +1,38 @@
# node 14
FROM node:alpine3.12
FROM python:3.7.11-alpine3.14 as base
# build and load all requirements
FROM base as builder
WORKDIR /airbyte/integration_code
# Copy source files
COPY package.json .
COPY source.js .
COPY spec.json .
# Install any needed dependencies
RUN npm install
ENV AIRBYTE_ENTRYPOINT "node /airbyte/integration_code/source.js"
ENTRYPOINT ["node", "/airbyte/integration_code/source.js"]
# upgrade pip to the latest version
RUN apk --no-cache upgrade \
&& pip install --upgrade pip \
&& apk --no-cache add tzdata build-base
LABEL io.airbyte.version=0.2.1
COPY setup.py ./
# install necessary packages to a temporary folder
RUN pip install --prefix=/install .
# build a clean environment
FROM base
WORKDIR /airbyte/integration_code
# copy all loaded and built libraries to a pure basic image
COPY --from=builder /install /usr/local
# add default timezone settings
COPY --from=builder /usr/share/zoneinfo/Etc/UTC /etc/localtime
RUN echo "Etc/UTC" > /etc/timezone
# bash is installed for more convenient debugging.
RUN apk --no-cache add bash
# copy payload code only
COPY main.py ./
COPY source_plaid ./source_plaid
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
LABEL io.airbyte.version=0.3.0
LABEL io.airbyte.name=airbyte/source-plaid

View File

@@ -1,75 +1,129 @@
# Plaid Source
This is the repository for the JavaScript Template source connector, written in JavaScript.
For information about how to use this connector within Airbyte, see [the documentation](https://docs.airbyte.io/integrations/sources/javascript-template).
This is the repository for the Plaid source connector, written in Python.
For information about how to use this connector within Airbyte, see [the documentation](https://docs.airbyte.io/integrations/sources/plaid-new).
## Local development
### Prerequisites
**To iterate on this connector, make sure to complete this prerequisites section.**
#### Build & Activate Virtual Environment
First, build the module by running the following from the `airbyte` project root directory:
#### Minimum Python version required `= 3.7.0`
#### Build & Activate Virtual Environment and install dependencies
From this connector directory, create a virtual environment:
```
./gradlew :airbyte-integrations:connectors:source-plaid:build
python -m venv .venv
```
This will generate a virtualenv for this module in `source-plaid/.venv`. Make sure this venv is active in your
development environment of choice. To activate the venv from the terminal, run:
This will generate a virtualenv for this module in `.venv/`. Make sure this venv is active in your
development environment of choice. To activate it from the terminal, run:
```
cd airbyte-integrations/connectors/source-plaid # cd into the connector directory
source .venv/bin/activate
pip install -r requirements.txt
```
If you are in an IDE, follow your IDE's instructions to activate the virtualenv.
Note that while we are installing dependencies from `requirements.txt`, you should only edit `setup.py` for your dependencies. `requirements.txt` is
used for editable installs (`pip install -e`) to pull in Python dependencies from the monorepo and will call `setup.py`.
If this is mumbo jumbo to you, don't worry about it, just put your deps in `setup.py` but install using `pip install -r requirements.txt` and everything
should work as you expect.
#### Building via Gradle
From the Airbyte repository root, run:
```
./gradlew :airbyte-integrations:connectors:source-plaid-new:build
```
#### Create credentials
**If you are a community contributor**, follow the instructions in the [documentation](https://docs.airbyte.io/integrations/sources/plaid)
to generate the necessary credentials. Then create a file `secrets/config.json` conforming to the `source_plaid/spec.json` file.
Note that the `secrets` directory is gitignored by default, so there is no danger of accidentally checking in sensitive information.
See `integration_tests/sample_config.json` for a sample config file.
**If you are a community contributor**, follow the instructions in the [documentation](https://docs.airbyte.io/integrations/sources/javascript-template)
to generate the necessary credentials. Then create a file `secrets/config.json` conforming to the `source_javascript_template/spec.json` file.
See `sample_files/sample_config.json` for a sample config file.
**If you are an Airbyte core member**, copy the credentials in RPass under the secret name `source-plaid-integration-test-config`
**If you are an Airbyte core member**, copy the credentials in Lastpass under the secret name `source plaid-new test creds`
and place them into `secrets/config.json`.
### Locally running the connector
```
npm install
node source.js spec
node source.js check --config secrets/config.json
node source.js discover --config secrets/config.json
node source.js read --config secrets/config.json --catalog sample_files/configured_catalog.json
```
### Unit Tests (wip)
To run unit tests locally, from the connector directory run:
```
npm test
python main.py spec
python main.py check --config secrets/config.json
python main.py discover --config secrets/config.json
python main.py read --config secrets/config.json --catalog integration_tests/configured_catalog.json
```
### Locally running the connector docker image
#### Build
First, make sure you build the latest Docker image:
```
docker build . -t airbyte/source-plaid:dev
```
You can also build the connector image via Gradle:
```
# in airbyte root directory
./gradlew :airbyte-integrations:connectors:source-plaid:airbyteDocker
```
When building via Gradle, the docker image name and tag, respectively, are the values of the `io.airbyte.name` and `io.airbyte.version` `LABEL`s in
the Dockerfile.
#### Run
Then run any of the connector commands as follows:
```
docker run --rm airbyte/source-plaid:dev spec
docker run --rm -v $(pwd)/airbyte-integrations/connectors/source-plaid/secrets:/secrets airbyte/source-plaid:dev check --config /secrets/config.json
docker run --rm -v $(pwd)/airbyte-integrations/connectors/source-plaid/secrets:/secrets airbyte/source-plaid:dev discover --config /secrets/config.json
docker run --rm -v $(pwd)/airbyte-integrations/connectors/source-plaid/secrets:/secrets -v $(pwd)/airbyte-integrations/connectors/source-plaid/sample_files:/sample_files airbyte/source-plaid:dev read --config /secrets/config.json --catalog /sample_files/fullrefresh_configured_catalog.json
docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-plaid:dev check --config /secrets/config.json
docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-plaid:dev discover --config /secrets/config.json
docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/source-plaid:dev read --config /secrets/config.json --catalog /integration_tests/configured_catalog.json
```
## Testing
Make sure to familiarize yourself with [pytest test discovery](https://docs.pytest.org/en/latest/goodpractices.html#test-discovery) to know how your test files and methods should be named.
First install test dependencies into your virtual environment:
```
pip install '.[tests]'
```
### Unit Tests
To run unit tests locally, from the connector directory run:
```
python -m pytest unit_tests
```
### Integration Tests
There are two types of integration tests: Acceptance Tests (Airbyte's test suite for all source connectors) and custom integration tests (which are specific to this connector).
#### Custom Integration tests
Place custom tests inside `integration_tests/` folder, then, from the connector root, run
```
python -m pytest integration_tests
```
#### Acceptance Tests
Customize `acceptance-test-config.yml` file to configure tests. See [Source Acceptance Tests](https://docs.airbyte.io/connector-development/testing-connectors/source-acceptance-tests-reference) for more information.
If your connector requires to create or destroy resources for use during acceptance tests create fixtures for it and place them inside integration_tests/acceptance.py.
To run your integration tests with acceptance tests, from the connector root, run
```
python -m pytest integration_tests -p integration_tests.acceptance
```
To run your integration tests with docker
1. From the airbyte project root, run `./gradlew :airbyte-integrations:connectors:source-plaid:integrationTest` to run the standard integration test suite.
1. To run additional integration tests, place your integration tests in a new directory `integration_tests` and run them with `node test (wip)`.
### Using gradle to run tests
All commands should be run from airbyte project root.
To run unit tests:
```
./gradlew :airbyte-integrations:connectors:source-plaid-new:unitTest
```
To run acceptance and custom integration tests:
```
./gradlew :airbyte-integrations:connectors:source-plaid-new:integrationTest
```
## Dependency Management
All of your dependencies should go in `setup.py`, NOT `requirements.txt`. The requirements file is only used to connect internal Airbyte dependencies in the monorepo for local development.
We split dependencies between two groups, dependencies that are:
* required for your connector to work need to go to `MAIN_REQUIREMENTS` list.
* required for the testing need to go to `TEST_REQUIREMENTS` list
All of your dependencies should go in `package.json`.
### Publishing a new version of the connector
You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what?
1. Make sure your changes are passing unit and integration tests.
1. Bump the connector version in `Dockerfile` -- just increment the value of the `LABEL io.airbyte.version` appropriately (we use [SemVer](https://semver.org/)).
1. Create a Pull Request.
1. Pat yourself on the back for being an awesome contributor.
1. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master.

View File

@@ -0,0 +1,24 @@
# See [Source Acceptance Tests](https://docs.airbyte.io/connector-development/testing-connectors/source-acceptance-tests-reference)
# for more information about how to configure these tests
connector_image: airbyte/source-plaid:dev
tests:
spec:
- spec_path: "source_plaid/spec.json"
connection:
- config_path: "secrets/config.json"
status: "succeed"
- config_path: "integration_tests/invalid_config.json"
status: "failed"
discovery:
- config_path: "secrets/config.json"
basic_read:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
empty_streams: []
incremental:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
future_state_path: "integration_tests/abnormal_state.json"
full_refresh:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"

View File

@@ -0,0 +1,16 @@
#!/usr/bin/env sh
# Build latest connector image
docker build . -t $(cat acceptance-test-config.yml | grep "connector_image" | head -n 1 | cut -d: -f2)
# Pull latest acctest image
docker pull airbyte/source-acceptance-test:latest
# Run
docker run --rm -it \
-v /var/run/docker.sock:/var/run/docker.sock \
-v /tmp:/tmp \
-v $(pwd):/test_input \
airbyte/source-acceptance-test \
--acceptance-test-config /test_input

View File

@@ -1,42 +1,14 @@
plugins {
id 'airbyte-python'
id 'airbyte-docker'
id 'airbyte-standard-source-test-file'
id 'base' // ?
id 'com.github.node-gradle.node' version '2.2.4'
id 'airbyte-source-acceptance-test'
}
node {
download = true
version = "14.11.0"
}
npm_run_build {
inputs.files fileTree('public')
inputs.files fileTree('src')
inputs.file 'package.json'
inputs.file 'package-lock.json'
outputs.dir project.buildDir
}
assemble.dependsOn npm_run_build
//task test(type: NpmTask) {
// dependsOn assemble
//
// args = ['run', 'test', '--', '--watchAll=false']
// inputs.files fileTree('src')
// inputs.file 'package.json'
// inputs.file 'package-lock.json'
//}
airbyteStandardSourceTestFile {
// All these input paths must live inside this connector's directory (or subdirectories)
configPath = "secrets/config.json"
configuredCatalogPath = "sample_files/fullrefresh_configured_catalog.json"
specPath = "spec.json"
airbytePython {
moduleDirectory 'source_plaid_singer'
}
dependencies {
implementation files(project(':airbyte-integrations:bases:base-standard-source-test-file').airbyteDocker.outputs)
implementation files(project(':airbyte-integrations:bases:base').airbyteDocker.outputs)
implementation files(project(':airbyte-integrations:bases:source-acceptance-test').airbyteDocker.outputs)
implementation files(project(':airbyte-integrations:bases:base-python').airbyteDocker.outputs)
}

View File

@@ -0,0 +1,3 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

View File

@@ -0,0 +1,5 @@
{
"transaction": {
"date": "2120-01-01"
}
}

View File

@@ -0,0 +1,14 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#
import pytest
pytest_plugins = ("source_acceptance_test.plugin",)
@pytest.fixture(scope="session", autouse=True)
def connector_setup():
"""This fixture is a placeholder for external resources that acceptance test might require."""
yield

View File

@@ -0,0 +1,104 @@
{
"streams": [
{
"stream": {
"name": "balance",
"supported_sync_modes": ["full_refresh"],
"source_defined_cursor": true,
"json_schema": {
"type": "object",
"required": ["account_id", "current"],
"properties": {
"account_id": {
"type": "string"
},
"available": {
"type": ["number", "null"]
},
"current": {
"type": "number"
},
"iso_currency_code": {
"type": ["string", "null"]
},
"limit": {
"type": ["number", "null"]
},
"unofficial_currency_code": {
"type": ["string", "null"]
}
}
}
}
},
{
"stream": {
"name": "transaction",
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"json_schema": {
"type": "object",
"required": [
"account_id",
"amount",
"iso_currency_code",
"name",
"transaction_id",
"category",
"date",
"transaction_type"
],
"properties": {
"account_id": { "type": "string" },
"amount": { "type": "number" },
"category": { "type": "array", "items": { "type": "string" } },
"category_id": { "type": ["string", "null"] },
"date": { "type": "string" },
"iso_currency_code": { "type": "string" },
"name": { "type": "string" },
"payment_channel": { "type": ["string", "null"] },
"pending": { "type": ["boolean", "null"] },
"transaction_id": { "type": "string" },
"transaction_type": { "type": "string" },
"location": {
"type": ["object", "null"],
"properties": {
"address": { "type": ["string", "null"] },
"city": { "type": ["string", "null"] },
"country": { "type": ["string", "null"] },
"lat": { "type": ["string", "null"] },
"lon": { "type": ["string", "null"] },
"postal_code": { "type": ["string", "null"] },
"region": { "type": ["string", "null"] },
"store_number": { "type": ["string", "null"] }
}
},
"payment_meta": {
"type": ["object", "null"],
"properties": {
"by_order_of": { "type": ["string", "null"] },
"payee": { "type": ["string", "null"] },
"payer": { "type": ["string", "null"] },
"payment_method": { "type": ["string", "null"] },
"payment_processor": { "type": ["string", "null"] },
"ppd_id": { "type": ["string", "null"] },
"reason": { "type": ["string", "null"] },
"reference_number": { "type": ["string", "null"] }
}
},
"account_owner": { "type": ["string", "null"] },
"authorized_date": { "type": ["string", "null"] },
"authorized_datetime": { "type": ["string", "null"] },
"check_number": { "type": ["string", "null"] },
"datetime": { "type": ["string", "null"] },
"merchant_name": { "type": ["string", "null"] },
"pending_transaction_id": { "type": ["string", "null"] },
"personal_finance_category": { "type": ["string", "null"] },
"transaction_code": { "type": ["string", "null"] },
"unofficial_currency_code": { "type": ["string", "null"] }
}
}
}
}
]
}

View File

@@ -0,0 +1,107 @@
{
"streams": [
{
"stream": {
"name": "balance",
"supported_sync_modes": ["full_refresh"],
"json_schema": {
"required": ["account_id", "current"],
"type": "object",
"properties": {
"account_id": {
"type": "string"
},
"available": {
"type": ["number", "null"]
},
"current": {
"type": "number"
},
"iso_currency_code": {
"type": ["string", "null"]
},
"limit": {
"type": ["number", "null"]
},
"unofficial_currency_code": {
"type": ["string", "null"]
}
}
}
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "transaction",
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"json_schema": {
"type": "object",
"required": [
"account_id",
"amount",
"iso_currency_code",
"name",
"transaction_id",
"category",
"date",
"transaction_type"
],
"properties": {
"account_id": { "type": "string" },
"amount": { "type": "number" },
"category": { "type": "array", "items": { "type": "string" } },
"category_id": { "type": ["string", "null"] },
"date": { "type": "string" },
"iso_currency_code": { "type": "string" },
"name": { "type": "string" },
"payment_channel": { "type": ["string", "null"] },
"pending": { "type": ["boolean", "null"] },
"transaction_id": { "type": "string" },
"transaction_type": { "type": "string" },
"location": {
"type": ["object", "null"],
"properties": {
"address": { "type": ["string", "null"] },
"city": { "type": ["string", "null"] },
"country": { "type": ["string", "null"] },
"lat": { "type": ["string", "null"] },
"lon": { "type": ["string", "null"] },
"postal_code": { "type": ["string", "null"] },
"region": { "type": ["string", "null"] },
"store_number": { "type": ["string", "null"] }
}
},
"payment_meta": {
"type": ["object", "null"],
"properties": {
"by_order_of": { "type": ["string", "null"] },
"payee": { "type": ["string", "null"] },
"payer": { "type": ["string", "null"] },
"payment_method": { "type": ["string", "null"] },
"payment_processor": { "type": ["string", "null"] },
"ppd_id": { "type": ["string", "null"] },
"reason": { "type": ["string", "null"] },
"reference_number": { "type": ["string", "null"] }
}
},
"account_owner": { "type": ["string", "null"] },
"authorized_date": { "type": ["string", "null"] },
"authorized_datetime": { "type": ["string", "null"] },
"check_number": { "type": ["string", "null"] },
"datetime": { "type": ["string", "null"] },
"merchant_name": { "type": ["string", "null"] },
"pending_transaction_id": { "type": ["string", "null"] },
"personal_finance_category": { "type": ["string", "null"] },
"transaction_code": { "type": ["string", "null"] },
"unofficial_currency_code": { "type": ["string", "null"] }
}
}
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
}
]
}

View File

@@ -0,0 +1,6 @@
{
"api_key": "??",
"client_id": "??",
"plaid_env": "sandbox",
"access_token": "??"
}

View File

@@ -0,0 +1,6 @@
{
"access_token": "??",
"api_key": "??",
"client_id": "??",
"plaid_env": "sandbox"
}

View File

@@ -0,0 +1,5 @@
{
"transaction": {
"date": "2020-01-01"
}
}

View File

@@ -0,0 +1,13 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#
import sys
from airbyte_cdk.entrypoint import launch
from source_plaid import SourcePlaid
if __name__ == "__main__":
source = SourcePlaid()
launch(source, sys.argv[1:])

View File

@@ -1,31 +0,0 @@
{
"name": "source-plaid",
"version": "1.0.0",
"lockfileVersion": 1,
"requires": true,
"dependencies": {
"argparse": {
"version": "2.0.1",
"resolved": "https://registry.npmjs.org/argparse/-/argparse-2.0.1.tgz",
"integrity": "sha512-8+9WqebbFzpX9OR+Wa6O29asIogeRMzcGtAINdpMHHyAg10f05aSFVBbcEqGf/PXw1EjAZ+q2/bEBg3DvurK3Q=="
},
"axios": {
"version": "0.21.1",
"resolved": "https://registry.npmjs.org/axios/-/axios-0.21.1.tgz",
"integrity": "sha512-dKQiRHxGD9PPRIUNIWvZhPTPpl1rf/OxTYKsqKUDjBwYylTvV7SjSHJb9ratfyzM6wCdLCOYLzs73qpg5c4iGA==",
"requires": {
"follow-redirects": "^1.10.0"
}
},
"date-fns": {
"version": "2.16.1",
"resolved": "https://registry.npmjs.org/date-fns/-/date-fns-2.16.1.tgz",
"integrity": "sha512-sAJVKx/FqrLYHAQeN7VpJrPhagZc9R4ImZIWYRFZaaohR3KzmuK88touwsSwSVT8Qcbd4zoDsnGfX4GFB4imyQ=="
},
"follow-redirects": {
"version": "1.13.1",
"resolved": "https://registry.npmjs.org/follow-redirects/-/follow-redirects-1.13.1.tgz",
"integrity": "sha512-SSG5xmZh1mkPGyKzjZP8zLjltIfpW32Y5QpdNJyjcfGxK3qo3NDDkZOZSFiGn1A6SclQxY9GzEwAHQ3dmYRWpg=="
}
}
}

View File

@@ -1,16 +0,0 @@
{
"name": "source-plaid",
"version": "1.0.0",
"description": "Airbyte Plaid Source.",
"main": "source.js",
"scripts": {
"build": ""
},
"author": "",
"license": "ISC",
"dependencies": {
"argparse": "^2.0.1",
"axios": "^0.21.1",
"date-fns": "^2.16.1"
}
}

View File

@@ -0,0 +1,3 @@
# This file is autogenerated -- only edit if you know what you are doing. Use setup.py for declaring dependencies.
-e ../../bases/source-acceptance-test
-e .

View File

@@ -1,34 +0,0 @@
{
"streams": [
{
"stream": {
"name": "balances",
"supported_sync_modes": ["full_refresh"],
"json_schema": {
"properties": {
"account_id": {
"type": "string"
},
"available": {
"type": "number"
},
"current": {
"type": "number"
},
"iso_currency_code": {
"type": "string"
},
"limit": {
"type": "number"
},
"unofficial_currency_code": {
"type": "string"
}
}
}
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
}
]
}

View File

@@ -0,0 +1,26 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#
from setuptools import find_packages, setup
MAIN_REQUIREMENTS = ["airbyte-cdk", "plaid-python"]
TEST_REQUIREMENTS = [
"pytest~=6.1",
"source-acceptance-test",
]
setup(
name="source_plaid",
description="Source implementation for Plaid.",
author="Airbyte",
author_email="contact@airbyte.io",
packages=find_packages(),
install_requires=MAIN_REQUIREMENTS,
package_data={"": ["*.json"]},
extras_require={
"tests": TEST_REQUIREMENTS,
},
)

View File

@@ -1,252 +0,0 @@
const fs = require('fs');
const axios = require('axios');
const path = require('path');
const { ArgumentParser } = require('argparse');
const dateFns = require('date-fns');
const getMilliseconds = dateFns.getMilliseconds;
async function read(config, catalog) {
let balancesStream = null;
for (const configuredStreamIndex in catalog.streams) {
const configuredStream = catalog.streams[configuredStreamIndex];
if (configuredStream.stream.name === 'balances') {
balancesStream = configuredStream;
}
}
if (balancesStream === null) {
log('No streams selected');
return;
}
// We only support full_refresh at the moment, so verify the user didn't ask for another sync mode
if (balancesStream.sync_mode !== 'full_refresh') {
log('This connector only supports full refresh syncs! (for now)');
process.exit(1);
}
// If we've made it this far, all the configuration is good and we can pull the balance.
const now = new Date();
const url = `${getBaseUrl(config.plaid_env)}/accounts/balance/get`;
const response = await axios.post(
url,
{
access_token: config.access_token,
client_id: config.client_id,
secret: config.api_key,
},
{ validateStatus: () => true }
);
if (response.status !== 200) {
log('Failure occurred when calling Plaid API');
process.exit(1);
} else {
response.data.accounts
.map((account) => {
const data = {
account_id: account.account_id,
available: account.balances.available,
current: account.balances.current,
iso_currency_code: account.balances.iso_currency_code,
limit: account.balances.limit,
unofficial_currency_code: account.balances.unofficial_currency_code,
};
const record = {
stream: 'balances',
data: data,
emitted_at: getMilliseconds(now),
};
return { type: 'RECORD', record: record };
})
.forEach((record) => console.log(JSON.stringify(record)));
}
}
function readJson(filePath) {
return JSON.parse(fs.readFileSync(filePath));
}
function getBaseUrl(plaidEnv) {
if (plaidEnv === 'sandbox') {
return 'https://sandbox.plaid.com';
} else if (plaidEnv === 'development') {
return 'https://development.plaid.com';
} else if (plaidEnv === 'production') {
return 'https://production.plaid.com';
} else {
throw new Error('Invalid Plaid Environment');
}
}
async function check(config) {
// Validate input configuration by hitting the balance endpoint.
let result;
const url = `${getBaseUrl(config.plaid_env)}/accounts/balance/get`;
const response = await axios.post(
url,
{
access_token: config.access_token,
client_id: config.client_id,
secret: config.api_key,
},
{ validateStatus: () => true }
);
if (response.status === 200) {
result = { status: 'SUCCEEDED' };
} else if (response.data.code === 'INVALID_ACCESS_TOKEN') {
result = { status: 'FAILED', message: 'Access token is incorrect.' };
} else {
result = {
status: 'FAILED',
message: response.data.error_message,
};
}
// Format the result of the check operation according to the Airbyte Specification
const outputMessage = { type: 'CONNECTION_STATUS', connectionStatus: result };
console.log(JSON.stringify(outputMessage));
}
function log(message) {
const logJson = { type: 'LOG', log: message };
console.log(logJson);
}
function discover() {
const catalog = {
streams: [
{
name: 'balance',
supported_sync_modes: ['full_refresh'],
json_schema: {
properties: {
account_id: {
type: 'string',
},
available: {
type: 'number',
},
current: {
type: 'number',
},
iso_currency_code: {
type: 'string',
},
limit: {
type: 'number',
},
unofficial_currency_code: {
type: 'string',
},
},
},
},
],
};
const airbyte_message = { type: 'CATALOG', catalog };
console.log(JSON.stringify(airbyte_message));
}
function getInputFilePath(filePath) {
if (path.isAbsolute(filePath)) {
return filePath;
} else {
return path.join(process.cwd(), filePath);
}
}
function spec() {
// Read the file named spec.json from the module directory as a JSON file
const specPath = path.join(path.dirname(__filename), 'spec.json');
const specification = readJson(specPath);
// form an Airbyte Message containing the spec and print it to stdout
const airbyteMessage = { type: 'SPEC', spec: specification };
console.log(JSON.stringify(airbyteMessage));
}
async function run(args) {
const parentParser = new ArgumentParser({ add_help: false });
const mainParser = new ArgumentParser({ add_help: false });
const subparsers = mainParser.add_subparsers({ title: 'commands', dest: 'command' });
// Accept the spec command
subparsers.add_parser('spec', {
help: 'outputs the json configuration specification',
parents: [parentParser],
});
// Accept the check command
const checkParser = subparsers.add_parser('check', {
help: 'checks the config used to connect',
parents: [parentParser],
});
const requiredCheckParser = checkParser.add_argument_group('required named arguments');
requiredCheckParser.add_argument('--config', {
type: 'str',
required: true,
help: 'path to the json configuration file',
});
// Accept the discover command
const discover_parser = subparsers.add_parser('discover', {
help: "outputs a catalog describing the source's schema",
parents: [parentParser],
});
const requiredDiscoverParser = discover_parser.add_argument_group('required named arguments');
requiredDiscoverParser.add_argument('--config', {
type: 'str',
required: true,
help: 'path to the json configuration file',
});
// Accept the read command
const readParser = subparsers.add_parser('read', {
help: 'reads the source and outputs messages to STDOUT',
parents: [parentParser],
});
readParser.add_argument('--state', {
type: 'str',
required: false,
help: 'path to the json-encoded state file',
});
const requiredReadParser = readParser.add_argument_group('required named arguments');
requiredReadParser.add_argument('--config', {
type: 'str',
required: true,
help: 'path to the json configuration file',
});
requiredReadParser.add_argument('--catalog', {
type: 'str',
required: true,
help: 'path to the catalog used to determine which data to read',
});
const parsedArgs = mainParser.parse_args(args);
const command = parsedArgs.command;
if (command === 'spec') {
spec();
} else if (command === 'check') {
const config = readJson(getInputFilePath(parsedArgs.config));
await check(config);
} else if (command === 'discover') {
discover();
} else if (command === 'read') {
const config = readJson(getInputFilePath(parsedArgs.config));
const configuredCatalog = readJson(getInputFilePath(parsedArgs.catalog));
await read(config, configuredCatalog);
} else {
// If we don't recognize the command log the problem and exit with an error code greater than 0 to indicate the process
// had a failure
log('Invalid command. Allowable commands: [spec, check, discover, read]');
process.exit(1);
}
// A zero exit code means the process successfully completed
process.exit(0);
}
(async function () {
await run(process.argv.slice(2)).catch((reason) => console.log(reason));
})();

View File

@@ -0,0 +1,8 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#
from .source import SourcePlaid
__all__ = ["SourcePlaid"]

View File

@@ -0,0 +1,13 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"required": ["account_id", "current"],
"properties": {
"account_id": { "type": "string" },
"available": { "type": ["number", "null"] },
"current": { "type": "number" },
"iso_currency_code": { "type": ["string", "null"] },
"limit": { "type": ["number", "null"] },
"unofficial_currency_code": { "type": ["string", "null"] }
}
}

View File

@@ -0,0 +1,63 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"required": [
"account_id",
"amount",
"iso_currency_code",
"name",
"transaction_id",
"category",
"date",
"transaction_type"
],
"properties": {
"account_id": { "type": "string" },
"amount": { "type": "number" },
"category": { "type": "array", "items": { "type": "string" } },
"category_id": { "type": ["string", "null"] },
"date": { "type": "string" },
"iso_currency_code": { "type": "string" },
"name": { "type": "string" },
"payment_channel": { "type": ["string", "null"] },
"pending": { "type": ["boolean", "null"] },
"transaction_id": { "type": "string" },
"transaction_type": { "type": "string" },
"location": {
"type": ["object", "null"],
"properties": {
"address": { "type": ["string", "null"] },
"city": { "type": ["string", "null"] },
"country": { "type": ["string", "null"] },
"lat": { "type": ["string", "null"] },
"lon": { "type": ["string", "null"] },
"postal_code": { "type": ["string", "null"] },
"region": { "type": ["string", "null"] },
"store_number": { "type": ["string", "null"] }
}
},
"payment_meta": {
"type": ["object", "null"],
"properties": {
"by_order_of": { "type": ["string", "null"] },
"payee": { "type": ["string", "null"] },
"payer": { "type": ["string", "null"] },
"payment_method": { "type": ["string", "null"] },
"payment_processor": { "type": ["string", "null"] },
"ppd_id": { "type": ["string", "null"] },
"reason": { "type": ["string", "null"] },
"reference_number": { "type": ["string", "null"] }
}
},
"account_owner": { "type": ["string", "null"] },
"authorized_date": { "type": ["string", "null"] },
"authorized_datetime": { "type": ["string", "null"] },
"check_number": { "type": ["string", "null"] },
"datetime": { "type": ["string", "null"] },
"merchant_name": { "type": ["string", "null"] },
"pending_transaction_id": { "type": ["string", "null"] },
"personal_finance_category": { "type": ["string", "null"] },
"transaction_code": { "type": ["string", "null"] },
"unofficial_currency_code": { "type": ["string", "null"] }
}
}

View File

@@ -0,0 +1,120 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#
import datetime
import json
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union
import plaid
from airbyte_cdk.logger import AirbyteLogger
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from plaid.api import plaid_api
from plaid.model.accounts_balance_get_request import AccountsBalanceGetRequest
from plaid.model.transactions_get_request import TransactionsGetRequest
SPEC_ENV_TO_PLAID_ENV = {
"production": plaid.Environment.Production,
"development": plaid.Environment.Development,
"sandbox": plaid.Environment.Sandbox,
}
class PlaidStream(Stream):
def __init__(self, config: Mapping[str, Any]):
plaid_config = plaid.Configuration(
host=SPEC_ENV_TO_PLAID_ENV[config["plaid_env"]], api_key={"clientId": config["client_id"], "secret": config["api_key"]}
)
api_client = plaid.ApiClient(plaid_config)
self.client = plaid_api.PlaidApi(api_client)
self.access_token = config["access_token"]
class BalanceStream(PlaidStream):
@property
def name(self):
return "balance"
@property
def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
return "account_id"
def read_records(
self,
sync_mode: SyncMode,
cursor_field: List[str] = None,
stream_slice: Mapping[str, Any] = None,
stream_state: Mapping[str, Any] = None,
) -> Iterable[Mapping[str, Any]]:
balance_response = self.client.accounts_balance_get(AccountsBalanceGetRequest(access_token=self.access_token))
for balance in balance_response["accounts"]:
message_dict = balance["balances"].to_dict()
message_dict["account_id"] = balance["account_id"]
yield message_dict
class IncrementalTransactionStream(PlaidStream):
@property
def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
return "transaction_id"
@property
def name(self):
return "transaction"
@property
def source_defined_cursor(self) -> bool:
return True
@property
def cursor_field(self) -> Union[str, List[str]]:
return "date"
def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]):
return {"date": latest_record.get("date")}
def read_records(
self,
sync_mode: SyncMode,
cursor_field: List[str] = None,
stream_slice: Mapping[str, Any] = None,
stream_state: Mapping[str, Any] = None,
) -> Iterable[Mapping[str, Any]]:
stream_state = stream_state or {}
date = stream_state.get("date")
if not date:
date = datetime.date.fromtimestamp(0)
else:
date = datetime.date.fromisoformat(date)
if date >= datetime.datetime.utcnow().date():
return
transaction_response = self.client.transactions_get(
TransactionsGetRequest(access_token=self.access_token, start_date=date, end_date=datetime.datetime.utcnow().date())
)
yield from map(lambda x: x.to_dict(), sorted(transaction_response["transactions"], key=lambda t: t["date"]))
class SourcePlaid(AbstractSource):
def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]:
try:
plaid_config = plaid.Configuration(
host=SPEC_ENV_TO_PLAID_ENV[config["plaid_env"]], api_key={"clientId": config["client_id"], "secret": config["api_key"]}
)
api_client = plaid.ApiClient(plaid_config)
client = plaid_api.PlaidApi(api_client)
try:
request = AccountsBalanceGetRequest(access_token=config["access_token"])
client.accounts_balance_get(request)
return True, None
except plaid.ApiException as e:
response = json.loads(e.body)
return False, response
except Exception as error:
return False, error
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
return [BalanceStream(config), IncrementalTransactionStream(config)]

View File

@@ -3,7 +3,7 @@
"connectionSpecification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"required": ["access_token", "api_key", "client_id"],
"required": ["access_token", "api_key", "client_id", "plaid_env"],
"additionalProperties": false,
"properties": {
"access_token": {

View File

@@ -64,3 +64,6 @@ This guide will walk through how to create the credentials you need to run this
```
* We should now have everything we need to configure this source in the UI.
| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.3.0 | 2022-01-05 | [7977](https://github.com/airbytehq/airbyte/pull/7977) | Migrate to Python CDK + add transaction stream |