53 lines
1.8 KiB
Python
53 lines
1.8 KiB
Python
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
|
|
|
|
from copy import deepcopy
|
|
from typing import Any, List, Mapping, Tuple
|
|
|
|
import requests
|
|
|
|
from airbyte_cdk.sources import AbstractSource
|
|
from airbyte_cdk.sources.streams import Stream
|
|
|
|
from .stream import KYVEStream
|
|
|
|
|
|
class SourceKyve(AbstractSource):
    """Source that builds one ``KYVEStream`` per configured pool.

    The configuration keys read by this class are ``url_base``, ``pool_ids``
    and ``start_ids``. The latter two are comma-separated lists that are
    paired by position: the n-th start id belongs to the n-th pool id.
    """

    # Upper bound (in seconds) for every pool lookup; without a timeout
    # ``requests`` would wait forever on an unresponsive endpoint.
    _REQUEST_TIMEOUT_SECONDS = 30

    @staticmethod
    def _split_ids(config: Mapping[str, Any], key: str) -> List[str]:
        """Return the comma-separated config value ``key`` as stripped parts.

        Raises:
            ValueError: if ``key`` is absent from ``config`` (or is ``None``).
        """
        raw = config.get(key)
        if raw is None:
            raise ValueError(f"Missing required config value '{key}'")
        # str() tolerates a single id supplied as a number; strip() tolerates
        # "1, 2" style input so the blanks do not end up in the request URL.
        return [part.strip() for part in str(raw).split(",")]

    def _fetch_pool(self, config: Mapping[str, Any], pool_id: str):
        """Query the pool endpoint for ``pool_id`` and return the raw response."""
        return requests.get(
            f"{config['url_base']}/kyve/query/v1beta1/pool/{pool_id}",
            timeout=self._REQUEST_TIMEOUT_SECONDS,
        )

    def check_connection(self, logger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
        """Validate the configuration and probe every configured pool.

        Args:
            logger: logger supplied by the caller (not used here).
            config: connector configuration.

        Returns:
            ``(True, None)`` when every pool endpoint answers successfully,
            otherwise ``(False, reason)`` where ``reason`` is a message, the
            decoded error payload of the endpoint, or the caught exception.
        """
        try:
            pools = self._split_ids(config, "pool_ids")
            start_ids = self._split_ids(config, "start_ids")
        except ValueError as e:
            return False, str(e)

        # pools and start ids are paired by position, so the counts must match
        if len(pools) != len(start_ids):
            return False, "Please add a start_id for every pool"

        # streams() converts every start id with int(); reject bad values here
        # so the problem is reported by the check instead of crashing later.
        for start_id in start_ids:
            try:
                int(start_id)
            except ValueError:
                return False, f"start_id '{start_id}' is not a valid integer"

        for pool_id in pools:
            try:
                # check if endpoint is available and returns valid data
                response = self._fetch_pool(config, pool_id)
                if not response.ok:
                    # todo improve error handling for cases like pool not found
                    return False, response.json()
            except Exception as e:
                # Boundary of the check: any failure (network error, missing
                # url_base, undecodable body) is reported, never raised.
                return False, e

        return True, None

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """Create one ``KYVEStream`` per ``(pool_id, start_id)`` pair.

        Args:
            config: connector configuration.

        Returns:
            The list of streams, in the order the pools are configured.

        Raises:
            ValueError: if ``pool_ids``/``start_ids`` is missing, a start id
                is not an integer, or the response carries no ``pool`` object.
            requests.HTTPError: if a pool endpoint answers with an error status.
        """
        pools = self._split_ids(config, "pool_ids")
        start_ids = self._split_ids(config, "start_ids")

        pool_streams: List[Stream] = []
        for pool_id, start_id in zip(pools, start_ids):
            response = self._fetch_pool(config, pool_id)
            # Fail with the HTTP error itself rather than with an opaque
            # AttributeError while digging through an error payload.
            response.raise_for_status()

            pool = response.json().get("pool")
            if pool is None:
                raise ValueError(f"Response for pool '{pool_id}' does not contain a 'pool' object")
            pool_data = pool.get("data")

            # Every stream gets its own config copy in which "start_ids" is
            # replaced by the single integer start id of this pool.
            config_copy = dict(deepcopy(config))
            config_copy["start_ids"] = int(start_id)
            # add a new stream based on the pool_data
            pool_streams.append(KYVEStream(config=config_copy, pool_data=pool_data))

        return pool_streams
|