1
0
mirror of synced 2025-12-21 02:51:29 -05:00
Files
airbyte/airbyte-integrations/connectors/source-faker/source_faker/utils.py
Cole Snodgrass 2e099acc52 update headers from 2022 -> 2023 (#22594)
* It's 2023!

* 2022 -> 2023

---------

Co-authored-by: evantahler <evan@airbyte.io>
2023-02-08 13:01:16 -08:00

34 lines
937 B
Python

#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import datetime
import json
from airbyte_cdk.models import AirbyteEstimateTraceMessage, AirbyteTraceMessage, EstimateType, TraceType
def read_json(filepath):
    """Load and return the parsed contents of the JSON file at ``filepath``.

    The file is decoded explicitly as UTF-8 instead of with the platform's
    default encoding, so non-ASCII content parses the same on every host.

    Raises:
        OSError: if the file cannot be opened.
        UnicodeDecodeError: if the file is not valid UTF-8.
        json.JSONDecodeError: if the contents are not valid JSON.
    """
    with open(filepath, encoding="utf-8") as f:
        return json.load(f)
def format_airbyte_time(d: datetime.datetime):
    """Render ``d`` as ``YYYY-MM-DDTHH:MM:SS+00:00``, dropping sub-second precision.

    A naive datetime is labelled as UTC without any shifting. A
    timezone-aware datetime is converted to UTC first, so its own offset is
    neither duplicated in the output nor relabelled as ``+00:00`` without
    adjusting the clock time.
    """
    if isinstance(d, datetime.datetime) and d.utcoffset() is not None:
        # Normalise to naive UTC so the string form below carries no offset
        # of its own before the fixed "+00:00" suffix is appended.
        d = d.astimezone(datetime.timezone.utc).replace(tzinfo=None)
    # The string form is "YYYY-MM-DD HH:MM:SS[.ffffff]": cut the fractional
    # part, then swap the space separator for "T".
    timestamp = f"{d}".split(".")[0].replace(" ", "T")
    return timestamp + "+00:00"
def now_millis():
    """Return the current time as whole milliseconds since the Unix epoch."""
    epoch_seconds = datetime.datetime.now().timestamp()
    return int(epoch_seconds * 1000)
def generate_estimate(stream_name: str, total: int, bytes_per_row: int):
    """Build an ESTIMATE trace message for a single stream.

    The message reports ``total`` rows and ``total * bytes_per_row`` bytes
    (each passed through ``round``) under ``stream_name``, and is stamped
    with the current time in milliseconds since the Unix epoch.
    """
    timestamp_ms = int(datetime.datetime.now().timestamp() * 1000)
    row_count = round(total)
    byte_count = round(total * bytes_per_row)
    stream_estimate = AirbyteEstimateTraceMessage(
        type=EstimateType.STREAM,
        name=stream_name,
        row_estimate=row_count,
        byte_estimate=byte_count,
    )
    return AirbyteTraceMessage(
        type=TraceType.ESTIMATE,
        emitted_at=timestamp_ms,
        estimate=stream_estimate,
    )