1
0
mirror of synced 2025-12-25 11:06:55 -05:00

Add Airbyte-native facebook marketing source (#1552)

Co-authored-by: Jared Rhizor <me@jaredrhizor.com>
Co-authored-by: Sherif Nada <snadalive@gmail.com>
This commit is contained in:
Eugene K
2021-01-15 03:20:41 -03:00
committed by GitHub
parent f3735280de
commit fb7262dfa8
46 changed files with 4551 additions and 23 deletions

View File

@@ -24,12 +24,15 @@ SOFTWARE.
import inspect
import json
import os
import pkgutil
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Dict, Generator, Tuple
from typing import Dict, Generator, List, Tuple
import pkg_resources
from airbyte_protocol import AirbyteRecordMessage, AirbyteStream
from jsonschema import RefResolver
def package_name_from_class(cls: object) -> str:
@@ -38,6 +41,65 @@ def package_name_from_class(cls: object) -> str:
return module.__name__.split(".")[0]
class JsonSchemaResolver:
"""Helper class to expand $ref items in json schema"""
def __init__(self, shared_schemas_path: str):
self._shared_refs = self._load_shared_schema_refs(shared_schemas_path)
@staticmethod
def _load_shared_schema_refs(path: str):
shared_file_names = [f for f in os.listdir(path) if os.path.isfile(os.path.join(path, f))]
shared_schema_refs = {}
for shared_file in shared_file_names:
with open(os.path.join(path, shared_file)) as data_file:
shared_schema_refs[shared_file] = json.load(data_file)
return shared_schema_refs
def _resolve_schema_references(self, schema: dict, resolver: RefResolver) -> dict:
if "$ref" in schema:
reference_path = schema.pop("$ref", None)
resolved = resolver.resolve(reference_path)[1]
schema.update(resolved)
return self._resolve_schema_references(schema, resolver)
if "properties" in schema:
for k, val in schema["properties"].items():
schema["properties"][k] = self._resolve_schema_references(val, resolver)
if "patternProperties" in schema:
for k, val in schema["patternProperties"].items():
schema["patternProperties"][k] = self._resolve_schema_references(val, resolver)
if "items" in schema:
schema["items"] = self._resolve_schema_references(schema["items"], resolver)
if "anyOf" in schema:
for i, element in enumerate(schema["anyOf"]):
schema["anyOf"][i] = self._resolve_schema_references(element, resolver)
return schema
def resolve(self, schema: dict, refs: Dict[str, dict] = None) -> dict:
"""Resolves and replaces json-schema $refs with the appropriate dict.
Recursively walks the given schema dict, converting every instance
of $ref in a 'properties' structure with a resolved dict.
This modifies the input schema and also returns it.
Arguments:
schema:
the schema dict
refs:
a dict of <string, dict> which forms a store of referenced schemata
Returns:
schema
"""
refs = refs or {}
refs = {**self._shared_refs, **refs}
return self._resolve_schema_references(schema, RefResolver("", schema, store=refs))
class ResourceSchemaLoader:
"""JSONSchema loader from package resources"""
@@ -46,6 +108,9 @@ class ResourceSchemaLoader:
def get_schema(self, name: str) -> dict:
raw_schema = json.loads(pkgutil.get_data(self.package_name, f"schemas/{name}.json"))
shared_schemas_folder = pkg_resources.resource_filename(self.package_name, "schemas/shared/")
if os.path.exists(shared_schemas_folder):
return JsonSchemaResolver(shared_schemas_folder).resolve(raw_schema)
return raw_schema
@@ -70,13 +135,19 @@ class BaseClient(ABC):
return mapping
@staticmethod
def _get_fields_from_stream(stream: AirbyteStream) -> List[str]:
return list(stream.json_schema.get("properties", {}).keys())
def read_stream(self, stream: AirbyteStream) -> Generator[AirbyteRecordMessage, None, None]:
"""Yield records from stream"""
method = self._stream_methods.get(stream.name)
if not method:
raise ValueError(f"Client does not know how to read stream `{stream.name}`")
for message in method():
fields = self._get_fields_from_stream(stream)
for message in method(fields=fields):
now = int(datetime.now().timestamp()) * 1000
yield AirbyteRecordMessage(stream=stream.name, data=message, emitted_at=now)