1
0
mirror of synced 2025-12-20 10:32:35 -05:00
Files
airbyte/airbyte-integrations/connectors/source-stock-ticker-api-tutorial/source.py
2023-08-25 10:20:41 -07:00

213 lines
8.8 KiB
Python

# MIT License
#
# Copyright (c) 2020 Airbyte
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import argparse # helps parse commandline arguments
import datetime
import json
import os
import sys
from datetime import date, timedelta
import requests
def read(config, catalog):
    """Print the last week of daily closing prices as Airbyte RECORD messages.

    One JSON-encoded message is written to stdout per price returned by the
    API. Problems are reported through ``log_error``.

    Args:
        config: Connector configuration; must contain the keys ``api_key``
            and ``stock_ticker``.
        catalog: Configured catalog; its ``streams`` list is searched for the
            stream named ``stock_prices``.

    Raises:
        SystemExit: With code 1 when the configuration is incomplete, when a
            sync mode other than ``full_refresh`` is requested, or when the
            API answers with a status other than 200.
    """
    # Assert required configuration was provided
    if "api_key" not in config or "stock_ticker" not in config:
        log_error("Input config must contain the properties 'api_key' and 'stock_ticker'")
        sys.exit(1)

    # Find the stock_prices stream if it is present in the input catalog
    stock_prices_stream = None
    for configured_stream in catalog["streams"]:
        if configured_stream["stream"]["name"] == "stock_prices":
            stock_prices_stream = configured_stream

    if stock_prices_stream is None:
        # Nothing to sync: report it and return without emitting records
        log_error("No streams selected")
        return

    # We only support full_refresh at the moment, so verify the user didn't ask for another sync mode
    if stock_prices_stream["sync_mode"] != "full_refresh":
        log_error("This connector only supports full refresh syncs! (for now)")
        sys.exit(1)

    # If we've made it this far, all the configuration is good and we can pull the last 7 days of market data
    response = _call_api(ticker=config["stock_ticker"], token=config["api_key"])
    if response.status_code != 200:
        # In a real scenario we'd handle this error better :)
        log_error("Failure occurred when calling Polygon.io API")
        sys.exit(1)

    # Stock prices are returned sorted by date in ascending order
    # We want to output them one by one as AirbyteMessages.
    # A successful response without a "results" list simply yields no records
    # instead of failing with a KeyError.
    results = response.json().get("results") or []
    for result in results:
        data = {
            # "t" is divided by 1000 to turn milliseconds into the seconds that fromtimestamp expects
            # NOTE(review): fromtimestamp converts using the machine's local timezone - confirm that is intended
            "date": date.fromtimestamp(result["t"] / 1000).isoformat(),
            "stock_ticker": config["stock_ticker"],
            "price": result["c"],
        }
        # Scale to milliseconds *before* truncating so sub-second precision is kept
        emitted_at = int(datetime.datetime.now(tz=datetime.timezone.utc).timestamp() * 1000)
        record = {"stream": "stock_prices", "data": data, "emitted_at": emitted_at}
        output_message = {"type": "RECORD", "record": record}
        print(json.dumps(output_message))
def read_json(filepath):
    """Parse the file at *filepath* as JSON and return the decoded value.

    The file is always decoded as UTF-8 so the result does not depend on the
    platform's default text encoding.

    Args:
        filepath: Path of the JSON file to read.

    Returns:
        The Python object produced by decoding the file's JSON content.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the content is not valid JSON.
    """
    with open(filepath, "r", encoding="utf-8") as f:
        return json.load(f)
def _call_api(ticker, token, timeout=30):
    """Request the last 7 days of daily aggregates for *ticker* from Polygon.io.

    Args:
        ticker: Stock ticker symbol, placed in the request path.
        token: API key, sent as the ``apiKey`` query parameter.
        timeout: Seconds to wait on the server before giving up, so a stalled
            connection cannot block the connector indefinitely.

    Returns:
        The ``requests`` response object; callers inspect ``status_code`` and
        ``json()``.

    Raises:
        requests.exceptions.RequestException: If the request cannot be
            completed (connection failure, timeout, ...).
    """
    today = date.today()
    to_day = today.strftime("%Y-%m-%d")
    from_day = (today - timedelta(days=7)).strftime("%Y-%m-%d")
    url = f"https://api.polygon.io/v2/aggs/ticker/{ticker}/range/1/day/{from_day}/{to_day}"
    # Handing the query to requests as a mapping gets every value URL-encoded,
    # which string interpolation of the API key did not guarantee.
    query = {"sort": "asc", "limit": 120, "apiKey": token}
    return requests.get(url, params=query, timeout=timeout)
