1
0
mirror of synced 2025-12-21 02:51:29 -05:00
Files
airbyte/airbyte-integrations/connectors/source-aws-cloudtrail/components.py
2024-12-18 14:05:43 -08:00

115 lines
5.0 KiB
Python

#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#
import datetime
import hashlib
import hmac
import json
from dataclasses import InitVar, dataclass
from typing import Any, Mapping, Union
import requests
from airbyte_cdk.sources.declarative.auth.declarative_authenticator import NoAuth
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
from airbyte_cdk.sources.declarative.types import Config
@dataclass
class CustomAuthenticator(NoAuth):
    """Authenticator that signs AWS CloudTrail requests with an ``AWS4-HMAC-SHA256`` signature.

    When called with a prepared request it rewrites the JSON body (integer
    ``StartTime``/``EndTime`` and an optional ``LookupAttributes`` filter taken
    from the config), signs the rewritten body, and attaches the resulting
    ``Authorization`` and ``X-Amz-Date`` headers to the request.
    """

    # Connector configuration; also used as the interpolation context for the fields below.
    config: Config
    # AWS access key id (plain string or interpolated template).
    aws_key_id: Union[InterpolatedString, str]
    # AWS secret access key (plain string or interpolated template).
    aws_secret_key: Union[InterpolatedString, str]
    # AWS region name; used for the Host header and the credential scope.
    aws_region_name: Union[InterpolatedString, str]

    def __post_init__(self, parameters: Mapping[str, Any]):
        """Resolve the interpolated credentials/region against the config and set signing constants."""
        self._aws_key_id = InterpolatedString.create(self.aws_key_id, parameters=parameters).eval(self.config)
        self._aws_secret_key = InterpolatedString.create(self.aws_secret_key, parameters=parameters).eval(self.config)
        self._aws_region_name = InterpolatedString.create(self.aws_region_name, parameters=parameters).eval(self.config)
        self.path = "/"
        self.service = "cloudtrail"

    def __call__(self, request: requests.PreparedRequest) -> requests.PreparedRequest:
        """Attach the HTTP headers required to authenticate on the HTTP request.

        The request body is expected to be a JSON document containing
        ``StartTime`` and ``EndTime``; it is re-serialized before signing so the
        bytes that are sent are exactly the bytes that were hashed.

        :param request: the prepared request to sign; its body and headers are modified in place
        :return: the same request object, signed
        """
        self.headers = {
            "Content-Type": "application/x-amz-json-1.1",
            "Accept": "application/json",
            "Host": f"cloudtrail.{self._aws_region_name}.amazonaws.com",
        }

        # The prepared body may be bytes or already a str; only bytes need decoding.
        raw_body = request.body
        if isinstance(raw_body, (bytes, bytearray)):
            raw_body = raw_body.decode("utf-8")
        request_body_json = json.loads(raw_body)

        lookup_attributes_filter = self.config.get("lookup_attributes_filter")
        if lookup_attributes_filter:
            request_body_json["LookupAttributes"] = [
                {
                    "AttributeKey": lookup_attributes_filter.get("attribute_key", ""),
                    "AttributeValue": lookup_attributes_filter.get("attribute_value", ""),
                }
            ]

        # StartTime and EndTime should be Unix timestamps with integer type
        request_body_json["StartTime"] = int(float(request_body_json["StartTime"]))
        request_body_json["EndTime"] = int(float(request_body_json["EndTime"]))
        # sign_aws_request hashes json.dumps(payload); serialize the body the same way
        # so the payload hash matches what is actually sent.
        request.body = json.dumps(request_body_json).encode("utf-8")

        # Sign the AWS Request and update headers with timestamp
        authorization_header, amz_date = self.sign_aws_request(
            self.service,
            self._aws_region_name,
            request.method,
            self.path,
            self.headers,
            request_body_json,
            self._aws_key_id,
            self._aws_secret_key,
        )

        # NOTE(review): X-Amz-Date is added after signing, so it is not part of
        # SignedHeaders — confirm against the SigV4 documentation whether it should be.
        self.headers["X-Amz-Date"] = amz_date
        self.headers["Authorization"] = authorization_header
        request.headers.update(self.headers)
        return request

    @property
    def auth_header(self) -> Union[str, None]:
        """Always ``None``; the headers are set directly in ``__call__``."""
        return None

    @property
    def token(self):
        """Always ``None``; the headers are set directly in ``__call__``."""
        return None

    # AWS Cloudtrail Requires custom sign process with request, Referred from botocore client - https://github.com/boto/botocore/issues/786
    def sign_aws_request(self, service, region, method, path, headers, payload, aws_access_key, aws_secret_key):
        """Build the ``Authorization`` header value for a request.

        :param service: service name used in the credential scope and signing key
        :param region: region name used in the credential scope and signing key
        :param method: HTTP method of the request
        :param path: canonical URI path of the request
        :param headers: mapping of header name to value; every entry is signed
        :param payload: JSON-serializable body; hashed as ``json.dumps(payload)``
        :param aws_access_key: access key id placed in the ``Credential`` field
        :param aws_secret_key: secret key the signing key is derived from
        :return: tuple ``(authorization_header, amz_date)`` where ``amz_date`` is the
            ``YYYYMMDDTHHMMSSZ`` timestamp used in the signature
        """
        string_payload = json.dumps(payload)
        payload_hash = hashlib.sha256(string_payload.encode()).hexdigest()

        # Take a single timestamp so the full date-time and the date in the
        # credential scope can never disagree across a UTC midnight boundary.
        now = datetime.datetime.now(datetime.timezone.utc)
        amz_date = now.strftime("%Y%m%dT%H%M%SZ")
        date_stamp = now.strftime("%Y%m%d")

        # Generate canonical request. Header names are lower-cased first and both
        # the canonical header block and the signed-header list use that same order.
        normalized_headers = sorted(
            ((key.lower(), value.strip()) for key, value in headers.items()),
            key=lambda item: item[0],
        )
        canonical_headers = "".join(f"{name}:{value}\n" for name, value in normalized_headers)
        signed_headers = ";".join(name for name, _ in normalized_headers)
        # The line after the path is left empty (no query string).
        canonical_request = f"{method}\n{path}\n\n{canonical_headers}\n{signed_headers}\n{payload_hash}"

        # Generate string to sign
        algorithm = "AWS4-HMAC-SHA256"
        credential_scope = f"{date_stamp}/{region}/{service}/aws4_request"
        string_to_sign = f"{algorithm}\n{amz_date}\n{credential_scope}\n" + hashlib.sha256(canonical_request.encode()).hexdigest()

        # Generate signing key: chained HMACs over date, region, service and a fixed terminator
        secret = ("AWS4" + aws_secret_key).encode()
        k_date = hmac.new(secret, date_stamp.encode(), hashlib.sha256).digest()
        k_region = hmac.new(k_date, region.encode(), hashlib.sha256).digest()
        k_service = hmac.new(k_region, service.encode(), hashlib.sha256).digest()
        k_signing = hmac.new(k_service, b"aws4_request", hashlib.sha256).digest()

        # Generate signature
        signature = hmac.new(k_signing, string_to_sign.encode(), hashlib.sha256).hexdigest()

        # Generate authorization header
        authorization_header = (
            f"{algorithm} Credential={aws_access_key}/{credential_scope}, " f"SignedHeaders={signed_headers}, Signature={signature}"
        )
        return authorization_header, amz_date