diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/config_minio.json b/airbyte-integrations/connectors/source-s3/integration_tests/config_minio.json deleted file mode 100644 index b323a3c63d0..00000000000 --- a/airbyte-integrations/connectors/source-s3/integration_tests/config_minio.json +++ /dev/null @@ -1,19 +0,0 @@ -{ - "dataset": "test", - "provider": { - "storage": "S3", - "bucket": "test-bucket", - "aws_access_key_id": "123456", - "aws_secret_access_key": "123456key", - "path_prefix": "", - "endpoint": "http://10.0.34.79:9000" - }, - "format": { - "filetype": "csv", - "delimiter": ",", - "quote_char": "'", - "encoding": "utf8" - }, - "path_pattern": "*.csv", - "schema": "{}" -} diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/config_minio.template.json b/airbyte-integrations/connectors/source-s3/integration_tests/config_minio.template.json deleted file mode 100644 index 574f5fb000e..00000000000 --- a/airbyte-integrations/connectors/source-s3/integration_tests/config_minio.template.json +++ /dev/null @@ -1,19 +0,0 @@ -{ - "dataset": "test", - "provider": { - "storage": "S3", - "bucket": "test-bucket", - "aws_access_key_id": "123456", - "aws_secret_access_key": "123456key", - "path_prefix": "", - "endpoint": "http://:9000" - }, - "format": { - "filetype": "csv", - "delimiter": ",", - "quote_char": "'", - "encoding": "utf8" - }, - "path_pattern": "*.csv", - "schema": "{}" -} diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/conftest.py b/airbyte-integrations/connectors/source-s3/integration_tests/conftest.py index 473792c58ad..67636cb0690 100644 --- a/airbyte-integrations/connectors/source-s3/integration_tests/conftest.py +++ b/airbyte-integrations/connectors/source-s3/integration_tests/conftest.py @@ -2,101 +2,15 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # -import json -import time -from pathlib import Path -from typing import Any, Iterable, List, Mapping -from zipfile import ZipFile +from typing import Any -import docker -import pytest -import requests # type: ignore[import] from airbyte_cdk import AirbyteLogger -from docker.errors import APIError -from netifaces import AF_INET, ifaddresses, interfaces -from requests.exceptions import ConnectionError # type: ignore[import] -from .integration_test import TMP_FOLDER, TestIncrementalFileStreamS3 +from .integration_test import TestIncrementalFileStreamS3 LOGGER = AirbyteLogger() -def get_local_ip() -> str: - all_interface_ips: List[str] = [] - for iface_name in interfaces(): - all_interface_ips += [i["addr"] for i in ifaddresses(iface_name).setdefault(AF_INET, [{"addr": None}]) if i["addr"]] - LOGGER.info(f"detected interface IPs: {all_interface_ips}") - for ip in sorted(all_interface_ips): - if not ip.startswith("127."): - return ip - - assert False, "not found an non-localhost interface" - - -@pytest.fixture(scope="session") -def minio_credentials() -> Mapping[str, Any]: - config_template = Path(__file__).parent / "config_minio.template.json" - assert config_template.is_file() is not None, f"not found {config_template}" - config_file = Path(__file__).parent / "config_minio.json" - config_file.write_text(config_template.read_text().replace("", get_local_ip())) - credentials = {} - with open(str(config_file)) as f: - credentials = json.load(f) - return credentials - - -@pytest.fixture(scope="session", autouse=True) -def minio_setup(minio_credentials: Mapping[str, Any]) -> Iterable[None]: - - with ZipFile("./integration_tests/minio_data.zip") as archive: - archive.extractall(TMP_FOLDER) - client = docker.from_env() - # Minio should be attached to non-localhost interface. - # Because another test container should have direct connection to it - local_ip = get_local_ip() - LOGGER.debug(f"minio settings: {minio_credentials}") - try: - container = client.containers.run( - image="minio/minio:RELEASE.2021-10-06T23-36-31Z", - command=f"server {TMP_FOLDER}", - name="ci_test_minio", - auto_remove=True, - volumes=[f"/{TMP_FOLDER}/minio_data:/{TMP_FOLDER}"], - detach=True, - ports={"9000/tcp": (local_ip, 9000)}, - ) - except APIError as err: - if err.status_code == 409: - for container in client.containers.list(): - if container.name == "ci_test_minio": - LOGGER.info("minio was started before") - break - else: - raise - - check_url = f"http://{local_ip}:9000/minio/health/live" - checked = False - for _ in range(120): # wait 1 min - time.sleep(0.5) - LOGGER.info(f"try to connect to {check_url}") - try: - data = requests.get(check_url) - except ConnectionError as err: - LOGGER.warn(f"minio error: {err}") - continue - if data.status_code == 200: - checked = True - LOGGER.info("Run a minio/minio container...") - break - else: - LOGGER.info(f"minio error: {data.response.text}") - if not checked: - assert False, "couldn't connect to minio!!!" - - yield - # this minio container was not finished because it is needed for all integration adn acceptance tests - - def pytest_sessionfinish(session: Any, exitstatus: Any) -> None: """tries to find and remove all temp buckets""" instance = TestIncrementalFileStreamS3() diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/integration_test.py b/airbyte-integrations/connectors/source-s3/integration_tests/integration_test.py index 0ed13e0bf11..28a71f1520d 100644 --- a/airbyte-integrations/connectors/source-s3/integration_tests/integration_test.py +++ b/airbyte-integrations/connectors/source-s3/integration_tests/integration_test.py @@ -4,27 +4,16 @@ import json -import logging -import os -import shutil import time -from typing import Any, Dict, Iterator, List, Mapping +from typing import Iterator, List, Mapping import boto3 -import pytest from airbyte_cdk import AirbyteLogger from botocore.errorfactory import ClientError -from source_s3.source import SourceS3 from source_s3.stream import IncrementalFileStreamS3 -from unit_tests.abstract_test_parser import memory_limit -from unit_tests.test_csv_parser import generate_big_file from .integration_test_abstract import HERE, SAMPLE_DIR, AbstractTestIncrementalFileStream -TMP_FOLDER = "/tmp/test_minio_source_s3" -if not os.path.exists(TMP_FOLDER): - os.makedirs(TMP_FOLDER) - LOGGER = AirbyteLogger() @@ -73,9 +62,9 @@ class TestIncrementalFileStreamS3(AbstractTestIncrementalFileStream): if private: self.s3_client.create_bucket(Bucket=bucket_name, CreateBucketConfiguration=location) else: - self.s3_client.create_bucket(Bucket=bucket_name, CreateBucketConfiguration=location, ObjectOwnership='ObjectWriter') + self.s3_client.create_bucket(Bucket=bucket_name, CreateBucketConfiguration=location, ObjectOwnership="ObjectWriter") self.s3_client.delete_public_access_block(Bucket=bucket_name) - self.s3_client.put_bucket_acl(Bucket=bucket_name, ACL='public-read') + self.s3_client.put_bucket_acl(Bucket=bucket_name, ACL="public-read") # wait here until the bucket is ready ready = False @@ -107,36 +96,3 @@ class TestIncrementalFileStreamS3(AbstractTestIncrementalFileStream): bucket.objects.all().delete() bucket.delete() LOGGER.info(f"S3 Bucket {cloud_bucket_name} is now deleted") - - -class TestIntegrationCsvFiles: - logger = logging.getLogger("airbyte") - - @memory_limit(150) # max used memory should be less than 150Mb - def read_source(self, credentials: Dict[str, Any], catalog: Dict[str, Any]) -> int: - read_count = 0 - for msg in SourceS3().read(logger=self.logger, config=credentials, catalog=catalog): - if msg.record: - read_count += 1 - return read_count - - @pytest.mark.order(1) - def test_big_file(self, minio_credentials: Dict[str, Any]) -> None: - """tests a big csv file (>= 1.0G records)""" - # generates a big CSV files separately - big_file_folder = os.path.join(TMP_FOLDER, "minio_data", "test-bucket", "big_files") - shutil.rmtree(big_file_folder, ignore_errors=True) - os.makedirs(big_file_folder) - filepath = os.path.join(big_file_folder, "file.csv") - - # please change this value if you need to test another file size - future_file_size = 0.5 # in gigabytes - _, file_size = generate_big_file(filepath, future_file_size, 500) - expected_count = sum(1 for _ in open(filepath)) - 1 - self.logger.info(f"generated file {filepath} with size {file_size}Gb, lines: {expected_count}") - - minio_credentials["path_pattern"] = "big_files/file.csv" - minio_credentials["format"]["block_size"] = 5 * 1024**2 - source = SourceS3() - catalog = source.read_catalog(HERE / "configured_catalogs/csv.json") - assert self.read_source(minio_credentials, catalog) == expected_count