Update incremental sync tutorial to match recent changes in build-a-connector tutorial (#11126)
@@ -50,11 +50,11 @@ In this case we might choose something like this:
|
||||
}
|
||||
```
|
||||
|
||||
The second change we need to make to the `read` method is to use the state object so that we only emit new records. This stock ticker API does not give us control over how we query it, so we will have to filter out records that we already replicated within the Source.
|
||||
The second change we need to make to the `read` method is to use the state object so that we only emit new records.
|
||||
|
||||
Lastly, we need to emit an updated state object, so that the next time this Source runs we do not resend messages that we have already sent.
|
||||
|
||||
Here's what our updated source would look like.
|
||||
Here's what our updated `read` method would look like.
|
||||
|
||||
```python
|
||||
def read(config, catalog, state):
|
||||
@@ -73,48 +73,101 @@ def read(config, catalog, state):
|
||||
log("No streams selected")
|
||||
return
|
||||
|
||||
# If we've made it this far, all the configuration is good and we can pull the last 7 days of market data
|
||||
api_key = config["api_key"]
|
||||
stock_ticker = config["stock_ticker"]
|
||||
response = _call_api(f"/stock/{stock_ticker}/chart/7d", api_key)
|
||||
# max_date starts at the value from the incoming state object. None if there was no previous state.
|
||||
max_date = state.get("stock_prices")
|
||||
if response.status_code != 200:
|
||||
# In a real scenario we'd handle this error better :)
|
||||
log("Failure occurred when calling IEX API")
|
||||
sys.exit(1)
|
||||
else:
|
||||
# Sort the stock prices ascending by date then output them one by one as AirbyteMessages
|
||||
prices = sorted(response.json(), key=lambda record: to_datetime(record["date"]))
|
||||
for price in prices:
|
||||
data = {"date": price["date"], "stock_ticker": price["symbol"], "price": price["close"]}
|
||||
record = {"stream": "stock_prices", "data": data, "emitted_at": int(datetime.datetime.now().timestamp()) * 1000}
|
||||
output_message = {"type": "RECORD", "record": record}
|
||||
# By default we fetch stock prices for the 7 day period ending with today
|
||||
today = date.today()
|
||||
cursor_value = today.strftime("%Y-%m-%d")
|
||||
from_day = (today - timedelta(days=7)).strftime("%Y-%m-%d")
|
||||
|
||||
if stock_prices_stream["sync_mode"] == "incremental":
|
||||
# Filter records that are older than the last state.
|
||||
# If no previous state, filter nothing.
|
||||
state_date = to_datetime(state.get("stock_prices"))
|
||||
if state_date and state_date > to_datetime(data["date"]):
|
||||
continue
|
||||
# If this record has the greatest date value so far, bump
|
||||
# max_date.
|
||||
if not max_date or to_datetime(max_date) < to_datetime(data["date"]):
|
||||
max_date = data["date"]
|
||||
# In case of incremental sync, state should contain the last date when we fetched stock prices
|
||||
if stock_prices_stream["sync_mode"] == "incremental":
|
||||
if state and state.get("stock_prices"):
|
||||
from_date = datetime.strptime(state.get("stock_prices"), "%Y-%m-%d")
|
||||
from_day = (from_date + timedelta(days=1)).strftime("%Y-%m-%d")
|
||||
|
||||
print(json.dumps(output_message))
|
||||
# If the state indicates that we have already run the sync up to cursor_value, we can skip the sync
|
||||
if cursor_value != from_day:
|
||||
# If we've made it this far, all the configuration is good and we can pull the market data
|
||||
response = _call_api(ticker=config["stock_ticker"], token=config["api_key"], from_day=from_day, to_day=cursor_value)
|
||||
if response.status_code != 200:
|
||||
# In a real scenario we'd handle this error better :)
|
||||
log("Failure occurred when calling Polygon.io API")
|
||||
sys.exit(1)
|
||||
else:
|
||||
# Stock prices are returned sorted by date in ascending order
|
||||
# We want to output them one by one as AirbyteMessages
|
||||
response_json = response.json()
|
||||
if response_json["resultsCount"] > 0:
|
||||
results = response_json["results"]
|
||||
for result in results:
|
||||
data = {"date": datetime.fromtimestamp(result["t"]/1000, tz=timezone.utc).strftime("%Y-%m-%d"), "stock_ticker": config["stock_ticker"], "price": result["c"]}
|
||||
record = {"stream": "stock_prices", "data": data, "emitted_at": int(datetime.now().timestamp()) * 1000}
|
||||
output_message = {"type": "RECORD", "record": record}
|
||||
print(json.dumps(output_message))
|
||||
|
||||
# Emit new state message.
|
||||
if stock_prices_stream["sync_mode"] == "incremental":
|
||||
output_message = {"type": "STATE", "state": {"data": {"stock_prices": max_date}}}
|
||||
print(json.dumps(output_message))
|
||||
# We update the cursor as we print out the data, so that next time sync starts where we stopped printing out results
|
||||
if stock_prices_stream["sync_mode"] == "incremental":
|
||||
cursor_value = datetime.fromtimestamp(results[-1]["t"]/1000, tz=timezone.utc).strftime("%Y-%m-%d")
|
||||
|
||||
def to_datetime(date):
|
||||
if date:
|
||||
return datetime.datetime.strptime(date, '%Y-%m-%d')
|
||||
else:
|
||||
return None
|
||||
# Emit new state message.
|
||||
if stock_prices_stream["sync_mode"] == "incremental":
|
||||
output_message = {"type": "STATE", "state": {"data": {"stock_prices": cursor_value}}}
|
||||
print(json.dumps(output_message))
|
||||
```
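
The date-window logic in the updated `read` method can be looked at in isolation. The following is a minimal sketch, not part of the tutorial's source: the helper name `compute_sync_window` and the `today` parameter are hypothetical, introduced here so the behavior can be checked without calling the API. It assumes the state has the `{"stock_prices": "YYYY-MM-DD"}` shape used in this tutorial.

```python
from datetime import date, datetime, timedelta

DATE_FORMAT = "%Y-%m-%d"


def compute_sync_window(state, sync_mode, today):
    """Return (from_day, to_day) as YYYY-MM-DD strings (hypothetical helper)."""
    # By default, cover the 7 day period ending with today.
    to_day = today.strftime(DATE_FORMAT)
    from_day = (today - timedelta(days=7)).strftime(DATE_FORMAT)

    # For incremental syncs with a saved cursor, start the day after it.
    if sync_mode == "incremental" and state and state.get("stock_prices"):
        last_synced = datetime.strptime(state["stock_prices"], DATE_FORMAT)
        from_day = (last_synced + timedelta(days=1)).strftime(DATE_FORMAT)

    return from_day, to_day


# No previous state: fall back to the default 7 day window.
print(compute_sync_window(None, "incremental", date(2022, 3, 14)))
# ('2022-03-07', '2022-03-14')

# Previous state present: resume from the day after the saved cursor.
print(compute_sync_window({"stock_prices": "2022-03-11"}, "incremental", date(2022, 3, 14)))
# ('2022-03-12', '2022-03-14')
```

Passing `today` in explicitly, rather than calling `date.today()` inside the function, is only a convenience for testing; the tutorial code computes it inline.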
|
||||
|
||||
That's all you need to do to add incremental functionality to the stock ticker Source. Incremental definitely requires more configurability than full refresh, so your implementation may deviate slightly depending on whether your cursor field is source defined or user-defined. If you think you are running into one of those cases, check out our [incremental](../../understanding-airbyte/connections/incremental-append.md) documentation for more information on different types of configuration.
|
||||
We will also need to parse the `state` argument in the `run` method. To do that, we will modify the code that
calls the `read` method from the `run` method:
|
||||
```python
|
||||
elif command == "read":
|
||||
config = read_json(get_input_file_path(parsed_args.config))
|
||||
configured_catalog = read_json(get_input_file_path(parsed_args.catalog))
|
||||
state = None
|
||||
if parsed_args.state:
|
||||
state = read_json(get_input_file_path(parsed_args.state))
|
||||
|
||||
read(config, configured_catalog, state)
|
||||
```
|
||||
Finally, we need to pass more arguments to our `_call_api` method in order to fetch only new prices for incremental sync:
|
||||
```python
|
||||
def _call_api(ticker, token, from_day, to_day):
|
||||
return requests.get(f"https://api.polygon.io/v2/aggs/ticker/{ticker}/range/1/day/{from_day}/{to_day}?sort=asc&limit=120&apiKey={token}")
|
||||
```
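
The response returned by `_call_api` is then turned into records inside `read`. As a rough sketch of that mapping, assuming each entry in `results` carries a millisecond epoch timestamp `t` and a closing price `c` as the `read` code above does, the conversion could be factored out like this. The function name `result_to_record` and the sample timestamp are illustrative, not taken from the tutorial.

```python
from datetime import datetime, timezone


def result_to_record(result, ticker, emitted_at_ms):
    """Convert one aggregate result into an Airbyte RECORD message (hypothetical helper)."""
    # "t" is assumed to be milliseconds since the epoch; interpret it as UTC.
    day = datetime.fromtimestamp(result["t"] / 1000, tz=timezone.utc).strftime("%Y-%m-%d")
    data = {"date": day, "stock_ticker": ticker, "price": result["c"]}
    record = {"stream": "stock_prices", "data": data, "emitted_at": emitted_at_ms}
    return {"type": "RECORD", "record": record}


# Illustrative input: 1646611200000 ms is 2022-03-07 00:00:00 UTC.
message = result_to_record({"t": 1646611200000, "c": 804.58}, "TSLA", 1647294277000)
print(message["record"]["data"])
# {'date': '2022-03-07', 'stock_ticker': 'TSLA', 'price': 804.58}
```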
|
||||
|
||||
To test these changes you need a `state` object. If you run an incremental sync
without passing one, the new code outputs a state object that you can use for the next sync. For example, if you run this:
|
||||
```bash
|
||||
python source.py read --config secrets/valid_config.json --catalog incremental_configured_catalog.json
|
||||
```
|
||||
|
||||
The output will look like the following:
|
||||
```bash
|
||||
{"type": "RECORD", "record": {"stream": "stock_prices", "data": {"date": "2022-03-07", "stock_ticker": "TSLA", "price": 804.58}, "emitted_at": 1647294277000}}
|
||||
{"type": "RECORD", "record": {"stream": "stock_prices", "data": {"date": "2022-03-08", "stock_ticker": "TSLA", "price": 824.4}, "emitted_at": 1647294277000}}
|
||||
{"type": "RECORD", "record": {"stream": "stock_prices", "data": {"date": "2022-03-09", "stock_ticker": "TSLA", "price": 858.97}, "emitted_at": 1647294277000}}
|
||||
{"type": "RECORD", "record": {"stream": "stock_prices", "data": {"date": "2022-03-10", "stock_ticker": "TSLA", "price": 838.3}, "emitted_at": 1647294277000}}
|
||||
{"type": "RECORD", "record": {"stream": "stock_prices", "data": {"date": "2022-03-11", "stock_ticker": "TSLA", "price": 795.35}, "emitted_at": 1647294277000}}
|
||||
{"type": "STATE", "state": {"data": {"stock_prices": "2022-03-11"}}}
|
||||
```
|
||||
|
||||
Notice that the last line of output is a STATE message. Copy the state object from its `data` field:
|
||||
```json
|
||||
{"stock_prices": "2022-03-11"}
|
||||
```
|
||||
and paste it into a new file (e.g. `state.json`). Now you can run an incremental sync:
|
||||
```bash
|
||||
python source.py read --config secrets/valid_config.json --catalog incremental_configured_catalog.json --state state.json
|
||||
```
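
If you would rather not copy the state by hand, the same step can be scripted. The sketch below is one possible approach under the assumption that the connector prints one JSON message per line, as in the sample output above. The function name `extract_latest_state` is hypothetical.

```python
import json


def extract_latest_state(output_lines):
    """Return the `data` of the last STATE message, or None if there is none."""
    latest = None
    for line in output_lines:
        line = line.strip()
        if not line:
            continue
        try:
            message = json.loads(line)
        except json.JSONDecodeError:
            continue  # Skip any output that is not valid JSON.
        if isinstance(message, dict) and message.get("type") == "STATE":
            latest = message["state"]["data"]
    return latest


sample_output = [
    '{"type": "RECORD", "record": {"stream": "stock_prices", "data": {"date": "2022-03-11", "stock_ticker": "TSLA", "price": 795.35}, "emitted_at": 1647294277000}}',
    '{"type": "STATE", "state": {"data": {"stock_prices": "2022-03-11"}}}',
]
print(json.dumps(extract_latest_state(sample_output)))
# {"stock_prices": "2022-03-11"}
```

The printed JSON is what would go into `state.json` for the next run.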
|
||||
|
||||
That's all you need to do to add incremental functionality to the stock ticker Source.
|
||||
|
||||
You can deploy the new version of your connector simply by running:
|
||||
```bash
|
||||
./gradlew clean :airbyte-integrations:connectors:source-stock-ticker-api:build
|
||||
```
|
||||
|
||||
Bonus points: go to the Airbyte UI and reconfigure the connection to use incremental sync.
|
||||
|
||||
Incremental sync requires more configuration than full refresh, so your implementation may deviate slightly depending on whether your cursor
field is source-defined or user-defined. If you think you are running into one of those cases, check out
our [incremental](../../understanding-airbyte/connections/incremental-append.md) documentation for more information on the different types of
configuration.
|
||||
|
||||
|
||||