1
0
mirror of synced 2025-12-21 02:51:29 -05:00
Files
airbyte/airbyte-integrations/connectors/source-recharge/source_recharge/source.py
2024-12-18 14:05:43 -08:00

30 lines
913 B
Python

#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
from typing import Any, List, Mapping
from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource
from airbyte_cdk.sources.streams import Stream
from source_recharge.streams import Orders, RechargeTokenAuthenticator
"""
This file provides the necessary constructs to interpret a provided declarative YAML configuration file into
source connector.
WARNING: Do not modify this file.
"""
# Declarative Source
class SourceRecharge(YamlDeclarativeSource):
def __init__(self) -> None:
super().__init__(**{"path_to_yaml": "manifest.yaml"})
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
auth = RechargeTokenAuthenticator(token=config["access_token"])
streams = super().streams(config=config)
streams.append(Orders(config, authenticator=auth))
return streams