31 lines
1006 B
Python
31 lines
1006 B
Python
#
|
|
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
|
|
#
|
|
|
|
from dataclasses import InitVar, dataclass
|
|
from datetime import datetime, timedelta
|
|
from typing import Any, Mapping, Optional, Union
|
|
|
|
import requests
|
|
|
|
from airbyte_cdk.sources.declarative.requesters.error_handlers import BackoffStrategy
|
|
from airbyte_cdk.sources.declarative.types import Config
|
|
|
|
|
|
@dataclass
|
|
class WaitUntilMidnightBackoffStrategy(BackoffStrategy):
|
|
"""
|
|
Backoff strategy that waits until next midnight
|
|
"""
|
|
|
|
parameters: InitVar[Mapping[str, Any]]
|
|
config: Config
|
|
|
|
def backoff_time(
|
|
self, response_or_exception: Optional[Union[requests.Response, requests.RequestException]], attempt_count: int
|
|
) -> Optional[float]:
|
|
now_utc = datetime.now(datetime.timezone.utc)
|
|
midnight_utc = (now_utc + timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)
|
|
delta = midnight_utc - now_utc
|
|
return delta.total_seconds() if type(delta) is timedelta else delta.seconds
|