1
0
mirror of synced 2025-12-20 18:39:31 -05:00
Files
airbyte/airbyte-integrations/connectors/source-adjust/source_adjust/components.py
2024-12-18 14:05:43 -08:00

34 lines
980 B
Python

#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
from dataclasses import dataclass
from typing import Any, Mapping
import source_adjust.model
from airbyte_cdk.sources.declarative.schema.json_file_schema_loader import JsonFileSchemaLoader
@dataclass
class AdjustSchemaLoader(JsonFileSchemaLoader):
config: Mapping[str, Any]
def get_json_schema(self) -> Mapping[str, Any]:
"""
Prune the schema to only include selected fields to synchronize.
"""
schema = source_adjust.model.Report.schema()
properties = schema["properties"]
required = schema["required"]
selected = self.config["metrics"] + self.config["dimensions"]
retain = required + selected
for attr in list(properties.keys()):
if attr not in retain:
del properties[attr]
for attr in self.config["additional_metrics"]:
properties[attr] = {"type": "number"}
return schema