1
0
mirror of synced 2025-12-21 02:51:29 -05:00
Files
airbyte/airbyte-integrations/connectors/source-looker/source_looker/source.py
2024-12-18 14:05:43 -08:00

29 lines
822 B
Python

#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
from typing import Any, List, Mapping
from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource
from airbyte_cdk.sources.streams.core import Stream
"""
This file provides the necessary constructs to interpret a provided declarative YAML configuration file into
source connector.
WARNING: Do not modify this file.
"""
# Declarative Source
class SourceLooker(YamlDeclarativeSource):
def __init__(self):
super().__init__(**{"path_to_yaml": "manifest.yaml"})
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
streams = super().streams(config)
if not config.get("run_look_ids"):
return [stream for stream in streams if stream.name != "run_looks"]
return streams