1
0
mirror of synced 2025-12-25 02:09:19 -05:00

🐛 Source Shopify: fix duplicates for Product Images, Metafield Product Images and Metafield Products streams for Incremental syncs (#46095)

This commit is contained in:
Baz
2024-10-01 12:29:43 +03:00
committed by GitHub
parent 4b3533ad67
commit f517c3bac7
10 changed files with 190 additions and 40 deletions

View File

@@ -24,10 +24,6 @@ acceptance_tests:
discovery:
tests:
- config_path: "secrets/config.json"
backward_compatibility_tests_config:
# specified the Type for `customer_journey_summary` field,
# for `customer_journey_summary` stream.
disable_for_version: 2.14.17
basic_read:
tests:
- config_path: "secrets/config_transactions_with_user_id.json"

View File

@@ -11,7 +11,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 9da77001-af33-4bcd-be46-6252bf9342b9
dockerImageTag: 2.5.2
dockerImageTag: 2.5.3
dockerRepository: airbyte/source-shopify
documentationUrl: https://docs.airbyte.com/integrations/sources/shopify
erdUrl: https://dbdocs.io/airbyteio/source-shopify?view=relationships

View File

@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"
[tool.poetry]
version = "2.5.2"
version = "2.5.3"
name = "source-shopify"
description = "Source CDK implementation for Shopify."
authors = [ "Airbyte <contact@airbyte.io>",]

View File

@@ -80,6 +80,17 @@ class ShopifyBulkTemplates:
@dataclass
class ShopifyBulkQuery:
config: Mapping[str, Any]
parent_stream_name: Optional[str] = None
parent_stream_cursor: Optional[str] = None
@property
def has_parent_stream(self) -> bool:
return True if self.parent_stream_name and self.parent_stream_cursor else False
@property
def parent_cursor_key(self) -> Optional[str]:
if self.has_parent_stream:
return f"{self.parent_stream_name}_{self.parent_stream_cursor}"
@property
def shop_id(self) -> int:
@@ -132,6 +143,38 @@ class ShopifyBulkQuery:
"""
return ["__typename", "id"]
def _inject_parent_cursor_field(self, nodes: List[Field], key: str = "updatedAt", index: int = 2) -> List[Field]:
if self.has_parent_stream:
# inject parent cursor key as alias to the `updatedAt` parent cursor field
nodes.insert(index, Field(name="updatedAt", alias=self.parent_cursor_key))
return nodes
def _add_parent_record_state(self, record: MutableMapping[str, Any], items: List[dict], to_rfc3339: bool = False) -> List[dict]:
"""
Adds a parent cursor value to each item in the list.
This method iterates over a list of dictionaries and adds a new key-value pair to each dictionary.
The key is the value of `self.query_name`, and the value is another dictionary with a single key "updated_at"
and the provided `parent_cursor_value`.
Args:
items (List[dict]): A list of dictionaries to which the parent cursor value will be added.
parent_cursor_value (str): The value to be set for the "updated_at" key in the nested dictionary.
Returns:
List[dict]: The modified list of dictionaries with the added parent cursor values.
"""
if self.has_parent_stream:
parent_cursor_value: Optional[str] = record.get(self.parent_cursor_key, None)
parent_state = self.tools._datetime_str_to_rfc3339(parent_cursor_value) if to_rfc3339 and parent_cursor_value else None
for item in items:
item[self.parent_stream_name] = {self.parent_stream_cursor: parent_state}
return items
def get(self, filter_field: Optional[str] = None, start: Optional[str] = None, end: Optional[str] = None) -> str:
# define filter query string, if passed
filter_query = f"{filter_field}:>='{start}' AND {filter_field}:<='{end}'" if filter_field else None
@@ -285,15 +328,22 @@ class Metafield(ShopifyBulkQuery):
List of available fields:
https://shopify.dev/docs/api/admin-graphql/unstable/objects/Metafield
"""
nodes = super().query_nodes
# define metafield node
metafield_node = self.get_edge_node("metafields", self.metafield_fields)
if isinstance(self.type.value, list):
return ["__typename", "id", self.get_edge_node(self.type.value[1], ["__typename", "id", metafield_node])]
nodes = [*nodes, self.get_edge_node(self.type.value[1], [*nodes, metafield_node])]
elif isinstance(self.type.value, str):
return ["__typename", "id", metafield_node]
nodes = [*nodes, metafield_node]
def record_process_components(self, record: MutableMapping[str, Any]) -> Iterable[MutableMapping[str, Any]]:
nodes = self._inject_parent_cursor_field(nodes)
return nodes
def _process_metafield(self, record: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
# resolve parent id from `str` to `int`
record["owner_id"] = self.tools.resolve_str_id(record.get(BULK_PARENT_KEY))
# add `owner_resource` field
@@ -304,7 +354,28 @@ class Metafield(ShopifyBulkQuery):
record["createdAt"] = self.tools.from_iso8601_to_rfc3339(record, "createdAt")
record["updatedAt"] = self.tools.from_iso8601_to_rfc3339(record, "updatedAt")
record = self.tools.fields_names_to_snake_case(record)
yield record
return record
def _process_components(self, entity: List[dict]) -> Iterable[MutableMapping[str, Any]]:
for item in entity:
# resolve the id from string
item["admin_graphql_api_id"] = item.get("id")
item["id"] = self.tools.resolve_str_id(item.get("id"))
yield self._process_metafield(item)
def record_process_components(self, record: MutableMapping[str, Any]) -> Iterable[MutableMapping[str, Any]]:
# get the joined record components collected for the record
record_components = record.get("record_components", {})
# process record components
if not record_components:
yield self._process_metafield(record)
else:
metafields = record_components.get("Metafield", [])
if len(metafields) > 0:
if self.has_parent_stream:
# add parent state to each metafield
metafields = self._add_parent_record_state(record, metafields, to_rfc3339=True)
yield from self._process_components(metafields)
class MetafieldCollection(Metafield):
@@ -343,7 +414,9 @@ class MetafieldCustomer(Metafield):
customers(query: "updated_at:>='2023-02-07T00:00:00+00:00' AND updated_at:<='2023-12-04T00:00:00+00:00'", sortKey: UPDATED_AT) {
edges {
node {
__typename
id
customer_updated_at: updatedAt
metafields {
edges {
node {
@@ -366,6 +439,11 @@ class MetafieldCustomer(Metafield):
type = MetafieldType.CUSTOMERS
record_composition = {
"new_record": "Customer",
"record_components": ["Metafield"],
}
class MetafieldLocation(Metafield):
"""
@@ -464,7 +542,9 @@ class MetafieldProduct(Metafield):
products(query: "updated_at:>='2023-02-07T00:00:00+00:00' AND updated_at:<='2023-12-04T00:00:00+00:00'", sortKey: UPDATED_AT) {
edges {
node {
__typename
id
product_updated_at: updatedAt
metafields {
edges {
node {
@@ -487,6 +567,11 @@ class MetafieldProduct(Metafield):
type = MetafieldType.PRODUCTS
record_composition = {
"new_record": "Product",
"record_components": ["Metafield"],
}
class MetafieldProductImage(Metafield):
"""
@@ -496,6 +581,7 @@ class MetafieldProductImage(Metafield):
node {
__typename
id
product_updated_at: updatedAt
media {
edges {
node {
@@ -527,6 +613,13 @@ class MetafieldProductImage(Metafield):
}
"""
type = MetafieldType.PRODUCT_IMAGES
record_composition = {
"new_record": "Product",
"record_components": ["Metafield"],
}
@property
def query_nodes(self) -> List[Field]:
"""
@@ -537,19 +630,16 @@ class MetafieldProductImage(Metafield):
More info here:
https://shopify.dev/docs/api/release-notes/2024-04#productimage-value-removed
"""
# define metafield node
metafield_node = self.get_edge_node("metafields", self.metafield_fields)
media_fields: List[Field] = [
"__typename",
"id",
InlineFragment(type="MediaImage", fields=[metafield_node]),
]
# define media node
media_fields: List[Field] = ["__typename", "id", InlineFragment(type="MediaImage", fields=[metafield_node])]
media_node = self.get_edge_node("media", media_fields)
fields: List[Field] = ["__typename", "id", media_node]
return fields
type = MetafieldType.PRODUCT_IMAGES
fields: List[Field] = ["__typename", "id", media_node]
fields = self._inject_parent_cursor_field(fields)
return fields
class MetafieldProductVariant(Metafield):
@@ -2238,6 +2328,7 @@ class ProductImage(ShopifyBulkQuery):
node {
__typename
id
products_updated_at: updatedAt
# THE MEDIA NODE IS NEEDED TO PROVIDE THE CURSORS
media {
edges {
@@ -2314,8 +2405,7 @@ class ProductImage(ShopifyBulkQuery):
# media property fields
media_fields: List[Field] = [Field(name="edges", fields=[Field(name="node", fields=media_fragment)])]
# main query
query_nodes: List[Field] = [
nodes: List[Field] = [
"__typename",
"id",
Field(name="media", fields=media_fields),
@@ -2330,6 +2420,10 @@ class ProductImage(ShopifyBulkQuery):
"record_components": ["MediaImage", "Image"],
}
@property
def query_nodes(self) -> List[Field]:
return self._inject_parent_cursor_field(self.nodes)
def _process_component(self, entity: List[dict]) -> List[dict]:
for item in entity:
# remove the `__parentId` from the object
@@ -2405,6 +2499,8 @@ class ProductImage(ShopifyBulkQuery):
# add the product_id to each `Image`
record["images"] = self._add_product_id(record.get("images", []), record.get("id"))
# add the product cursor to each `Image`
record["images"] = self._add_parent_record_state(record, record.get("images", []), to_rfc3339=True)
record["images"] = self._merge_with_media(record_components)
record.pop("record_components")
@@ -2413,7 +2509,6 @@ class ProductImage(ShopifyBulkQuery):
if len(images) > 0:
# convert dates from ISO-8601 to RFC-3339
record["images"] = self._convert_datetime_to_rfc3339(images)
yield from self._emit_complete_records(images)

View File

@@ -63,7 +63,11 @@ class BulkTools:
return url
@staticmethod
def from_iso8601_to_rfc3339(record: Mapping[str, Any], field: str) -> Mapping[str, Any]:
def _datetime_str_to_rfc3339(value: str) -> str:
return pdm.parse(value).to_rfc3339_string()
@staticmethod
def from_iso8601_to_rfc3339(record: Mapping[str, Any], field: str) -> Optional[str]:
"""
Converts date-time as follows:
Input: "2023-01-01T15:00:00Z"
@@ -73,7 +77,7 @@ class BulkTools:
# some fields that expected to be resolved as ids, might not be populated for the particular `RECORD`,
# we should return `None` to make the field `null` in the output as the result of the transformation.
target_value = record.get(field)
return pdm.parse(target_value).to_rfc3339_string() if target_value else record.get(field)
return BulkTools._datetime_str_to_rfc3339(target_value) if target_value else record.get(field)
def fields_names_to_snake_case(self, dict_input: Optional[Mapping[str, Any]] = None) -> Optional[MutableMapping[str, Any]]:
# transforming record field names from camel to snake case, leaving the `__parent_id` relation in place

View File

@@ -644,7 +644,7 @@ class IncrementalShopifyGraphQlBulkStream(IncrementalShopifyStream):
self.job_manager: ShopifyBulkManager = ShopifyBulkManager(
http_client=self.bulk_http_client,
base_url=f"{self.url_base}{self.path()}",
query=self.bulk_query(config),
query=self.bulk_query(config, self.parent_stream_name, self.parent_stream_cursor),
job_termination_threshold=float(config.get("job_termination_threshold", 3600)),
# overide the default job slice size, if provided (it's auto-adjusted, later on)
job_size=config.get("bulk_window_in_days", 30.0),
@@ -670,6 +670,20 @@ class IncrementalShopifyGraphQlBulkStream(IncrementalShopifyStream):
"""
return self.parent_stream_class(self.config) if self.parent_stream_class else None
@property
def parent_stream_name(self) -> Optional[str]:
"""
Returns the parent stream name, if the substream has a `parent_stream_class` dependency.
"""
return self.parent_stream.name if self.parent_stream_class else None
@property
def parent_stream_cursor(self) -> Optional[str]:
"""
Returns the parent stream cursor, if the substream has a `parent_stream_class` dependency.
"""
return self.parent_stream.cursor_field if self.parent_stream_class else None
@property
@abstractmethod
def bulk_query(self) -> ShopifyBulkQuery:
@@ -716,21 +730,37 @@ class IncrementalShopifyGraphQlBulkStream(IncrementalShopifyStream):
"""
updated_state = super().get_updated_state(current_stream_state, latest_record)
if self.parent_stream_class:
parent_state = latest_record.get(self.parent_stream.name, {})
parent_state_value = (
parent_state.get(self.parent_stream.cursor_field) if parent_state else latest_record.get(self.parent_stream.cursor_field)
)
# add parent_stream_state to `updated_state`
updated_state[self.parent_stream.name] = {self.parent_stream.cursor_field: latest_record.get(self.parent_stream.cursor_field)}
updated_state[self.parent_stream.name] = {self.parent_stream.cursor_field: parent_state_value}
return updated_state
def get_stream_state_value(self, stream_state: Optional[Mapping[str, Any]]) -> str:
if self.parent_stream_class:
# get parent stream state from the stream_state object.
parent_state = stream_state.get(self.parent_stream.name, {})
if parent_state:
return parent_state.get(self.parent_stream.cursor_field, self.default_state_comparison_value)
else:
# get the stream state, if no `parent_stream_class` was assigned.
def _get_stream_cursor_value(self, stream_state: Optional[Mapping[str, Any]] = None) -> Optional[str]:
if stream_state:
return stream_state.get(self.cursor_field, self.default_state_comparison_value)
else:
return self.config.get("start_date")
def get_state_value(self, stream_state: Mapping[str, Any] = None) -> Optional[Union[str, int]]:
def get_stream_state_value(self, stream_state: Optional[Mapping[str, Any]] = None) -> Optional[str]:
if stream_state:
if self.parent_stream_class:
# get parent stream state from the stream_state object.
parent_state = stream_state.get(self.parent_stream.name, {})
if parent_state:
return parent_state.get(self.parent_stream.cursor_field, self.default_state_comparison_value)
else:
# use the streams cursor value, if no parent state available
return self._get_stream_cursor_value(stream_state)
else:
# get the stream state, if no `parent_stream_class` was assigned.
return self._get_stream_cursor_value(stream_state)
else:
return self.config.get("start_date")
def get_state_value(self, stream_state: Optional[Mapping[str, Any]] = None) -> Optional[Union[str, int]]:
if stream_state:
return self.get_stream_state_value(stream_state)
else:

View File

@@ -76,6 +76,7 @@ class Customers(IncrementalShopifyStream):
class MetafieldCustomers(IncrementalShopifyGraphQlBulkStream):
parent_stream_class = Customers
bulk_query: MetafieldCustomer = MetafieldCustomer
@@ -170,14 +171,17 @@ class ProductsGraphQl(IncrementalShopifyStream):
class MetafieldProducts(IncrementalShopifyGraphQlBulkStream):
parent_stream_class = Products
bulk_query: MetafieldProduct = MetafieldProduct
class ProductImages(IncrementalShopifyGraphQlBulkStream):
parent_stream_class = Products
bulk_query: ProductImage = ProductImage
class MetafieldProductImages(IncrementalShopifyGraphQlBulkStream):
parent_stream_class = Products
bulk_query: MetafieldProductImage = MetafieldProductImage

View File

@@ -947,6 +947,9 @@ def product_images_response_expected_result():
"admin_graphql_api_id": "gid://shopify/ProductImage/111",
"product_id": 123,
"shop_url": "test_shop",
"products": {
"updated_at": None,
},
},
{
"created_at": "2021-06-23T01:09:47+00:00",
@@ -958,6 +961,9 @@ def product_images_response_expected_result():
"width": 2200,
"admin_graphql_api_id": "gid://shopify/ProductImage/222",
"product_id": 456,
"products": {
"updated_at": None,
},
"shop_url": "test_shop",
},
]

View File

@@ -12,6 +12,7 @@ from source_shopify.shopify_graphql.bulk.query import (
ShopifyBulkQuery,
ShopifyBulkTemplates,
)
from source_shopify.streams.streams import Customers, Products
def test_query_status() -> None:
@@ -123,13 +124,14 @@ def test_base_build_query(basic_config, query_name, fields, filter_field, start,
@pytest.mark.parametrize(
"query_class, filter_field, start, end, expected",
"query_class, filter_field, start, end, parent_stream_class, expected",
[
(
MetafieldCustomer,
"updated_at",
"2023-01-01",
"2023-01-02",
Customers,
Operation(
type="",
queries=[
@@ -139,7 +141,7 @@ def test_base_build_query(basic_config, query_name, fields, filter_field, start,
Argument(name="query", value=f"\"updated_at:>='2023-01-01' AND updated_at:<='2023-01-02'\""),
Argument(name="sortKey", value="UPDATED_AT"),
],
fields=[Field(name='edges', fields=[Field(name='node', fields=['__typename', 'id', Field(name="metafields", fields=[Field(name="edges", fields=[Field(name="node", fields=["__typename", "id", "namespace", "value", "key", "description", "createdAt", "updatedAt", "type"])])])])])]
fields=[Field(name='edges', fields=[Field(name='node', fields=['__typename', 'id', Field(name="updatedAt", alias="customers_updated_at"), Field(name="metafields", fields=[Field(name="edges", fields=[Field(name="node", fields=["__typename", "id", "namespace", "value", "key", "description", "createdAt", "updatedAt", "type"])])])])])]
)
]
),
@@ -149,6 +151,7 @@ def test_base_build_query(basic_config, query_name, fields, filter_field, start,
"updated_at",
"2023-01-01",
"2023-01-02",
Products,
Operation(
type="",
queries=[
@@ -167,6 +170,10 @@ def test_base_build_query(basic_config, query_name, fields, filter_field, start,
fields=[
"__typename",
"id",
Field(
name="updatedAt",
alias="products_updated_at",
),
Field(
name="media",
fields=[
@@ -227,6 +234,7 @@ def test_base_build_query(basic_config, query_name, fields, filter_field, start,
"updated_at",
"2023-01-01",
"2023-01-02",
None,
Operation(
type="",
queries=[
@@ -298,6 +306,11 @@ def test_base_build_query(basic_config, query_name, fields, filter_field, start,
"InventoryLevel query",
]
)
def test_bulk_query(basic_config, query_class, filter_field, start, end, expected) -> None:
stream = query_class(basic_config)
assert stream.get(filter_field, start, end) == expected.render()
def test_bulk_query(auth_config, query_class, filter_field, start, end, parent_stream_class, expected) -> None:
if parent_stream_class:
parent_stream = parent_stream_class(auth_config)
stream_query = query_class(auth_config, parent_stream.name, parent_stream.cursor_field)
else:
stream_query = query_class(auth_config)
assert stream_query.get(filter_field, start, end) == expected.render()

View File

@@ -27,6 +27,7 @@ For existing **Airbyte Cloud** customers, if you are currently using the **API P
:::
<HideInUI>
### For Airbyte Cloud:
1. [Log into your Airbyte Cloud](https://cloud.airbyte.com/workspaces) account.
@@ -246,6 +247,7 @@ For all `Shopify GraphQL BULK` api requests these limitations are applied: https
| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 2.5.3 | 2024-09-27 | [46095](https://github.com/airbytehq/airbyte/pull/46095) | Fixed duplicates for `Product Images`, `Metafield Product Images` and `Metafield Products` streams for Incremental syncs |
| 2.5.2 | 2024-09-17 | [45633](https://github.com/airbytehq/airbyte/pull/45633) | Adds `read_inventory` as a required scope for `product_variants` stream |
| 2.5.1 | 2024-09-14 | [45255](https://github.com/airbytehq/airbyte/pull/45255) | Update dependencies |
| 2.5.0 | 2024-09-06 | [45190](https://github.com/airbytehq/airbyte/pull/45190) | Migrate to CDK v5 |