|
|
|
|
@@ -36,13 +36,14 @@ Let's begin by pulling data for the last day's rates by using the `/latest` endp
|
|
|
|
|
|
|
|
|
|
```python
|
|
|
|
|
class ExchangeRates(HttpStream):
|
|
|
|
|
url_base = "https://api.exchangeratesapi.io/"
|
|
|
|
|
url_base = "http://api.exchangeratesapi.io/"
|
|
|
|
|
|
|
|
|
|
primary_key = None
|
|
|
|
|
|
|
|
|
|
def __init__(self, base: str, **kwargs):
|
|
|
|
|
def __init__(self, config: Mapping[str, Any], **kwargs):
|
|
|
|
|
super().__init__()
|
|
|
|
|
self.base = base
|
|
|
|
|
self.base = config['base']
|
|
|
|
|
self.access_key = config['access_key']
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def path(
|
|
|
|
|
@@ -60,8 +61,8 @@ class ExchangeRates(HttpStream):
|
|
|
|
|
stream_slice: Mapping[str, Any] = None,
|
|
|
|
|
next_page_token: Mapping[str, Any] = None,
|
|
|
|
|
) -> MutableMapping[str, Any]:
|
|
|
|
|
# The api requires that we include the base currency as a query param so we do that in this method
|
|
|
|
|
return {'base': self.base}
|
|
|
|
|
# The api requires that we include access_key as a query param so we do that in this method
|
|
|
|
|
return {'access_key': self.access_key}
|
|
|
|
|
|
|
|
|
|
def parse_response(
|
|
|
|
|
self,
|
|
|
|
|
@@ -80,14 +81,14 @@ class ExchangeRates(HttpStream):
|
|
|
|
|
return None
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
This may look big, but that's just because there are lots of \(unused, for now\) parameters in these methods \(those can be hidden with Python's `**kwargs`, but don't worry about it for now\). Really we just added a few lines of "significant" code: 1. Added a constructor `__init__` which stores the `base` currency to query for. 2. `return {'base': self.base}` to add the `?base=<base-value>` query parameter to the request based on the `base` input by the user. 3. `return [response.json()]` to parse the response from the API to match the schema of our schema `.json` file. 4. `return "latest"` to indicate that we want to hit the `/latest` endpoint of the API to get the latest exchange rate data.
|
|
|
|
|
This may look big, but that's just because there are lots of \(unused, for now\) parameters in these methods \(those can be hidden with Python's `**kwargs`, but don't worry about it for now\). Really we just added a few lines of "significant" code: 1. Added a constructor `__init__` which stores the `base` currency to query for and the `access_key` used for authentication. 2. `return {'access_key': self.access_key}` to add the `?access_key=<access-key-string>` query parameter to the request based on the `access_key` input by the user. 3. `return [response.json()]` to parse the response from the API to match the schema of our schema `.json` file. 4. `return "latest"` to indicate that we want to hit the `/latest` endpoint of the API to get the latest exchange rate data.
|
|
|
|
|
|
|
|
|
|
Let's also pass the `base` parameter input by the user to the stream class:
|
|
|
|
|
Let's also pass the config specified by the user to the stream class:
|
|
|
|
|
|
|
|
|
|
```python
|
|
|
|
|
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
|
|
|
|
|
auth = NoAuth()
|
|
|
|
|
return [ExchangeRates(authenticator=auth, base=config['base'])]
|
|
|
|
|
auth = NoAuth()
|
|
|
|
|
return [ExchangeRates(authenticator=auth, config=config)]
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
We're now ready to query the API!
|
|
|
|
|
@@ -95,13 +96,13 @@ We're now ready to query the API!
|
|
|
|
|
To do this, we'll need a [ConfiguredCatalog](../../../understanding-airbyte/beginners-guide-to-catalog.md). We've prepared one [here](https://github.com/airbytehq/airbyte/blob/master/airbyte-cdk/python/docs/tutorials/http_api_source_assets/configured_catalog.json) -- download this and place it in `sample_files/configured_catalog.json`. Then run:
|
|
|
|
|
|
|
|
|
|
```text
|
|
|
|
|
python main.py read --config sample_files/config.json --catalog sample_files/configured_catalog.json
|
|
|
|
|
python main.py read --config secrets/config.json --catalog sample_files/configured_catalog.json
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
you should see some output lines, one of which is a record from the API:
|
|
|
|
|
|
|
|
|
|
```text
|
|
|
|
|
{"type": "RECORD", "record": {"stream": "exchange_rates", "data": {"base": "USD", "rates": {"GBP": 0.7196938353, "HKD": 7.7597848573, "IDR": 14482.4824162185, "ILS": 3.2412081092, "DKK": 6.1532478279, "INR": 74.7852709971, "CHF": 0.915763343, "MXN": 19.8439387671, "CZK": 21.3545717832, "SGD": 1.3261894911, "THB": 31.4398014067, "HRK": 6.2599917253, "EUR": 0.8274720728, "MYR": 4.0979726934, "NOK": 8.3043442284, "CNY": 6.4856433595, "BGN": 1.61836988, "PHP": 48.3516756309, "PLN": 3.770872983, "ZAR": 14.2690111709, "CAD": 1.2436905254, "ISK": 124.9482829954, "BRL": 5.4526272238, "RON": 4.0738932561, "NZD": 1.3841125362, "TRY": 8.3101365329, "JPY": 108.0182043856, "RUB": 74.9555647497, "KRW": 1111.7583781547, "USD": 1.0, "AUD": 1.2840711626, "HUF": 300.6206040546, "SEK": 8.3829540753}, "date": "2021-04-26"}, "emitted_at": 1619498062000}}
|
|
|
|
|
"type": "RECORD", "record": {"stream": "exchange_rates", "data": {"success": true, "timestamp": 1651129443, "base": "EUR", "date": "2022-04-28", "rates": {"AED": 3.86736, "AFN": 92.13195, "ALL": 120.627843, "AMD": 489.819318, "ANG": 1.910347, "AOA": 430.073735, "ARS": 121.119674, "AUD": 1.478877, "AWG": 1.895762, "AZN": 1.794932, "BAM": 1.953851, "BBD": 2.140212, "BDT": 91.662775, "BGN": 1.957013, "BHD": 0.396929, "BIF": 2176.669098, "BMD": 1.052909, "BND": 1.461004, "BOB": 7.298009, "BRL": 5.227798, "BSD": 1.060027, "BTC": 2.6717761e-05, "BTN": 81.165435, "BWP": 12.802036, "BYN": 3.565356, "BYR": 20637.011334, "BZD": 2.136616, "CAD": 1.349329, "CDF": 2118.452361, "CHF": 1.021627, "CLF": 0.032318, "CLP": 891.760584, "CNY": 6.953724, "COP": 4171.971894, "CRC": 701.446322, "CUC": 1.052909, "CUP": 27.902082, "CVE": 110.15345, "CZK": 24.499027, "DJF": 188.707108, "DKK": 7.441548, "DOP": 58.321493, "DZD": 152.371647, "EGP": 19.458297, "ERN": 15.793633, "ETB": 54.43729, "EUR": 1, "FJD": 2.274651, "FKP": 0.80931, "GBP": 0.839568, "GEL": 3.20611, "GGP": 0.80931, "GHS": 7.976422, "GIP": 0.80931, "GMD": 56.64554, "GNF": 9416.400803, "GTQ": 8.118402, "GYD": 221.765423, "HKD": 8.261854, "HNL": 26.0169, "HRK": 7.563467, "HTG": 115.545574, "HUF": 377.172734, "IDR": 15238.748216, "ILS": 3.489582, "IMP": 0.80931, "INR": 80.654494, "IQD": 1547.023976, "IRR": 44538.040218, "ISK": 137.457233, "JEP": 0.80931, "JMD": 163.910125, "JOD": 0.746498, "JPY": 137.331903, "KES": 121.87429, "KGS": 88.581418, "KHR": 4286.72178, "KMF": 486.443591, "KPW": 947.617993, "KRW": 1339.837191, "KWD": 0.322886, "KYD": 0.883397, "KZT": 473.770223, "LAK": 12761.755235, "LBP": 1602.661797, "LKR": 376.293562, "LRD": 159.989586, "LSL": 15.604181, "LTL": 3.108965, "LVL": 0.636894, "LYD": 5.031557, "MAD": 10.541225, "MDL": 19.593772, "MGA": 4284.002369, "MKD": 61.553251, "MMK": 1962.574442, "MNT": 3153.317641, "MOP": 8.567461, "MRO": 375.88824, "MUR": 45.165684, "MVR": 16.199478, "MWK": 865.62318, "MXN": 21.530268, "MYR": 4.594366, "MZN": 67.206888, "NAD": 15.604214, "NGN": 437.399752, "NIO": 37.965356, "NOK": 9.824365, "NPR": 129.86672, "NZD": 1.616441, "OMR": 0.405421, "PAB": 1.060027, "PEN": 4.054233, "PGK": 3.73593, "PHP": 55.075028, "PKR": 196.760944, "PLN": 4.698101, "PYG": 7246.992296, "QAR": 3.833603, "RON": 4.948144, "RSD": 117.620172, "RUB": 77.806269, "RWF": 1086.709833, "SAR": 3.949063, "SBD": 8.474149, "SCR": 14.304711, "SDG": 470.649944, "SEK": 10.367719, "SGD": 1.459695, "SHP": 1.45028, "SLL": 13082.391386, "SOS": 609.634325, "SRD": 21.904702, "STD": 21793.085136, "SVC": 9.275519, "SYP": 2645.380032, "SZL": 16.827859, "THB": 36.297991, "TJS": 13.196811, "TMT": 3.685181, "TND": 3.22348, "TOP": 2.428117, "TRY": 15.575532, "TTD": 7.202107, "TWD": 31.082183, "TZS": 2446.960099, "UAH": 32.065033, "UGX": 3773.578577, "USD": 1.052909, "UYU": 43.156886, "UZS": 11895.19696, "VEF": 225143710305.04727, "VND": 24171.62598, "VUV": 118.538204, "WST": 2.722234, "XAF": 655.287181, "XAG": 0.045404, "XAU": 0.000559, "XCD": 2.845538, "XDR": 0.783307, "XOF": 655.293398, "XPF": 118.347299, "YER": 263.490114, "ZAR": 16.77336, "ZMK": 9477.445964, "ZMW": 18.046154, "ZWL": 339.036185}}, "emitted_at": 1651130169364}}
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
There we have it - a stream which reads data in just a few lines of code!
|
|
|
|
|
@@ -127,10 +128,10 @@ Let's get the easy parts out of the way and pass the `start_date`:
|
|
|
|
|
|
|
|
|
|
```python
|
|
|
|
|
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
|
|
|
|
|
auth = NoAuth()
|
|
|
|
|
# Parse the date from a string into a datetime object
|
|
|
|
|
start_date = datetime.strptime(config['start_date'], '%Y-%m-%d')
|
|
|
|
|
return [ExchangeRates(authenticator=auth, base=config['base'], start_date=start_date)]
|
|
|
|
|
auth = NoAuth()
|
|
|
|
|
# Parse the date from a string into a datetime object
|
|
|
|
|
start_date = datetime.strptime(config['start_date'], '%Y-%m-%d')
|
|
|
|
|
return [ExchangeRates(authenticator=auth, config=config, start_date=start_date)]
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
Let's also add this parameter to the constructor and declare the `cursor_field`:
|
|
|
|
|
@@ -141,18 +142,19 @@ from airbyte_cdk.sources.streams import IncrementalMixin
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ExchangeRates(HttpStream, IncrementalMixin):
|
|
|
|
|
url_base = "https://api.exchangeratesapi.io/"
|
|
|
|
|
url_base = "http://api.exchangeratesapi.io/"
|
|
|
|
|
cursor_field = "date"
|
|
|
|
|
primary_key = "date"
|
|
|
|
|
|
|
|
|
|
def __init__(self, base: str, start_date: datetime, **kwargs):
|
|
|
|
|
def __init__(self, config: Mapping[str, Any], start_date: datetime, **kwargs):
|
|
|
|
|
super().__init__()
|
|
|
|
|
self.base = base
|
|
|
|
|
self.base = config['base']
|
|
|
|
|
self.access_key = config['access_key']
|
|
|
|
|
self.start_date = start_date
|
|
|
|
|
self._cursor_value = None
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
Declaring the `cursor_field` informs the framework that this stream now supports incremental sync. The next time you run `python main_dev.py discover --config sample_files/config.json` you'll find that the `supported_sync_modes` field now also contains `incremental`.
|
|
|
|
|
Declaring the `cursor_field` informs the framework that this stream now supports incremental sync. The next time you run `python main_dev.py discover --config secrets/config.json` you'll find that the `supported_sync_modes` field now also contains `incremental`.
|
|
|
|
|
|
|
|
|
|
But we're not quite done with supporting incremental, we have to actually emit state! We'll structure our state object very simply: it will be a `dict` whose single key is `'date'` and value is the date of the last day we synced data from. For example, `{'date': '2021-04-26'}` indicates the connector previously read data up until April 26th and therefore shouldn't re-read anything before April 26th.
|
|
|
|
|
|
|
|
|
|
@@ -226,17 +228,17 @@ We should now have a working implementation of incremental sync!
|
|
|
|
|
Let's try it out:
|
|
|
|
|
|
|
|
|
|
```text
|
|
|
|
|
python main.py read --config sample_files/config.json --catalog sample_files/configured_catalog.json
|
|
|
|
|
python main.py read --config secrets/config.json --catalog sample_files/configured_catalog.json
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
You should see a bunch of `RECORD` messages and `STATE` messages. To verify that incremental sync is working, pass the input state back to the connector and run it again:
|
|
|
|
|
|
|
|
|
|
```text
|
|
|
|
|
# Save the latest state to sample_files/state.json
|
|
|
|
|
python main.py read --config sample_files/config.json --catalog sample_files/configured_catalog.json | grep STATE | tail -n 1 | jq .state.data > sample_files/state.json
|
|
|
|
|
python main.py read --config secrets/config.json --catalog sample_files/configured_catalog.json | grep STATE | tail -n 1 | jq .state.data > sample_files/state.json
|
|
|
|
|
|
|
|
|
|
# Run a read operation with the latest state message
|
|
|
|
|
python main.py read --config sample_files/config.json --catalog sample_files/configured_catalog.json --state sample_files/state.json
|
|
|
|
|
python main.py read --config secrets/config.json --catalog sample_files/configured_catalog.json --state sample_files/state.json
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
You should see that only the record from the last date is being synced! This is acceptable behavior, since Airbyte requires at-least-once delivery of records, so repeating the last record twice is OK.
|
|
|
|
|
|