def check(config):
    """Verify *config* by calling the API and print a CONNECTION_STATUS message.

    Args:
        config: Connector configuration; must contain the keys ``api_key``
            and ``stock_ticker``.

    Raises:
        SystemExit: With code 1 when either required key is missing.
    """
    # Guard: both properties are needed before any request can be made
    if not ("api_key" in config and "stock_ticker" in config):
        log_error("Input config must contain the properties 'api_key' and 'stock_ticker'")
        sys.exit(1)

    # Probe the API with the supplied ticker and key; the status code alone decides the outcome
    status_code = _call_api(ticker=config["stock_ticker"], token=config["api_key"]).status_code
    if status_code == 200:
        outcome = {"status": "SUCCEEDED"}
    else:
        if status_code == 403:
            # 403 is treated as an authorization failure, i.e. a wrong API key
            reason = "API Key is incorrect."
        else:
            # Every other status is reported as a generic configuration problem
            reason = "Input configuration is incorrect. Please verify the input stock ticker and API key."
        outcome = {"status": "FAILED", "message": reason}

    # Wrap the outcome in the message envelope and emit it on stdout
    print(json.dumps({"type": "CONNECTION_STATUS", "connectionStatus": outcome}))
def log(message):
    """Print *message* to stdout inside a JSON object of type ``LOG``."""
    print(json.dumps({"type": "LOG", "log": message}))
def log_error(error_message):
    """Print *error_message* to stdout as a TRACE message of type ``ERROR``.

    Args:
        error_message: Human-readable description of the failure.

    The ``emitted_at`` field is the current time in epoch milliseconds.
    """
    # Scale to milliseconds *before* truncating; truncating first would round
    # every timestamp down to a whole second.
    current_time_in_ms = int(datetime.datetime.now(tz=datetime.timezone.utc).timestamp() * 1000)
    log_json = {"type": "TRACE", "trace": {"type": "ERROR", "emitted_at": current_time_in_ms, "error": {"message": error_message}}}
    print(json.dumps(log_json))
def discover():
    """Print a CATALOG message describing the single ``stock_prices`` stream.

    The stream supports only the ``full_refresh`` sync mode and exposes three
    properties: ``date``, ``price`` and ``stock_ticker``.
    """
    # Property name -> JSON schema type, in the order they appear in the output
    field_types = {"date": "string", "price": "number", "stock_ticker": "string"}
    stock_prices = {
        "name": "stock_prices",
        "supported_sync_modes": ["full_refresh"],
        "json_schema": {"properties": {field: {"type": kind} for field, kind in field_types.items()}},
    }
    print(json.dumps({"type": "CATALOG", "catalog": {"streams": [stock_prices]}}))
def get_input_file_path(path):
    """Return *path* as-is when absolute, otherwise joined onto the current working directory."""
    return path if os.path.isabs(path) else os.path.join(os.getcwd(), path)
def spec():
    """Print a SPEC message whose payload is the content of ``spec.json``.

    ``spec.json`` is looked up in the directory that contains this script.
    """
    # Locate spec.json next to this file, using the file's canonical path
    module_dir = os.path.dirname(os.path.realpath(__file__))
    specification = read_json(os.path.join(module_dir, "spec.json"))
    # Serialize the message envelope to a JSON string and emit it on stdout
    print(json.dumps({"type": "SPEC", "spec": specification}))
def run(args):
    """Parse *args* and execute the requested command: spec, check, discover or read.

    Args:
        args: Command-line arguments without the program name.

    Raises:
        SystemExit: Code 0 after a command completes, code 1 when no known
            command was given; argparse itself exits on invalid arguments.
    """
    shared_parent = argparse.ArgumentParser(add_help=False)
    cli = argparse.ArgumentParser()
    commands = cli.add_subparsers(title="commands", dest="command")

    def add_command(name, description):
        # Register one sub-command on the shared parent parser
        return commands.add_parser(name, help=description, parents=[shared_parent])

    def add_required_group(command_parser):
        # Every command that needs input takes a mandatory --config option
        group = command_parser.add_argument_group("required named arguments")
        group.add_argument("--config", type=str, required=True, help="path to the json configuration file")
        return group

    # spec takes no options; check and discover each require --config
    add_command("spec", "outputs the json configuration specification")
    add_required_group(add_command("check", "checks the config used to connect"))
    add_required_group(add_command("discover", "outputs a catalog describing the source's schema"))

    # read accepts an optional --state and additionally requires --catalog
    read_command = add_command("read", "reads the source and outputs messages to STDOUT")
    read_command.add_argument("--state", type=str, required=False, help="path to the json-encoded state file")
    add_required_group(read_command).add_argument(
        "--catalog", type=str, required=True, help="path to the catalog used to determine which data to read"
    )

    options = cli.parse_args(args)

    def load(path):
        # Resolve a possibly relative path and decode the JSON file found there
        return read_json(get_input_file_path(path))

    handlers = {
        "spec": spec,
        "check": lambda: check(load(options.config)),
        "discover": discover,
        "read": lambda: read(load(options.config), load(options.catalog)),
    }
    handler = handlers.get(options.command)
    if handler is None:
        # If we don't recognize the command log the problem and exit with an error code greater than 0 to indicate
        # the process had a failure
        log("Invalid command. Allowable commands: [spec, check, discover, read]")
        sys.exit(1)

    handler()
    # A zero exit code means the process successfully completed
    sys.exit(0)
def main():
    """Script entry point: pass the command-line arguments, minus the program name, to ``run``."""
    run(sys.argv[1:])
# Start the connector only when this file is executed as a script, not when it is imported
if __name__ == "__main__":
    main()