1
0
mirror of synced 2025-12-23 21:03:15 -05:00

Source S3: Convert to airbyte-lib (#33937)

This commit is contained in:
Joe Reuter
2024-01-09 11:31:00 +01:00
committed by GitHub
parent e7ff2a10ab
commit 2edcfb36fb
6 changed files with 54 additions and 38 deletions

View File

@@ -0,0 +1,42 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import sys
import traceback
from datetime import datetime
from typing import List
from airbyte_cdk.entrypoint import AirbyteEntrypoint, launch
from airbyte_cdk.models import AirbyteErrorTraceMessage, AirbyteMessage, AirbyteTraceMessage, TraceType, Type
from source_s3.v4 import Config, Cursor, SourceS3, SourceS3StreamReader
def get_source(args: List[str]):
catalog_path = AirbyteEntrypoint.extract_catalog(args)
try:
return SourceS3(SourceS3StreamReader(), Config, catalog_path, cursor_cls=Cursor)
except Exception:
print(
AirbyteMessage(
type=Type.TRACE,
trace=AirbyteTraceMessage(
type=TraceType.ERROR,
emitted_at=int(datetime.now().timestamp() * 1000),
error=AirbyteErrorTraceMessage(
message="Error starting the sync. This could be due to an invalid configuration or catalog. Please contact Support for assistance.",
stack_trace=traceback.format_exc(),
),
),
).json()
)
return None
def run():
_args = sys.argv[1:]
source = get_source(_args)
if source:
launch(source, _args)