Source Orb: Migrate to manifest-only format with components (#47288)
Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com> Co-authored-by: ChristoGrab <christo.grab@gmail.com>
This commit is contained in:
119
airbyte-integrations/connectors/source-orb/components.py
Normal file
119
airbyte-integrations/connectors/source-orb/components.py
Normal file
@@ -0,0 +1,119 @@
|
||||
#
|
||||
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
|
||||
#
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Iterable, Optional
|
||||
|
||||
from airbyte_cdk.models import SyncMode
|
||||
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer
|
||||
from airbyte_cdk.sources.declarative.transformations.transformation import RecordTransformation
|
||||
from airbyte_cdk.sources.declarative.types import Config, Record, StreamSlice, StreamState
|
||||
from airbyte_cdk.sources.streams.core import Stream
|
||||
|
||||
|
||||
@dataclass
|
||||
class SubscriptionUsageTransformation(RecordTransformation):
|
||||
subscription_id: str
|
||||
|
||||
def transform(
|
||||
self,
|
||||
record: Record,
|
||||
config: Optional[Config] = None,
|
||||
stream_state: Optional[StreamState] = None,
|
||||
stream_slice: Optional[StreamSlice] = None,
|
||||
) -> Record:
|
||||
|
||||
# for each top level response record, there can be multiple sub-records depending
|
||||
# on granularity and other input params. This function yields one transformed record
|
||||
# for each subrecord in the response.
|
||||
|
||||
subrecords = record.get("usage", [])
|
||||
del record["usage"]
|
||||
for subrecord in subrecords:
|
||||
# skip records that don't contain any actual usage
|
||||
if subrecord.get("quantity", 0) > 0:
|
||||
# Merge the parent record with the sub record
|
||||
output = record.update(subrecord)
|
||||
|
||||
# Add the subscription ID to the output
|
||||
output["subscription_id"] = self.subscription_id
|
||||
|
||||
# Un-nest billable_metric -> name,id into billable_metric_name and billable_metric_id
|
||||
nested_billable_metric_name = output["billable_metric"]["name"]
|
||||
nested_billable_metric_id = output["billable_metric"]["id"]
|
||||
del output["billable_metric"]
|
||||
output["billable_metric_name"] = nested_billable_metric_name
|
||||
output["billable_metric_id"] = nested_billable_metric_id
|
||||
|
||||
# If a group_by key is specified, un-nest it
|
||||
if config.subscription_usage_grouping_key:
|
||||
nested_key = output["metric_group"]["property_key"]
|
||||
nested_value = output["metric_group"]["property_value"]
|
||||
del output["metric_group"]
|
||||
output[nested_key] = nested_value
|
||||
yield output
|
||||
yield from []
|
||||
|
||||
|
||||
@dataclass
|
||||
class SubscriptionUsagePartitionRouter(StreamSlicer):
|
||||
plans_stream: Stream
|
||||
subscriptions_stream: Stream
|
||||
config: Config
|
||||
|
||||
def stream_slices(self) -> Iterable[StreamSlice]:
|
||||
|
||||
"""
|
||||
This stream is sliced per `subscription_id` and day, as well as `billable_metric_id`
|
||||
if a grouping key is provided. This is because the API only supports a
|
||||
single billable_metric_id per API call when using a group_by param.
|
||||
|
||||
"""
|
||||
slice_yielded = False
|
||||
subscriptions_stream = self.subscriptions_stream
|
||||
plans_stream = self.plans_stream
|
||||
|
||||
# if using a group_by key, populate prices_by_plan_id so that each
|
||||
# billable metric will get its own slice
|
||||
if self.config.get("subscription_usage_grouping_key"):
|
||||
metric_ids_by_plan_id = {}
|
||||
|
||||
for plan in plans_stream.read_records(sync_mode=SyncMode.full_refresh):
|
||||
# if a plan_id filter is specified, skip any plan that doesn't match
|
||||
if self.config.get("plan_id") and plan["id"] != self.config.get("plan_id"):
|
||||
continue
|
||||
|
||||
prices = plan.get("prices", [])
|
||||
metric_ids_by_plan_id[plan["id"]] = [(price.get("billable_metric") or {}).get("id") for price in prices]
|
||||
|
||||
for subscription in subscriptions_stream.read_records(sync_mode=SyncMode.full_refresh):
|
||||
subscription_id = subscription["id"]
|
||||
subscription_plan_id = subscription["plan_id"]
|
||||
|
||||
# if filtering subscription usage by plan ID, skip any subscription that doesn't match the plan_id
|
||||
if self.config.get("plan_id") and subscription_plan_id != self.config.get("plan_id"):
|
||||
continue
|
||||
|
||||
slice = {
|
||||
"subscription_id": subscription_id,
|
||||
}
|
||||
|
||||
# if using a group_by key, yield one slice per billable_metric_id.
|
||||
# otherwise, yield slices without a billable_metric_id because
|
||||
# each API call will return usage broken down by billable metric
|
||||
# when grouping isn't used.
|
||||
if self.config.get("subscription_usage_grouping_key"):
|
||||
metric_ids = metric_ids_by_plan_id.get(subscription_plan_id)
|
||||
if metric_ids is not None:
|
||||
for metric_id in metric_ids:
|
||||
# self.logger.warning("stream_slices is about to yield the following slice: %s", slice)
|
||||
yield {**slice, "billable_metric_id": metric_id}
|
||||
slice_yielded = True
|
||||
else:
|
||||
# self.logger.warning("stream_slices is about to yield the following slice: %s", slice)
|
||||
yield slice
|
||||
slice_yielded = True
|
||||
if not slice_yielded:
|
||||
# yield an empty slice to checkpoint state later
|
||||
yield {}
|
||||
Reference in New Issue
Block a user