Co-authored-by: Danylo Jablonski <150933663+DanyloGL@users.noreply.github.com> Co-authored-by: Augustin <augustin@airbyte.io> Co-authored-by: Serhii Lazebnyi <serhii.lazebnyi@globallogic.com> Co-authored-by: alafanechere <augustin.lafanechere@gmail.com>
67 lines
2.3 KiB
Python
67 lines
2.3 KiB
Python
#
|
|
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
|
|
#
|
|
|
|
import base64
|
|
|
|
import requests
|
|
from bigquery_schema_generator.generate_schema import SchemaGenerator
|
|
from gbqschema_converter.gbqschema_to_jsonschema import json_representation as converter
|
|
from requests.adapters import HTTPAdapter
|
|
from requests.packages.urllib3.util.retry import Retry
|
|
|
|
|
|
class Helpers(object):
    """Static helpers for fetching SurveyCTO form data and deriving a JSON schema from it."""

    @staticmethod
    def _base64_encode(string: str) -> str:
        """Return the base64 encoding of an ASCII string, as an ASCII string.

        Raises:
            UnicodeEncodeError: if ``string`` contains non-ASCII characters.
        """
        return base64.b64encode(string.encode("ascii")).decode("ascii")

    @staticmethod
    def _build_retry_strategy():
        """Build the retry policy: up to 3 retries on HTTP 429/409 for HEAD, GET and OPTIONS."""
        retried_methods = ["HEAD", "GET", "OPTIONS"]
        try:
            return Retry(total=3, status_forcelist=[429, 409], allowed_methods=retried_methods)
        except TypeError:
            # Older urllib3 releases only know the keyword under its previous name;
            # newer ones reject the old name, so try the current spelling first.
            return Retry(total=3, status_forcelist=[429, 409], method_whitelist=retried_methods)

    @staticmethod
    def _extract_error_message(response):
        """Return the most specific error description available for a failed response."""
        try:
            payload = response.json()
        except ValueError:
            # The body is not JSON (e.g. an HTML error page); fall through to the generic message.
            payload = None

        error = payload.get("error") if isinstance(payload, dict) else None
        if isinstance(error, dict) and error.get("message"):
            return error["message"]
        if error:
            return str(error)
        return f"SurveyCTO request failed with status code {response.status_code}"

    @staticmethod
    def call_survey_cto(config, form_id):
        """Fetch the records of one form and yield them one at a time.

        This is a generator, so the HTTP request is only sent once iteration starts.

        Args:
            config: mapping providing the ``server_name``, ``start_date``,
                ``username`` and ``password`` keys.
            form_id: identifier of the form, interpolated into the request URL.

        Yields:
            Each element of the decoded JSON response body.

        Raises:
            Exception: if the server answers with a status code other than 200.
        """
        server_name = config["server_name"]
        start_date = config["start_date"]
        auth_token = Helpers._base64_encode(f"{config['username']}:{config['password']}")

        url = f"https://{server_name}.surveycto.com/" + f"api/v2/forms/data/wide/json/{form_id}?date={start_date}"

        adapter = HTTPAdapter(max_retries=Helpers._build_retry_strategy())
        # The context manager closes the session (and its pooled connections) once the
        # response body has been downloaded, before any record is handed to the caller.
        with requests.Session() as http:
            http.mount("https://", adapter)
            http.mount("http://", adapter)  # ignore-https-check
            response = http.get(url, headers={"Authorization": "Basic " + auth_token})

        # Check the status before decoding the body: error responses are not guaranteed
        # to be JSON, and a failed response must never be iterated as if it were data.
        if response.status_code != 200:
            raise Exception(Helpers._extract_error_message(response))

        # An empty result simply yields nothing.
        yield from response.json()

    @staticmethod
    def get_filter_data(data):
        """Deduce a schema from ``data`` and return its JSON-schema ``properties`` mapping.

        Args:
            data: iterable of records, passed to ``SchemaGenerator.deduce_schema``.

        Returns:
            The ``definitions.element.properties`` section of the converted schema.
        """
        generator = SchemaGenerator(input_format="dict", infer_mode="NULLABLE", preserve_input_sort_order=True)

        # deduce_schema also returns its error logs; they are not used here.
        schema_map, _error_logs = generator.deduce_schema(input_data=data)
        flat_schema = generator.flatten_schema(schema_map)
        schema_json = converter(flat_schema)
        return schema_json["definitions"]["element"]["properties"]

    @staticmethod
    def get_json_schema(schema):
        """Wrap a ``properties`` mapping in a draft-07 JSON-schema object definition."""
        return {
            "$schema": "http://json-schema.org/draft-07/schema#",
            "type": "object",
            "properties": schema,
        }
|