115 lines
5.0 KiB
Python
115 lines
5.0 KiB
Python
#
|
|
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
|
|
#
|
|
|
|
import datetime
|
|
import hashlib
|
|
import hmac
|
|
import json
|
|
from dataclasses import InitVar, dataclass
|
|
from typing import Any, Mapping, Union
|
|
|
|
import requests
|
|
|
|
from airbyte_cdk.sources.declarative.auth.declarative_authenticator import NoAuth
|
|
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
|
|
from airbyte_cdk.sources.declarative.types import Config
|
|
|
|
|
|
@dataclass
|
|
class CustomAuthenticator(NoAuth):
|
|
config: Config
|
|
aws_key_id: Union[InterpolatedString, str]
|
|
aws_secret_key: Union[InterpolatedString, str]
|
|
aws_region_name: Union[InterpolatedString, str]
|
|
|
|
def __post_init__(self, parameters: Mapping[str, Any]):
|
|
self._aws_key_id = InterpolatedString.create(self.aws_key_id, parameters=parameters).eval(self.config)
|
|
self._aws_secret_key = InterpolatedString.create(self.aws_secret_key, parameters=parameters).eval(self.config)
|
|
self._aws_region_name = InterpolatedString.create(self.aws_region_name, parameters=parameters).eval(self.config)
|
|
self.path = "/"
|
|
self.service = "cloudtrail"
|
|
|
|
def __call__(self, request: requests.PreparedRequest) -> requests.PreparedRequest:
|
|
"""Attach the HTTP headers required to authenticate on the HTTP request"""
|
|
self.headers = {
|
|
"Content-Type": "application/x-amz-json-1.1",
|
|
"Accept": "application/json",
|
|
"Host": f"cloudtrail.{self._aws_region_name}.amazonaws.com",
|
|
}
|
|
# StartTime and EndTime should be Unix timestamps with integer type
|
|
request_body_json = json.loads(request.body.decode("utf-8"))
|
|
if self.config.get("lookup_attributes_filter"):
|
|
lookup_attributes_filter = self.config.get("lookup_attributes_filter", {})
|
|
attribute_key = lookup_attributes_filter.get("attribute_key", "")
|
|
attribute_value = lookup_attributes_filter.get("attribute_value", "")
|
|
|
|
request_body_json["LookupAttributes"] = [{"AttributeKey": attribute_key, "AttributeValue": attribute_value}]
|
|
request_body_json["StartTime"] = int(float(request_body_json["StartTime"]))
|
|
request_body_json["EndTime"] = int(float(request_body_json["EndTime"]))
|
|
request.body = json.dumps(request_body_json).encode("utf-8")
|
|
# Sign the AWS Request and update headers with timestamp
|
|
authorization_header, amz_date = self.sign_aws_request(
|
|
self.service,
|
|
self._aws_region_name,
|
|
request.method,
|
|
self.path,
|
|
self.headers,
|
|
request_body_json,
|
|
self._aws_key_id,
|
|
self._aws_secret_key,
|
|
)
|
|
self.headers["X-Amz-Date"] = amz_date
|
|
self.headers["Authorization"] = authorization_header
|
|
request.headers.update(self.headers)
|
|
return request
|
|
|
|
@property
|
|
def auth_header(self) -> str:
|
|
return None
|
|
|
|
@property
|
|
def token(self):
|
|
return None
|
|
|
|
# AWS Cloudtrail Requires custom sign process with request, Referred from botocore client - https://github.com/boto/botocore/issues/786
|
|
def sign_aws_request(self, service, region, method, path, headers, payload, aws_access_key, aws_secret_key):
|
|
# Define required parameters for signing
|
|
service = service
|
|
region = region
|
|
method = method
|
|
path = path
|
|
headers = headers
|
|
string_payload = json.dumps(payload)
|
|
payload_hash = hashlib.sha256(string_payload.encode()).hexdigest()
|
|
|
|
# Generate timestamp and date for the request
|
|
amz_date = datetime.datetime.utcnow().strftime("%Y%m%dT%H%M%SZ")
|
|
date_stamp = datetime.datetime.utcnow().strftime("%Y%m%d")
|
|
|
|
# Generate canonical request
|
|
canonical_headers = "".join([f"{key.lower()}:{value.strip()}\n" for key, value in sorted(headers.items())])
|
|
signed_headers = ";".join(sorted(key.lower() for key in headers))
|
|
canonical_request = f"{method}\n{path}\n\n{canonical_headers}\n{signed_headers}\n{payload_hash}"
|
|
|
|
# Generate string to sign
|
|
algorithm = "AWS4-HMAC-SHA256"
|
|
credential_scope = f"{date_stamp}/{region}/{service}/aws4_request"
|
|
string_to_sign = f"{algorithm}\n{amz_date}\n{credential_scope}\n" + hashlib.sha256(canonical_request.encode()).hexdigest()
|
|
|
|
# Generate signing key
|
|
secret = ("AWS4" + aws_secret_key).encode()
|
|
k_date = hmac.new(secret, date_stamp.encode(), hashlib.sha256).digest()
|
|
k_region = hmac.new(k_date, region.encode(), hashlib.sha256).digest()
|
|
k_service = hmac.new(k_region, service.encode(), hashlib.sha256).digest()
|
|
k_signing = hmac.new(k_service, b"aws4_request", hashlib.sha256).digest()
|
|
|
|
# Generate signature
|
|
signature = hmac.new(k_signing, string_to_sign.encode(), hashlib.sha256).hexdigest()
|
|
|
|
# Generate authorization header
|
|
authorization_header = (
|
|
f"{algorithm} Credential={aws_access_key}/{credential_scope}, " f"SignedHeaders={signed_headers}, Signature={signature}"
|
|
)
|
|
return authorization_header, amz_date
|