1
0
mirror of synced 2026-01-31 19:01:59 -05:00
Files
airbyte/airbyte-cdk/python/connector_builder/connector_builder_handler.py
Catherine Noll 8ee32b1132 New connector_builder module for handling requests from the Connector Builder (#23888)
Also implements `resolve_manifest` handler
2023-03-14 13:51:27 -04:00

37 lines
1.0 KiB
Python

#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
from datetime import datetime
from airbyte_cdk.models import AirbyteMessage, AirbyteRecordMessage, Type
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
def list_streams() -> AirbyteMessage:
    """Placeholder for the stream-listing handler.

    Raises:
        NotImplementedError: always; this handler has no implementation yet.
    """
    raise NotImplementedError()
def stream_read() -> AirbyteMessage:
    """Placeholder for the stream-reading handler.

    Raises:
        NotImplementedError: always; this handler has no implementation yet.
    """
    raise NotImplementedError()
def resolve_manifest(source: ManifestDeclarativeSource) -> AirbyteMessage:
    """Wrap the source's resolved manifest in an Airbyte RECORD message.

    Args:
        source: Declarative source whose ``resolved_manifest`` is emitted.

    Returns:
        On success, a ``Type.RECORD`` message on the ``"resolve_manifest"``
        stream whose data is ``{"manifest": source.resolved_manifest}``.
        If anything raises while building that message, the exception is
        wrapped in an ``AirbyteTracedException`` and the message produced by
        its ``as_airbyte_message()`` is returned instead.
    """
    try:
        # Reading the manifest and building the record both stay inside the
        # try so any failure is reported as a message rather than propagated.
        payload = {"manifest": source.resolved_manifest}
        record = AirbyteRecordMessage(
            data=payload,
            emitted_at=_emitted_at(),
            stream="resolve_manifest",
        )
        return AirbyteMessage(type=Type.RECORD, record=record)
    except Exception as err:
        traced = AirbyteTracedException.from_exception(err, message="Error resolving manifest.")
        return traced.as_airbyte_message()
def _emitted_at():
return int(datetime.now().timestamp()) * 1000