🏥 Source Connectors: Pin airbyte-cdk version to ^0 (#36267)
This commit is contained in:
@@ -173,11 +173,11 @@ class SourceFile(Source):
|
||||
fields = self.selected_fields(catalog, config)
|
||||
name = client.stream_name
|
||||
|
||||
configured_stream = catalog.streams[0]
|
||||
airbyte_stream = catalog.streams[0].stream
|
||||
|
||||
logger.info(f"Syncing stream: {name} ({client.reader.full_url})...")
|
||||
|
||||
yield stream_status_as_airbyte_message(configured_stream, AirbyteStreamStatus.STARTED)
|
||||
yield stream_status_as_airbyte_message(airbyte_stream, AirbyteStreamStatus.STARTED)
|
||||
|
||||
record_counter = 0
|
||||
try:
|
||||
@@ -187,19 +187,19 @@ class SourceFile(Source):
|
||||
record_counter += 1
|
||||
if record_counter == 1:
|
||||
logger.info(f"Marking stream {name} as RUNNING")
|
||||
yield stream_status_as_airbyte_message(configured_stream, AirbyteStreamStatus.RUNNING)
|
||||
yield stream_status_as_airbyte_message(airbyte_stream, AirbyteStreamStatus.RUNNING)
|
||||
|
||||
yield AirbyteMessage(type=Type.RECORD, record=record)
|
||||
|
||||
logger.info(f"Marking stream {name} as STOPPED")
|
||||
yield stream_status_as_airbyte_message(configured_stream, AirbyteStreamStatus.COMPLETE)
|
||||
yield stream_status_as_airbyte_message(airbyte_stream, AirbyteStreamStatus.COMPLETE)
|
||||
|
||||
except Exception as err:
|
||||
reason = f"Failed to read data of {name} at {client.reader.full_url}: {repr(err)}\n{traceback.format_exc()}"
|
||||
logger.error(reason)
|
||||
logger.exception(f"Encountered an exception while reading stream {name}")
|
||||
logger.info(f"Marking stream {name} as STOPPED")
|
||||
yield stream_status_as_airbyte_message(configured_stream, AirbyteStreamStatus.INCOMPLETE)
|
||||
yield stream_status_as_airbyte_message(airbyte_stream, AirbyteStreamStatus.INCOMPLETE)
|
||||
raise err
|
||||
|
||||
@staticmethod
|
||||
|
||||
Reference in New Issue
Block a user