118 lines
5.1 KiB
Python
118 lines
5.1 KiB
Python
#
|
|
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
|
|
#
|
|
|
|
from dataclasses import dataclass
|
|
from typing import Iterable, Optional
|
|
|
|
from airbyte_cdk.models import SyncMode
|
|
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer
|
|
from airbyte_cdk.sources.declarative.transformations.transformation import RecordTransformation
|
|
from airbyte_cdk.sources.declarative.types import Config, Record, StreamSlice, StreamState
|
|
from airbyte_cdk.sources.streams.core import Stream
|
|
|
|
|
|
@dataclass
|
|
class SubscriptionUsageTransformation(RecordTransformation):
|
|
subscription_id: str
|
|
|
|
def transform(
|
|
self,
|
|
record: Record,
|
|
config: Optional[Config] = None,
|
|
stream_state: Optional[StreamState] = None,
|
|
stream_slice: Optional[StreamSlice] = None,
|
|
) -> Record:
|
|
# for each top level response record, there can be multiple sub-records depending
|
|
# on granularity and other input params. This function yields one transformed record
|
|
# for each subrecord in the response.
|
|
|
|
subrecords = record.get("usage", [])
|
|
del record["usage"]
|
|
for subrecord in subrecords:
|
|
# skip records that don't contain any actual usage
|
|
if subrecord.get("quantity", 0) > 0:
|
|
# Merge the parent record with the sub record
|
|
output = record.update(subrecord)
|
|
|
|
# Add the subscription ID to the output
|
|
output["subscription_id"] = self.subscription_id
|
|
|
|
# Un-nest billable_metric -> name,id into billable_metric_name and billable_metric_id
|
|
nested_billable_metric_name = output["billable_metric"]["name"]
|
|
nested_billable_metric_id = output["billable_metric"]["id"]
|
|
del output["billable_metric"]
|
|
output["billable_metric_name"] = nested_billable_metric_name
|
|
output["billable_metric_id"] = nested_billable_metric_id
|
|
|
|
# If a group_by key is specified, un-nest it
|
|
if config.subscription_usage_grouping_key:
|
|
nested_key = output["metric_group"]["property_key"]
|
|
nested_value = output["metric_group"]["property_value"]
|
|
del output["metric_group"]
|
|
output[nested_key] = nested_value
|
|
yield output
|
|
yield from []
|
|
|
|
|
|
@dataclass
|
|
class SubscriptionUsagePartitionRouter(StreamSlicer):
|
|
plans_stream: Stream
|
|
subscriptions_stream: Stream
|
|
config: Config
|
|
|
|
def stream_slices(self) -> Iterable[StreamSlice]:
|
|
"""
|
|
This stream is sliced per `subscription_id` and day, as well as `billable_metric_id`
|
|
if a grouping key is provided. This is because the API only supports a
|
|
single billable_metric_id per API call when using a group_by param.
|
|
|
|
"""
|
|
slice_yielded = False
|
|
subscriptions_stream = self.subscriptions_stream
|
|
plans_stream = self.plans_stream
|
|
|
|
# if using a group_by key, populate prices_by_plan_id so that each
|
|
# billable metric will get its own slice
|
|
if self.config.get("subscription_usage_grouping_key"):
|
|
metric_ids_by_plan_id = {}
|
|
|
|
for plan in plans_stream.read_records(sync_mode=SyncMode.full_refresh):
|
|
# if a plan_id filter is specified, skip any plan that doesn't match
|
|
if self.config.get("plan_id") and plan["id"] != self.config.get("plan_id"):
|
|
continue
|
|
|
|
prices = plan.get("prices", [])
|
|
metric_ids_by_plan_id[plan["id"]] = [(price.get("billable_metric") or {}).get("id") for price in prices]
|
|
|
|
for subscription in subscriptions_stream.read_records(sync_mode=SyncMode.full_refresh):
|
|
subscription_id = subscription["id"]
|
|
subscription_plan_id = subscription["plan_id"]
|
|
|
|
# if filtering subscription usage by plan ID, skip any subscription that doesn't match the plan_id
|
|
if self.config.get("plan_id") and subscription_plan_id != self.config.get("plan_id"):
|
|
continue
|
|
|
|
slice = {
|
|
"subscription_id": subscription_id,
|
|
}
|
|
|
|
# if using a group_by key, yield one slice per billable_metric_id.
|
|
# otherwise, yield slices without a billable_metric_id because
|
|
# each API call will return usage broken down by billable metric
|
|
# when grouping isn't used.
|
|
if self.config.get("subscription_usage_grouping_key"):
|
|
metric_ids = metric_ids_by_plan_id.get(subscription_plan_id)
|
|
if metric_ids is not None:
|
|
for metric_id in metric_ids:
|
|
# self.logger.warning("stream_slices is about to yield the following slice: %s", slice)
|
|
yield {**slice, "billable_metric_id": metric_id}
|
|
slice_yielded = True
|
|
else:
|
|
# self.logger.warning("stream_slices is about to yield the following slice: %s", slice)
|
|
yield slice
|
|
slice_yielded = True
|
|
if not slice_yielded:
|
|
# yield an empty slice to checkpoint state later
|
|
yield {}
|