1
0
mirror of synced 2025-12-21 11:01:41 -05:00
Files
airbyte/airbyte-integrations/connectors/source-orb/components.py
2024-12-18 14:05:43 -08:00

118 lines
5.1 KiB
Python

#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#
from dataclasses import dataclass
from typing import Iterable, Optional
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer
from airbyte_cdk.sources.declarative.transformations.transformation import RecordTransformation
from airbyte_cdk.sources.declarative.types import Config, Record, StreamSlice, StreamState
from airbyte_cdk.sources.streams.core import Stream
@dataclass
class SubscriptionUsageTransformation(RecordTransformation):
    """Flatten a subscription usage response record into one record per usage entry.

    Each top-level response record carries a ``usage`` list whose length depends
    on the requested granularity and other input params. Every entry with a
    positive quantity is merged with the parent record's fields and emitted as
    its own output record.
    """

    # Subscription the usage belongs to; stamped onto every emitted record.
    subscription_id: str

    def transform(
        self,
        record: Record,
        config: Optional[Config] = None,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[Record]:
        """Yield one flattened record per usage entry that has a positive quantity.

        Args:
            record: Top-level response record. Its ``usage`` key is removed in place.
            config: Connector configuration mapping. When it contains a truthy
                ``subscription_usage_grouping_key``, the ``metric_group`` object of
                each output is un-nested into a ``property_key: property_value`` field.
            stream_state: Unused.
            stream_slice: Unused.

        Yields:
            A new dict combining the parent fields with one usage entry, plus
            ``subscription_id``, ``billable_metric_name`` and ``billable_metric_id``.

        Raises:
            KeyError: If an emitted entry lacks ``billable_metric`` (or
                ``metric_group`` when a grouping key is configured).
        """
        # The config is a mapping and may be omitted entirely, so read it by key
        # with a fallback instead of attribute access.
        grouping_key = (config or {}).get("subscription_usage_grouping_key")

        # pop() removes the nested list from the parent and tolerates a missing
        # (or null) "usage" key, which a bare `del` would not.
        subrecords = record.pop("usage", None) or []

        for subrecord in subrecords:
            # skip records that don't contain any actual usage
            if (subrecord.get("quantity") or 0) <= 0:
                continue

            # Merge the parent record with the sub record into a fresh dict.
            # dict.update() returns None and would also leak fields from one
            # sub-record into the next through the shared parent.
            output = {**record, **subrecord}

            # Add the subscription ID to the output
            output["subscription_id"] = self.subscription_id

            # Un-nest billable_metric -> name,id into billable_metric_name and billable_metric_id
            billable_metric = output.pop("billable_metric")
            output["billable_metric_name"] = billable_metric["name"]
            output["billable_metric_id"] = billable_metric["id"]

            # If a group_by key is specified, un-nest it
            if grouping_key:
                metric_group = output.pop("metric_group")
                output[metric_group["property_key"]] = metric_group["property_value"]

            yield output
@dataclass
class SubscriptionUsagePartitionRouter(StreamSlicer):
    """Produce the stream slices used to request subscription usage.

    Slices are derived from the records of the subscriptions stream and, when a
    grouping key is configured, from the prices of the plans stream.
    """

    # Stream whose records expose each plan's `id` and `prices`.
    plans_stream: Stream
    # Stream whose records expose each subscription's `id` and `plan_id`.
    subscriptions_stream: Stream
    # Connector configuration; `plan_id` and `subscription_usage_grouping_key` are read.
    config: Config

    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        This stream is sliced per `subscription_id` and day, as well as `billable_metric_id`
        if a grouping key is provided. This is because the API only supports a
        single billable_metric_id per API call when using a group_by param.

        Yields:
            ``{"subscription_id": ...}`` per subscription when no grouping key is
            configured, otherwise ``{"subscription_id": ..., "billable_metric_id": ...}``
            per distinct billable metric of the subscription's plan. A single
            empty slice is yielded when nothing else was produced.
        """
        plan_id_filter = self.config.get("plan_id")
        use_grouping = bool(self.config.get("subscription_usage_grouping_key"))

        # if using a group_by key, populate metric_ids_by_plan_id so that each
        # billable metric will get its own slice
        metric_ids_by_plan_id = {}
        if use_grouping:
            for plan in self.plans_stream.read_records(sync_mode=SyncMode.full_refresh):
                # if a plan_id filter is specified, skip any plan that doesn't match
                if plan_id_filter and plan["id"] != plan_id_filter:
                    continue
                price_metric_ids = ((price.get("billable_metric") or {}).get("id") for price in plan.get("prices") or [])
                # Prices without a billable metric yield None, and several prices may
                # share one metric; keep each real metric ID once, in first-seen order,
                # so no slice is emitted with a null or repeated billable_metric_id.
                metric_ids_by_plan_id[plan["id"]] = list(
                    dict.fromkeys(metric_id for metric_id in price_metric_ids if metric_id is not None)
                )

        slice_yielded = False
        for subscription in self.subscriptions_stream.read_records(sync_mode=SyncMode.full_refresh):
            subscription_id = subscription["id"]
            subscription_plan_id = subscription["plan_id"]

            # if filtering subscription usage by plan ID, skip any subscription that doesn't match the plan_id
            if plan_id_filter and subscription_plan_id != plan_id_filter:
                continue

            base_slice = {"subscription_id": subscription_id}

            # without a group_by key, yield slices without a billable_metric_id because
            # each API call will return usage broken down by billable metric
            # when grouping isn't used.
            if not use_grouping:
                yield base_slice
                slice_yielded = True
                continue

            # with a group_by key, yield one slice per billable_metric_id; subscriptions
            # whose plan has no known billable metrics produce no slice.
            for metric_id in metric_ids_by_plan_id.get(subscription_plan_id, ()):
                yield {**base_slice, "billable_metric_id": metric_id}
                slice_yielded = True

        if not slice_yielded:
            # yield an empty slice to checkpoint state later
            yield {}