1
0
mirror of synced 2025-12-20 18:39:31 -05:00
Files
airbyte/airbyte-integrations/connectors/source-gcs/integration_tests/utils.py
Artem Inzhyyants f3c748b8e7 test(source-gcs): add integration test (#45850)
Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
2024-09-24 20:27:19 +02:00

25 lines
817 B
Python

# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
import json
import os
import re
from typing import Any, Mapping, Union
def get_docker_ip() -> Union[str, Any]:
# When talking to the Docker daemon via a UNIX socket, route all TCP
# traffic to docker containers via the TCP loopback interface.
docker_host = os.environ.get("DOCKER_HOST", "").strip()
if not docker_host or docker_host.startswith("unix://"):
return "127.0.0.1"
match = re.match(r"^tcp://(.+?):\d+$", docker_host)
if not match:
raise ValueError('Invalid value for DOCKER_HOST: "%s".' % (docker_host,))
return match.group(1)
def load_config(config_path: str) -> Mapping[str, Any]:
with open(f"{os.path.dirname(__file__)}/configs/{config_path}", "r") as config:
return json.load(config)