# MIT License # # Copyright (c) 2020 Airbyte # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in all # copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. import argparse # helps parse commandline arguments import datetime import json import os import sys from datetime import date, timedelta import requests def read(config, catalog): # Assert required configuration was provided if "api_key" not in config or "stock_ticker" not in config: log_error("Input config must contain the properties 'api_key' and 'stock_ticker'") sys.exit(1) # Find the stock_prices stream if it is present in the input catalog stock_prices_stream = None for configured_stream in catalog["streams"]: if configured_stream["stream"]["name"] == "stock_prices": stock_prices_stream = configured_stream if stock_prices_stream is None: log_error("No streams selected") return # We only support full_refresh at the moment, so verify the user didn't ask for another sync mode if stock_prices_stream["sync_mode"] != "full_refresh": log_error("This connector only supports full refresh syncs! (for now)") sys.exit(1) # If we've made it this far, all the configuration is good and we can pull the last 7 days of market data response = _call_api(ticker=config["stock_ticker"], token=config["api_key"]) if response.status_code != 200: # In a real scenario we'd handle this error better :) log_error("Failure occurred when calling Polygon.io API") sys.exit(1) else: # Stock prices are returned sorted by date in ascending order # We want to output them one by one as AirbyteMessages results = response.json()["results"] for result in results: data = { "date": date.fromtimestamp(result["t"] / 1000).isoformat(), "stock_ticker": config["stock_ticker"], "price": result["c"], } record = {"stream": "stock_prices", "data": data, "emitted_at": int(datetime.datetime.now().timestamp()) * 1000} output_message = {"type": "RECORD", "record": record} print(json.dumps(output_message)) def read_json(filepath): with open(filepath, "r") as f: return json.loads(f.read()) def _call_api(ticker, token): today = date.today() to_day = today.strftime("%Y-%m-%d") from_day = (today - timedelta(days=7)).strftime("%Y-%m-%d") return requests.get(f"https://api.polygon.io/v2/aggs/ticker/{ticker}/range/1/day/{from_day}/{to_day}?sort=asc&limit=120&apiKey={token}") def check(config): # Assert required configuration was provided if "api_key" not in config or "stock_ticker" not in config: log_error("Input config must contain the properties 'api_key' and 'stock_ticker'") sys.exit(1) else: # Validate input configuration by attempting to get the daily closing prices of the input stock ticker response = _call_api(ticker=config["stock_ticker"], token=config["api_key"]) if response.status_code == 200: result = {"status": "SUCCEEDED"} elif response.status_code == 403: # HTTP code 403 means authorization failed so the API key is incorrect result = {"status": "FAILED", "message": "API Key is incorrect."} else: # Consider any other code a "generic" failure and tell the user to make sure their config is correct. result = {"status": "FAILED", "message": "Input configuration is incorrect. Please verify the input stock ticker and API key."} # Format the result of the check operation according to the Airbyte Specification output_message = {"type": "CONNECTION_STATUS", "connectionStatus": result} print(json.dumps(output_message)) def log(message): log_json = {"type": "LOG", "log": message} print(json.dumps(log_json)) def log_error(error_message): current_time_in_ms = int(datetime.datetime.now().timestamp()) * 1000 log_json = {"type": "TRACE", "trace": {"type": "ERROR", "emitted_at": current_time_in_ms, "error": {"message": error_message}}} print(json.dumps(log_json)) def discover(): catalog = { "streams": [ { "name": "stock_prices", "supported_sync_modes": ["full_refresh"], "json_schema": { "properties": {"date": {"type": "string"}, "price": {"type": "number"}, "stock_ticker": {"type": "string"}} }, } ] } airbyte_message = {"type": "CATALOG", "catalog": catalog} print(json.dumps(airbyte_message)) def get_input_file_path(path): if os.path.isabs(path): return path else: return os.path.join(os.getcwd(), path) def spec(): # Read the file named spec.json from the module directory as a JSON file current_script_directory = os.path.dirname(os.path.realpath(__file__)) spec_path = os.path.join(current_script_directory, "spec.json") specification = read_json(spec_path) # form an Airbyte Message containing the spec and print it to stdout airbyte_message = {"type": "SPEC", "spec": specification} # json.dumps converts the JSON (Python dict) to a string print(json.dumps(airbyte_message)) def run(args): parent_parser = argparse.ArgumentParser(add_help=False) main_parser = argparse.ArgumentParser() subparsers = main_parser.add_subparsers(title="commands", dest="command") # Accept the spec command subparsers.add_parser("spec", help="outputs the json configuration specification", parents=[parent_parser]) # Accept the check command check_parser = subparsers.add_parser("check", help="checks the config used to connect", parents=[parent_parser]) required_check_parser = check_parser.add_argument_group("required named arguments") required_check_parser.add_argument("--config", type=str, required=True, help="path to the json configuration file") # Accept the discover command discover_parser = subparsers.add_parser("discover", help="outputs a catalog describing the source's schema", parents=[parent_parser]) required_discover_parser = discover_parser.add_argument_group("required named arguments") required_discover_parser.add_argument("--config", type=str, required=True, help="path to the json configuration file") # Accept the read command read_parser = subparsers.add_parser("read", help="reads the source and outputs messages to STDOUT", parents=[parent_parser]) read_parser.add_argument("--state", type=str, required=False, help="path to the json-encoded state file") required_read_parser = read_parser.add_argument_group("required named arguments") required_read_parser.add_argument("--config", type=str, required=True, help="path to the json configuration file") required_read_parser.add_argument("--catalog", type=str, required=True, help="path to the catalog used to determine which data to read") parsed_args = main_parser.parse_args(args) command = parsed_args.command if command == "spec": spec() elif command == "check": config_file_path = get_input_file_path(parsed_args.config) config = read_json(config_file_path) check(config) elif command == "discover": discover() elif command == "read": config = read_json(get_input_file_path(parsed_args.config)) configured_catalog = read_json(get_input_file_path(parsed_args.catalog)) read(config, configured_catalog) else: # If we don't recognize the command log the problem and exit with an error code greater than 0 to indicate the process # had a failure log("Invalid command. Allowable commands: [spec, check, discover, read]") sys.exit(1) # A zero exit code means the process successfully completed sys.exit(0) def main(): arguments = sys.argv[1:] run(arguments) if __name__ == "__main__": main()