source-s3: delete integration tests using minio (#26908)
This commit is contained in:
@@ -1,19 +0,0 @@
|
||||
{
|
||||
"dataset": "test",
|
||||
"provider": {
|
||||
"storage": "S3",
|
||||
"bucket": "test-bucket",
|
||||
"aws_access_key_id": "123456",
|
||||
"aws_secret_access_key": "123456key",
|
||||
"path_prefix": "",
|
||||
"endpoint": "http://10.0.34.79:9000"
|
||||
},
|
||||
"format": {
|
||||
"filetype": "csv",
|
||||
"delimiter": ",",
|
||||
"quote_char": "'",
|
||||
"encoding": "utf8"
|
||||
},
|
||||
"path_pattern": "*.csv",
|
||||
"schema": "{}"
|
||||
}
|
||||
@@ -1,19 +0,0 @@
|
||||
{
|
||||
"dataset": "test",
|
||||
"provider": {
|
||||
"storage": "S3",
|
||||
"bucket": "test-bucket",
|
||||
"aws_access_key_id": "123456",
|
||||
"aws_secret_access_key": "123456key",
|
||||
"path_prefix": "",
|
||||
"endpoint": "http://<local_ip>:9000"
|
||||
},
|
||||
"format": {
|
||||
"filetype": "csv",
|
||||
"delimiter": ",",
|
||||
"quote_char": "'",
|
||||
"encoding": "utf8"
|
||||
},
|
||||
"path_pattern": "*.csv",
|
||||
"schema": "{}"
|
||||
}
|
||||
@@ -2,101 +2,15 @@
|
||||
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
|
||||
#
|
||||
|
||||
import json
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Any, Iterable, List, Mapping
|
||||
from zipfile import ZipFile
|
||||
from typing import Any
|
||||
|
||||
import docker
|
||||
import pytest
|
||||
import requests # type: ignore[import]
|
||||
from airbyte_cdk import AirbyteLogger
|
||||
from docker.errors import APIError
|
||||
from netifaces import AF_INET, ifaddresses, interfaces
|
||||
from requests.exceptions import ConnectionError # type: ignore[import]
|
||||
|
||||
from .integration_test import TMP_FOLDER, TestIncrementalFileStreamS3
|
||||
from .integration_test import TestIncrementalFileStreamS3
|
||||
|
||||
# Module-level Airbyte logger shared by the helpers and fixtures in this file.
LOGGER = AirbyteLogger()
|
||||
|
||||
|
||||
def get_local_ip() -> str:
    """Return the first non-loopback IPv4 address of this host.

    Collects the IPv4 addresses of every interface reported by ``netifaces``,
    logs them, and returns the smallest one (string sort order) that does not
    start with ``127.``.

    Raises:
        AssertionError: if no non-loopback IPv4 address is found.
    """
    all_interface_ips: List[str] = []
    for iface_name in interfaces():
        # ``.get`` (instead of ``setdefault`` with a placeholder entry) leaves the
        # mapping returned by ``ifaddresses`` untouched; interfaces without an
        # IPv4 entry simply contribute nothing.
        all_interface_ips.extend(
            entry["addr"] for entry in ifaddresses(iface_name).get(AF_INET, []) if entry["addr"]
        )
    LOGGER.info(f"detected interface IPs: {all_interface_ips}")

    for ip in sorted(all_interface_ips):
        if not ip.startswith("127."):
            return ip

    # Raised explicitly rather than via ``assert False`` so the failure is not
    # stripped (and ``None`` silently returned) when Python runs with ``-O``.
    raise AssertionError("not found an non-localhost interface")
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
def minio_credentials() -> Mapping[str, Any]:
    """Session fixture: render the MinIO config from its template and load it.

    Reads ``config_minio.template.json`` located next to this file, replaces
    the ``<local_ip>`` placeholder with the address returned by
    ``get_local_ip()``, writes the result to ``config_minio.json`` in the same
    directory and returns the parsed JSON content.

    Raises:
        FileNotFoundError: if the template file does not exist.
    """
    config_template = Path(__file__).parent / "config_minio.template.json"
    # ``is_file()`` returns a bool, so the former ``is_file() is not None``
    # assertion could never fail; check the value itself and fail explicitly.
    if not config_template.is_file():
        raise FileNotFoundError(f"not found {config_template}")

    config_file = Path(__file__).parent / "config_minio.json"
    config_file.write_text(config_template.read_text().replace("<local_ip>", get_local_ip()))
    with open(str(config_file)) as f:
        return json.load(f)
|
||||
|
||||
|
||||
@pytest.fixture(scope="session", autouse=True)
def minio_setup(minio_credentials: Mapping[str, Any]) -> Iterable[None]:
    """Session-wide, auto-used fixture that starts a MinIO container.

    Steps:
      1. Extract ``./integration_tests/minio_data.zip`` into ``TMP_FOLDER``.
      2. Start the ``minio/minio`` container named ``ci_test_minio`` with port
         9000 bound to the address from ``get_local_ip()``.
      3. Poll the ``/minio/health/live`` endpoint every 0.5 s, up to 120 times,
         until it answers with HTTP 200.

    Raises:
        docker.errors.APIError: if starting the container fails with any
            status other than 409.
        AssertionError: if the health endpoint never returns HTTP 200.
    """
    with ZipFile("./integration_tests/minio_data.zip") as archive:
        archive.extractall(TMP_FOLDER)
    client = docker.from_env()
    # Minio should be attached to non-localhost interface.
    # Because another test container should have direct connection to it
    local_ip = get_local_ip()
    LOGGER.debug(f"minio settings: {minio_credentials}")
    try:
        client.containers.run(
            image="minio/minio:RELEASE.2021-10-06T23-36-31Z",
            command=f"server {TMP_FOLDER}",
            name="ci_test_minio",
            auto_remove=True,
            volumes=[f"/{TMP_FOLDER}/minio_data:/{TMP_FOLDER}"],
            detach=True,
            ports={"9000/tcp": (local_ip, 9000)},
        )
    except APIError as err:
        # HTTP 409 (conflict): presumably a container with this name is left
        # over from an earlier run; it is reused instead of failing.
        if err.status_code == 409:
            for container in client.containers.list():
                if container.name == "ci_test_minio":
                    LOGGER.info("minio was started before")
                    break
        else:
            raise

    check_url = f"http://{local_ip}:9000/minio/health/live"
    for _ in range(120):  # wait 1 min
        time.sleep(0.5)
        LOGGER.info(f"try to connect to {check_url}")
        try:
            data = requests.get(check_url)
        except ConnectionError as err:
            LOGGER.warn(f"minio error: {err}")
            continue
        if data.status_code == 200:
            LOGGER.info("Run a minio/minio container...")
            break
        # ``requests.Response`` exposes the body as ``.text``; the former
        # ``data.response.text`` raised AttributeError on any non-200 reply.
        LOGGER.info(f"minio error: {data.text}")
    else:
        # Loop exhausted without a successful health check. Raised explicitly so
        # the failure survives ``python -O``.
        raise AssertionError("couldn't connect to minio!!!")

    yield
    # The minio container is deliberately left running: it is needed for all
    # integration and acceptance tests.
|
||||
|
||||
|
||||
def pytest_sessionfinish(session: Any, exitstatus: Any) -> None:
|
||||
"""tries to find and remove all temp buckets"""
|
||||
instance = TestIncrementalFileStreamS3()
|
||||
|
||||
@@ -4,27 +4,16 @@
|
||||
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import shutil
|
||||
import time
|
||||
from typing import Any, Dict, Iterator, List, Mapping
|
||||
from typing import Iterator, List, Mapping
|
||||
|
||||
import boto3
|
||||
import pytest
|
||||
from airbyte_cdk import AirbyteLogger
|
||||
from botocore.errorfactory import ClientError
|
||||
from source_s3.source import SourceS3
|
||||
from source_s3.stream import IncrementalFileStreamS3
|
||||
from unit_tests.abstract_test_parser import memory_limit
|
||||
from unit_tests.test_csv_parser import generate_big_file
|
||||
|
||||
from .integration_test_abstract import HERE, SAMPLE_DIR, AbstractTestIncrementalFileStream
|
||||
|
||||
# Scratch directory used by the MinIO-backed integration tests.
TMP_FOLDER = "/tmp/test_minio_source_s3"
# ``exist_ok=True`` replaces the check-then-create sequence, which could raise
# FileExistsError if another process created the directory between the two calls.
os.makedirs(TMP_FOLDER, exist_ok=True)
|
||||
|
||||
# Module-level Airbyte logger used by the test helpers in this module.
LOGGER = AirbyteLogger()
|
||||
|
||||
|
||||
@@ -73,9 +62,9 @@ class TestIncrementalFileStreamS3(AbstractTestIncrementalFileStream):
|
||||
if private:
|
||||
self.s3_client.create_bucket(Bucket=bucket_name, CreateBucketConfiguration=location)
|
||||
else:
|
||||
self.s3_client.create_bucket(Bucket=bucket_name, CreateBucketConfiguration=location, ObjectOwnership='ObjectWriter')
|
||||
self.s3_client.create_bucket(Bucket=bucket_name, CreateBucketConfiguration=location, ObjectOwnership="ObjectWriter")
|
||||
self.s3_client.delete_public_access_block(Bucket=bucket_name)
|
||||
self.s3_client.put_bucket_acl(Bucket=bucket_name, ACL='public-read')
|
||||
self.s3_client.put_bucket_acl(Bucket=bucket_name, ACL="public-read")
|
||||
|
||||
# wait here until the bucket is ready
|
||||
ready = False
|
||||
@@ -107,36 +96,3 @@ class TestIncrementalFileStreamS3(AbstractTestIncrementalFileStream):
|
||||
bucket.objects.all().delete()
|
||||
bucket.delete()
|
||||
LOGGER.info(f"S3 Bucket {cloud_bucket_name} is now deleted")
|
||||
|
||||
|
||||
class TestIntegrationCsvFiles:
    """Integration test that reads a large generated CSV file through ``SourceS3``."""

    logger = logging.getLogger("airbyte")

    @memory_limit(150)  # max used memory should be less than 150Mb
    def read_source(self, credentials: Dict[str, Any], catalog: Dict[str, Any]) -> int:
        """Run ``SourceS3().read`` and return the number of record messages.

        Args:
            credentials: connector config passed to ``read`` as ``config``.
            catalog: configured catalog passed to ``read``.

        Returns:
            How many emitted messages had a truthy ``record`` attribute.
        """
        return sum(1 for msg in SourceS3().read(logger=self.logger, config=credentials, catalog=catalog) if msg.record)

    @pytest.mark.order(1)
    def test_big_file(self, minio_credentials: Dict[str, Any]) -> None:
        """Read a big generated CSV file (0.5 GB by default) and check the record count."""
        # The big CSV file is generated separately, into a freshly recreated folder.
        big_file_folder = os.path.join(TMP_FOLDER, "minio_data", "test-bucket", "big_files")
        shutil.rmtree(big_file_folder, ignore_errors=True)
        os.makedirs(big_file_folder)
        filepath = os.path.join(big_file_folder, "file.csv")

        # please change this value if you need to test another file size
        future_file_size = 0.5  # in gigabytes
        _, file_size = generate_big_file(filepath, future_file_size, 500)
        # Count lines inside a context manager so the file handle is closed
        # deterministically; minus one presumably excludes the header row.
        with open(filepath) as generated:
            expected_count = sum(1 for _ in generated) - 1
        self.logger.info(f"generated file {filepath} with size {file_size}Gb, lines: {expected_count}")

        # NOTE(review): this mutates the dict handed in by the ``minio_credentials``
        # fixture; confirm no other test relies on the unmodified values.
        minio_credentials["path_pattern"] = "big_files/file.csv"
        minio_credentials["format"]["block_size"] = 5 * 1024**2
        source = SourceS3()
        catalog = source.read_catalog(HERE / "configured_catalogs/csv.json")
        assert self.read_source(minio_credentials, catalog) == expected_count
|
||||
|
||||
Reference in New Issue
Block a user