1
0
mirror of synced 2025-12-25 02:09:19 -05:00

feat(source-linkedin-ads): Upgrade to latest CDK and use JsonSchemaPropertySelector to reduce API requests for unselected columns (#68614)

This commit is contained in:
Brian Lai
2025-10-30 15:40:24 -07:00
committed by GitHub
parent ca7f69f427
commit deb4440a23
14 changed files with 1103 additions and 333 deletions

View File

@@ -1606,6 +1606,8 @@ definitions:
record_merge_strategy:
type: GroupByKeyMergeStrategy
key: ["end_date", "string_of_pivot_values"]
property_selector:
type: JsonSchemaPropertySelector
streams:
- $ref: "#/definitions/streams/accounts"

View File

@@ -7,11 +7,11 @@ data:
- linkedin.com
- api.linkedin.com
connectorBuildOptions:
baseImage: docker.io/airbyte/source-declarative-manifest:7.3.9@sha256:467a90d7721f4e0ca0065e6b2d257abfb371fdeafbc64c8ce63c039270aba0f0
baseImage: docker.io/airbyte/source-declarative-manifest:7.4.0@sha256:1f8681a991f239c3f1a95826b9a78ff1326e03c03fdcb30bbfb2060ffa9c8e31
connectorSubtype: api
connectorType: source
definitionId: 137ece28-5434-455c-8f34-69dc3782f451
dockerImageTag: 5.5.5
dockerImageTag: 5.6.0-rc.1
dockerRepository: airbyte/source-linkedin-ads
documentationUrl: https://docs.airbyte.com/integrations/sources/linkedin-ads
githubIssueLabel: source-linkedin-ads
@@ -31,7 +31,7 @@ data:
releaseStage: generally_available
releases:
rolloutConfiguration:
enableProgressiveRollout: false
enableProgressiveRollout: true
breakingChanges:
1.0.0:
message: This upgrade brings changes in primary key to *-analytics streams.

View File

@@ -0,0 +1 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.

View File

@@ -6,7 +6,9 @@ import json
import os
import sys
from pathlib import Path
from typing import Any, Mapping
from typing import Any, Mapping, Optional
from airbyte_protocol_dataclasses.models import ConfiguredAirbyteCatalog
from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource
from airbyte_cdk.test.catalog_builder import CatalogBuilder
@@ -31,8 +33,8 @@ _YAML_FILE_PATH = _SOURCE_FOLDER_PATH / "manifest.yaml"
sys.path.append(str(_SOURCE_FOLDER_PATH)) # to allow loading custom components
def get_source(config) -> YamlDeclarativeSource:
catalog = CatalogBuilder().build()
def get_source(config, catalog: Optional[ConfiguredAirbyteCatalog] = None) -> YamlDeclarativeSource:
    """Build a ``YamlDeclarativeSource`` from the connector manifest for use in tests.

    :param config: connector configuration handed to the source unchanged.
    :param catalog: configured catalog to use; when it is omitted (or falsy) the
        result of a bare ``CatalogBuilder().build()`` is used instead.
    :return: a source loaded from ``_YAML_FILE_PATH`` with the state produced by
        a bare ``StateBuilder().build()``.
    """
    effective_catalog = catalog or CatalogBuilder().build()
    initial_state = StateBuilder().build()
    return YamlDeclarativeSource(
        path_to_yaml=str(_YAML_FILE_PATH),
        catalog=effective_catalog,
        config=config,
        state=initial_state,
    )
@@ -42,7 +44,7 @@ def find_stream(stream_name, config):
# cache should be disabled once this issue is fixed https://github.com/airbytehq/airbyte-internal-issues/issues/6513
for stream in streams:
stream.retriever.requester.use_cache = True
stream._stream_partition_generator._partition_factory._retriever.requester.use_cache = True
# find by name
for stream in streams:

File diff suppressed because it is too large Load Diff

View File

@@ -8,7 +8,7 @@ description = "Unit tests for source-linkedin-ads"
authors = ["Airbyte <contact@airbyte.io>"]
[tool.poetry.dependencies]
python = "^3.10,<3.13"
airbyte-cdk = "^6"
airbyte-cdk = "^7"
pytest = "^8"
freezegun = "^1.4.0"
pytest-mock = "^3.6.1"

View File

@@ -0,0 +1,45 @@
{
"paging": {
"start": 0,
"count": 10,
"links": []
},
"elements": [
{
"clicks": 100,
"impressions": 19090,
"pivotValues": ["urn:li:sponsoredCampaign:123"],
"costInUsd": "209.449",
"dateRange": {
"start": {
"month": 1,
"day": 2,
"year": 2023
},
"end": {
"month": 1,
"day": 2,
"year": 2023
}
}
},
{
"clicks": 408,
"impressions": 20210,
"pivotValues": ["urn:li:sponsoredCampaign:123"],
"costInUsd": "509.980",
"dateRange": {
"start": {
"month": 1,
"day": 3,
"year": 2023
},
"end": {
"month": 1,
"day": 3,
"year": 2023
}
}
}
]
}

View File

@@ -0,0 +1,45 @@
{
"paging": {
"start": 0,
"count": 10,
"links": []
},
"elements": [
{
"clicks": 100,
"impressions": 19090,
"pivotValues": ["urn:li:sponsoredCampaign:123"],
"costInUsd": "209.449",
"dateRange": {
"start": {
"month": 1,
"day": 4,
"year": 2023
},
"end": {
"month": 1,
"day": 4,
"year": 2023
}
}
},
{
"clicks": 408,
"impressions": 20210,
"pivotValues": ["urn:li:sponsoredCampaign:123"],
"costInUsd": "509.980",
"dateRange": {
"start": {
"month": 1,
"day": 4,
"year": 2023
},
"end": {
"month": 1,
"day": 4,
"year": 2023
}
}
}
]
}

View File

@@ -0,0 +1,27 @@
{
"paging": {
"start": 0,
"count": 10,
"links": []
},
"elements": [
{
"clicks": 100,
"impressions": 19090,
"pivotValues": ["urn:li:sponsoredCampaign:123"],
"costInUsd": "209.449",
"dateRange": {
"start": {
"month": 1,
"day": 5,
"year": 2023
},
"end": {
"month": 1,
"day": 5,
"year": 2023
}
}
}
]
}

View File

@@ -1,19 +1,22 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
import logging
from typing import Any, Dict, List
import pytest
import requests
from conftest import find_stream, get_source, load_json_file
from airbyte_protocol_dataclasses.models import AirbyteStream, ConfiguredAirbyteCatalog, ConfiguredAirbyteStream, DestinationSyncMode
from airbyte_protocol_dataclasses.models import Status as ConnectionStatus
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
from airbyte_cdk.sources.declarative.concurrent_declarative_source import ConcurrentDeclarativeSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http.exceptions import DefaultBackoffException
from airbyte_cdk.sources.streams.http.http_client import MessageRepresentationAirbyteTracedErrors
from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator
from airbyte_cdk.sources.types import Record, StreamSlice
from unit_tests.utils import run_read
from .conftest import find_stream, get_source, load_json_file
logger = logging.getLogger("airbyte")
@@ -93,16 +96,16 @@ class TestAllStreams:
@pytest.mark.parametrize("error_code", [429, 500, 503])
def test_should_retry_on_error(self, error_code, requests_mock, mocker):
mocker.patch.object(
ManifestDeclarativeSource, "_initialize_cache_for_parent_streams", side_effect=self._mock_initialize_cache_for_parent_streams
ConcurrentDeclarativeSource, "_initialize_cache_for_parent_streams", side_effect=self._mock_initialize_cache_for_parent_streams
)
mocker.patch("time.sleep", lambda x: None)
stream = find_stream("accounts", TEST_CONFIG)
requests_mock.register_uri(
"GET", "https://api.linkedin.com/rest/adAccounts", [{"status_code": error_code, "json": {"elements": []}}]
)
stream.exit_on_rate_limit = True
with pytest.raises(DefaultBackoffException):
list(stream.read_records(sync_mode=SyncMode.full_refresh))
stream._stream_partition_generator._partition_factory._retriever.requester.exit_on_rate_limit = True
with pytest.raises(MessageRepresentationAirbyteTracedErrors):
list(run_read(stream))
def test_custom_streams(self, requests_mock):
config = {"ad_analytics_reports": [{"name": "ShareAdByMonth", "pivot_by": "COMPANY", "time_granularity": "MONTHLY"}], **TEST_CONFIG}
@@ -128,8 +131,9 @@ class TestAllStreams:
],
)
stream_slice = next(custom_stream.stream_slices(sync_mode=SyncMode.full_refresh))
records = list(custom_stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice, stream_state=None))
partitions = iter(custom_stream.generate_partitions())
partition_1 = next(partitions)
records = list(partition_1.read())
assert len(records) == 2
@@ -156,28 +160,28 @@ class TestAllStreams:
)
def test_path(self, stream_name, expected):
stream = find_stream(stream_name, config=TEST_CONFIG)
result = stream.retriever.requester.path
result = stream._stream_partition_generator._partition_factory._retriever.requester.path
assert result == expected
@pytest.mark.parametrize(
("status_code", "is_connection_successful", "error_msg"),
("status_code", "expected_connection_status", "error_msg"),
(
(
400,
False,
("Stream accounts is not available: Bad request. Please check your request parameters."),
ConnectionStatus.FAILED,
"Stream accounts is not available: HTTP Status Code: 400. Error: Bad request. Please check your request parameters.",
),
(
403,
False,
("Stream accounts is not available: Forbidden. You don't have permission to access this resource."),
ConnectionStatus.FAILED,
"Stream accounts is not available: HTTP Status Code: 403. Error: Forbidden. You don't have permission to access this resource.",
),
(200, True, None),
(200, ConnectionStatus.SUCCEEDED, None),
),
)
def test_check_connection(self, requests_mock, status_code, is_connection_successful, error_msg, mocker):
def test_check_connection(self, requests_mock, status_code, expected_connection_status, error_msg, mocker):
mocker.patch.object(
ManifestDeclarativeSource, "_initialize_cache_for_parent_streams", side_effect=self._mock_initialize_cache_for_parent_streams
ConcurrentDeclarativeSource, "_initialize_cache_for_parent_streams", side_effect=self._mock_initialize_cache_for_parent_streams
)
mocker.patch("time.sleep", lambda x: None)
# Only 2xx responses get a populated body; every other status is mocked with an empty JSON object.
# The bounds must read `200 <= status_code < 300`: the previous `200 >= status_code < 300`
# chained to `status_code <= 200`, which only matched the 2xx range by accident for 200 itself.
json = {"elements": [{"data": []}] * 500} if 200 <= status_code < 300 else {}
@@ -187,9 +191,11 @@ class TestAllStreams:
status_code=status_code,
json=json,
)
success, error = get_source(config=TEST_CONFIG).check_connection(logger=logger, config=TEST_CONFIG)
assert success is is_connection_successful
assert error == error_msg
connection_status = get_source(config=TEST_CONFIG).check(logger=logger, config=TEST_CONFIG)
assert connection_status.status is expected_connection_status
if error_msg:
assert error_msg in connection_status.message
class TestLinkedinAdsStream:
@@ -199,7 +205,7 @@ class TestLinkedinAdsStream:
@pytest.fixture
def accounts_stream_url(self, accounts_stream) -> str:
return f"{accounts_stream.retriever.requester.url_base}/{accounts_stream.retriever.requester.path}"
return f"{accounts_stream._stream_partition_generator._partition_factory._retriever.requester.url_base}/{accounts_stream._stream_partition_generator._partition_factory._retriever.requester.path}"
@pytest.mark.parametrize(
"response_json, expected",
@@ -232,5 +238,102 @@ class TestLinkedinAdsStream:
last_record = response_json.get("elements", [])[-1] if response_json.get("elements") else None
last_page_token_value = None
result = accounts_stream.retriever._next_page_token(test_response, last_page_size, last_record, last_page_token_value)
result = accounts_stream._stream_partition_generator._partition_factory._retriever._next_page_token(
test_response, last_page_size, last_record, last_page_token_value
)
assert expected == result
def test_ad_campaign_analytics_stream(self, requests_mock):
    """Read the first partition of the built-in ``ad_campaign_analytics`` stream against mocked endpoints.

    The configured catalog declares only ``id``, ``clicks``, ``impressions`` and
    ``sponsoredCampaign``. The analytics endpoint is mocked with
    ``fields=dateRange,pivotValues,clicks,impressions`` in its query string, and the
    expected slices carry that same list under ``query_properties``.
    """
    stream_name = "ad_campaign_analytics"
    config = dict(TEST_CONFIG)

    # Catalog exposing a subset of the stream's columns.
    selected_properties = {
        "id": {"type": ["null", "string"]},
        "clicks": {"type": ["null", "number"]},
        "impressions": {"type": ["null", "number"]},
        "sponsoredCampaign": {"type": ["null", "number"]},
    }
    airbyte_stream = AirbyteStream(
        name=stream_name,
        json_schema={"type": "object", "properties": selected_properties},
        supported_sync_modes=[SyncMode.full_refresh],
    )
    catalog = ConfiguredAirbyteCatalog(
        streams=[
            ConfiguredAirbyteStream(
                stream=airbyte_stream,
                sync_mode=SyncMode.full_refresh,
                destination_sync_mode=DestinationSyncMode.overwrite,
            )
        ]
    )

    def build_expected_record(clicks, impressions, cost_in_usd, day):
        # Both expected records differ only in their metrics and their (single-day) date.
        return Record(
            stream_name=stream_name,
            data={
                "clicks": clicks,
                "impressions": impressions,
                "pivotValues": ["urn:li:sponsoredCampaign:123"],
                "costInUsd": cost_in_usd,
                "start_date": day,
                "end_date": day,
                "string_of_pivot_values": "urn:li:sponsoredCampaign:123",
                "sponsoredCampaign": "1111",
                "pivot": "CAMPAIGN",
            },
            associated_slice=StreamSlice(
                cursor_slice={"end_time": "2021-01-31", "start_time": "2021-01-01"},
                partition={"campaign_id": 1111, "parent_slice": {"account_id": 1, "parent_slice": {}}},
                extra_fields={"query_properties": ["dateRange", "pivotValues", "clicks", "impressions"]},
            ),
        )

    expected_records = [
        build_expected_record(100.0, 19090.0, 209.449, "2023-01-02"),
        build_expected_record(408.0, 20210.0, 509.98, "2023-01-03"),
    ]

    all_streams = get_source(config=config, catalog=catalog).streams(config=config)
    matching_streams = [candidate for candidate in all_streams if candidate.name == "ad_campaign_analytics"]
    assert len(matching_streams) == 1
    analytics_stream = matching_streams[0]

    # Parent lookups: one account, then one campaign under that account.
    requests_mock.get("https://api.linkedin.com/rest/adAccounts", json={"elements": [{"id": 1}]})
    requests_mock.get(
        "https://api.linkedin.com/rest/adAccounts/1/adCampaigns?q=search&search=(status:(values:List(ACTIVE,PAUSED,ARCHIVED,"
        "COMPLETED,CANCELED,DRAFT,PENDING_DELETION,REMOVED)))",
        json={"elements": [{"id": 1111, "lastModified": "2021-01-15"}]},
    )

    # Three canned analytics responses are registered, served in order on successive calls.
    analytics_responses = [
        {"json": load_json_file(f"responses/ad_campaign_analytics/{file_name}")}
        for file_name in ("response_1.json", "response_2.json", "response_3.json")
    ]
    requests_mock.get(
        "https://api.linkedin.com/rest/adAnalytics?q=analytics&campaigns=List(urn%3Ali%3AsponsoredCampaign%3A1111)&dateRange=(start:(year:2021,month:1,day:1),end:(year:2021,month:1,day:31))&fields=dateRange,pivotValues,clicks,impressions",
        analytics_responses,
    )

    # Only the first generated partition is read.
    first_partition = next(iter(analytics_stream.generate_partitions()))
    records = list(first_partition.read())

    assert len(records) == 2
    assert records == expected_records

View File

@@ -2,12 +2,12 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#
from conftest import find_stream, load_json_file
from freezegun import freeze_time
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator
from .conftest import find_stream, load_json_file
# Test input arguments for the `make_analytics_slices`
TEST_KEY_VALUE_MAP = {"camp_id": "id"}
@@ -31,7 +31,7 @@ TEST_CONFIG: dict = {
@freeze_time("2021-03-01")
def test_analytics_stream_slices(requests_mock):
expected_slices = [
expected_partitions = [
{
"campaign_id": 123,
"start_time": "2021-01-01",
@@ -49,7 +49,7 @@ def test_analytics_stream_slices(requests_mock):
stream = find_stream("ad_member_country_analytics", TEST_CONFIG)
requests_mock.get("https://api.linkedin.com/rest/adAccounts", json={"elements": [{"id": 1}]})
requests_mock.get("https://api.linkedin.com/rest/adAccounts/1/adCampaigns", json={"elements": [{"id": 123}]})
assert [dict(i) for i in list(stream.retriever.stream_slicer.stream_slices())] == expected_slices
assert [partition.to_slice() for partition in list(stream.generate_partitions())] == expected_partitions
def test_read_records(requests_mock):
@@ -68,6 +68,8 @@ def test_read_records(requests_mock):
],
)
stream_slice = next(stream.stream_slices(sync_mode=SyncMode.incremental))
records = list(stream.read_records(sync_mode=SyncMode.incremental, stream_slice=stream_slice, stream_state=None))
partitions = iter(stream.generate_partitions())
partition_1 = next(partitions)
records = list(partition_1.read())
assert len(records) == 2

View File

@@ -2,7 +2,7 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
from samples.test_data_for_tranform import input_test_data, output_test_data
from .samples.test_data_for_tranform import input_test_data, output_test_data
def test_transform_data(components_module):

View File

@@ -0,0 +1,15 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
def run_read(stream_instance: DefaultStream) -> list:
    """Read every partition of ``stream_instance`` and return the records as one list.

    Each record is appended to the result and passed to the stream cursor's
    ``observe`` as it is read. Once all records of a partition have been read,
    the partition is passed to the cursor's ``close_partition``.

    :param stream_instance: stream exposing ``generate_partitions()`` and ``cursor``.
    :return: all records from all partitions, in the order they were read.
    """
    collected = []
    for partition in stream_instance.generate_partitions():
        for record in partition.read():
            collected.append(record)
            stream_instance.cursor.observe(record)
        # Close only after the partition's records are exhausted, once per partition.
        stream_instance.cursor.close_partition(partition)
    return collected

View File

@@ -160,6 +160,8 @@ For Ad Analytics Streams such as `Ad Analytics by Campaign` and `Ad Analytics by
## Performance considerations
### Rate limits
LinkedIn Ads enforces official rate limits on API usage ([more information here](https://docs.microsoft.com/en-us/linkedin/shared/api-guide/concepts/rate-limits?context=linkedin/marketing/context)). Rate-limited requests receive a 429 response. These limits reset at midnight UTC every day. In rare cases, LinkedIn may also return a 429 response as part of infrastructure protection; the API service will return to normal automatically. In such cases, you will receive the following error message:
```text
@@ -174,6 +176,14 @@ This is expected when the connector hits the 429 - Rate Limit Exceeded HTTP Erro
After 5 unsuccessful attempts - the connector will stop the sync operation. In such cases check your Rate Limits [on this page](https://www.linkedin.com/developers/apps) &gt; Choose your app &gt; Analytics.
### Ad analytics streams
LinkedIn Ads supports a number of different streams that provide metrics about the effectiveness of ads for a specified date range across various properties and dimensions such as `clicks`, `follows`, `impressions`, `reactions`, `totalEngagements`, and others. Fetching the entire set of properties per ad object can lead to increased sync times.
To improve sync performance, select only the columns you need replicated into your downstream destination when configuring your connection. Selecting fewer columns should reduce the duration of syncs.
:::caution
The LinkedIn Ads API will not return records that have no values for any of the dimensions you specify. Take care when selecting columns, as you may see fewer or more records depending on your selection.
:::
## Data type map
| Integration Type | Airbyte Type | Notes |
@@ -202,6 +212,7 @@ No workaround has been identified to manage this issue as of 2025, February.
| Version | Date | Pull Request | Subject |
|:-----------|:-----------|:---------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 5.6.0-rc.1 | 2025-10-29 | [68614](https://github.com/airbytehq/airbyte/pull/68614) | Upgrade to latest CDK to only include the selected columns of the schema in API requests for ad analytics streams |
| 5.5.5 | 2025-10-27 | [68626](https://github.com/airbytehq/airbyte/pull/68626) | Increase concurrency and introduce initial attempt at API budget |
| 5.5.4 | 2025-10-21 | [64967](https://github.com/airbytehq/airbyte/pull/64967) | Update dependencies |
| 5.5.3 | 2025-10-09 | [67564](https://github.com/airbytehq/airbyte/pull/67564) | Upgrade to CDK v7. |