1
0
mirror of synced 2025-12-25 02:09:19 -05:00

Protocol: make supported_sync_modes a required not empty list on AirbyteStream (#15591)

This commit is contained in:
Augustin
2022-10-19 15:22:25 +02:00
committed by GitHub
parent 4b0d1d4ac6
commit b564f3eb78
54 changed files with 307 additions and 153 deletions

View File

@@ -34,5 +34,5 @@ COPY source_azure_table ./source_azure_table
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
LABEL io.airbyte.version=0.1.2
LABEL io.airbyte.version=0.1.3
LABEL io.airbyte.name=airbyte/source-azure-table

View File

@@ -2,7 +2,6 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
import copy
from typing import Any, Iterator, List, Mapping, MutableMapping, Optional, Tuple
from airbyte_cdk.logger import AirbyteLogger
@@ -10,13 +9,13 @@ from airbyte_cdk.models import (
AirbyteCatalog,
AirbyteConnectionStatus,
AirbyteMessage,
AirbyteStateMessage,
AirbyteStream,
ConfiguredAirbyteCatalog,
Status,
Type,
SyncMode,
)
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.utils.schema_helpers import split_config
from airbyte_cdk.utils.event_timing import create_timer
@@ -34,14 +33,6 @@ class SourceAzureTable(AbstractSource):
def _as_airbyte_record(self, stream_name: str, data: Mapping[str, Any]):
return data
def _checkpoint_state(self, stream, stream_state, connector_state):
try:
connector_state[stream.name] = stream.state
except AttributeError:
connector_state[stream.name] = stream_state
return AirbyteMessage(type=Type.STATE, state=AirbyteStateMessage(data=connector_state))
@property
def get_typed_schema(self) -> object:
"""Static schema for tables"""
@@ -71,10 +62,13 @@ class SourceAzureTable(AbstractSource):
streams = []
for table in tables:
stream_name = table.name
stream = AirbyteStream(name=stream_name, json_schema=self.get_typed_schema)
stream.supported_sync_modes = ["full_refresh", "incremental"]
stream.source_defined_cursor = True
stream.default_cursor_field = ["PartitionKey"]
stream = AirbyteStream(
name=stream_name,
json_schema=self.get_typed_schema,
supported_sync_modes=[SyncMode.full_refresh, SyncMode.incremental],
source_defined_cursor=True,
default_cursor_field=["PartitionKey"],
)
streams.append(stream)
logger.info(f"Total {streams.count} streams found.")
@@ -106,10 +100,10 @@ class SourceAzureTable(AbstractSource):
"""
This method is overridden to check whether the stream `quotes` exists in the source, if not skip reading that stream.
"""
connector_state = copy.deepcopy(state or {})
stream_instances = {s.name: s for s in self.streams(logger=logger, config=config)}
state_manager = ConnectorStateManager(stream_instance_map=stream_instances, state=state)
logger.info(f"Starting syncing {self.name}")
config, internal_config = split_config(config)
stream_instances = {s.name: s for s in self.streams(logger=logger, config=config)}
self._stream_to_instance_map = stream_instances
with create_timer(self.name) as timer:
for configured_stream in catalog.streams:
@@ -128,7 +122,7 @@ class SourceAzureTable(AbstractSource):
logger=logger,
stream_instance=stream_instance,
configured_stream=configured_stream,
connector_state=connector_state,
state_manager=state_manager,
internal_config=internal_config,
)
except Exception as e:

View File

@@ -5,7 +5,6 @@
"title": "Azure Data Table Spec",
"type": "object",
"required": ["storage_account_name", "storage_access_key"],
"additionalProperties": false,
"properties": {
"storage_account_name": {
"title": "Account Name",

View File

@@ -6,7 +6,7 @@ import logging
from unittest import mock
import pytest
from airbyte_cdk.models import AirbyteCatalog
from airbyte_cdk.models import AirbyteCatalog, SyncMode
from source_azure_table.source import SourceAzureTable
from source_azure_table.streams import AzureTableStream
@@ -49,7 +49,7 @@ def test_discover(mocker, config, tables):
"type": "object",
"properties": {"PartitionKey": {"type": "string"}},
}
assert stream.supported_sync_modes == ["full_refresh", "incremental"]
assert stream.supported_sync_modes == [SyncMode.full_refresh, SyncMode.incremental]
assert stream.source_defined_cursor is True
assert stream.default_cursor_field == ["PartitionKey"]