🐛 Source S3: fixed bug where sync could hang indefinitely (#5197)
* infer schema in multi process * use dill to pickle function * moved funcs * Revert "moved funcs" This reverts commitc1739ad988. * Revert "use dill to pickle function" This reverts commit52404a9f1b. * Revert "infer schema in multi process" This reverts commitf0fb6f66f9. * multiprocess in csv schema iinfer * simplify what happens in the multiprocess to offending code * try this * using tempfile * formatting * version bump * changelog + formatting * addressed review comments * re-trigger checks * ran testScaffoldTemplates to fix breaking check
This commit is contained in:
@@ -12,5 +12,5 @@ RUN pip install .
|
||||
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
|
||||
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
|
||||
|
||||
LABEL io.airbyte.version=0.1.2
|
||||
LABEL io.airbyte.version=0.1.3
|
||||
LABEL io.airbyte.name=airbyte/source-s3
|
||||
|
||||
@@ -78,6 +78,12 @@
|
||||
"default": false,
|
||||
"type": "boolean"
|
||||
},
|
||||
"block_size": {
|
||||
"title": "Block Size",
|
||||
"description": "The chunk size in bytes to process at a time in memory from each file. If your data is particularly wide and failing during schema detection, increasing this should solve it. Beware of raising this too high as you could hit OOM errors.",
|
||||
"default": 10000,
|
||||
"type": "integer"
|
||||
},
|
||||
"additional_reader_options": {
|
||||
"title": "Additional Reader Options",
|
||||
"description": "Optionally add a valid JSON string here to provide additional options to the csv reader. Mappings must correspond to options <a href=\"https://arrow.apache.org/docs/python/generated/pyarrow.csv.ConvertOptions.html#pyarrow.csv.ConvertOptions\" target=\"_blank\">detailed here</a>. 'column_types' is used internally to handle schema so overriding that would likely cause problems.",
|
||||
|
||||
@@ -25,7 +25,7 @@
|
||||
|
||||
from setuptools import find_packages, setup
|
||||
|
||||
MAIN_REQUIREMENTS = ["airbyte-cdk", "pyarrow==4.0.1", "smart-open[s3]==5.1.0", "wcmatch==8.2"]
|
||||
MAIN_REQUIREMENTS = ["airbyte-cdk", "pyarrow==4.0.1", "smart-open[s3]==5.1.0", "wcmatch==8.2", "dill==0.3.4"]
|
||||
|
||||
TEST_REQUIREMENTS = [
|
||||
"pytest~=6.1",
|
||||
|
||||
@@ -23,14 +23,21 @@
|
||||
#
|
||||
|
||||
import json
|
||||
import multiprocessing as mp
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Any, BinaryIO, Iterator, Mapping, TextIO, Union
|
||||
from typing import Any, BinaryIO, Iterator, Mapping, Optional, TextIO, Tuple, Union
|
||||
|
||||
import dill
|
||||
import pyarrow as pa
|
||||
from airbyte_cdk.logger import AirbyteLogger
|
||||
from pyarrow import csv as pa_csv
|
||||
|
||||
|
||||
def multiprocess_queuer(func, queue: mp.Queue, *args, **kwargs):
|
||||
""" this is our multiprocesser helper function, lives at top-level to be Windows-compatible """
|
||||
queue.put(dill.loads(func)(*args, **kwargs))
|
||||
|
||||
|
||||
class FileFormatParser(ABC):
|
||||
def __init__(self, format: dict, master_schema: dict = None):
|
||||
"""
|
||||
@@ -132,41 +139,124 @@ class CsvParser(FileFormatParser):
|
||||
def _read_options(self):
|
||||
"""
|
||||
https://arrow.apache.org/docs/python/generated/pyarrow.csv.ReadOptions.html
|
||||
|
||||
build ReadOptions object like: pa.csv.ReadOptions(**self._read_options())
|
||||
"""
|
||||
return pa.csv.ReadOptions(block_size=10000, encoding=self._format.get("encoding", "utf8"))
|
||||
return {"block_size": self._format.get("block_size", 10000), "encoding": self._format.get("encoding", "utf8")}
|
||||
|
||||
def _parse_options(self):
|
||||
"""
|
||||
https://arrow.apache.org/docs/python/generated/pyarrow.csv.ParseOptions.html
|
||||
|
||||
build ParseOptions object like: pa.csv.ParseOptions(**self._parse_options())
|
||||
"""
|
||||
quote_char = self._format.get("quote_char", False) if self._format.get("quote_char", False) != "" else False
|
||||
return pa.csv.ParseOptions(
|
||||
delimiter=self._format.get("delimiter", ","),
|
||||
quote_char=quote_char,
|
||||
double_quote=self._format.get("double_quote", True),
|
||||
escape_char=self._format.get("escape_char", False),
|
||||
newlines_in_values=self._format.get("newlines_in_values", False),
|
||||
)
|
||||
return {
|
||||
"delimiter": self._format.get("delimiter", ","),
|
||||
"quote_char": quote_char,
|
||||
"double_quote": self._format.get("double_quote", True),
|
||||
"escape_char": self._format.get("escape_char", False),
|
||||
"newlines_in_values": self._format.get("newlines_in_values", False),
|
||||
}
|
||||
|
||||
def _convert_options(self, json_schema: Mapping[str, Any] = None):
|
||||
"""
|
||||
https://arrow.apache.org/docs/python/generated/pyarrow.csv.ConvertOptions.html
|
||||
|
||||
build ConvertOptions object like: pa.csv.ConvertOptions(**self._convert_options())
|
||||
|
||||
:param json_schema: if this is passed in, pyarrow will attempt to enforce this schema on read, defaults to None
|
||||
"""
|
||||
check_utf8 = True if self._format.get("encoding", "utf8").lower().replace("-", "") == "utf8" else False
|
||||
convert_schema = self.json_schema_to_pyarrow_schema(json_schema) if json_schema is not None else None
|
||||
return pa.csv.ConvertOptions(
|
||||
check_utf8=check_utf8, column_types=convert_schema, **json.loads(self._format.get("additional_reader_options", "{}"))
|
||||
)
|
||||
return {
|
||||
**{"check_utf8": check_utf8, "column_types": convert_schema},
|
||||
**json.loads(self._format.get("additional_reader_options", "{}")),
|
||||
}
|
||||
|
||||
def _run_in_external_process(self, fn, timeout: int, max_timeout: int, *args) -> Any:
|
||||
"""
|
||||
fn passed in must return a tuple of (desired return value, Exception OR None)
|
||||
This allows propagating any errors from the process up and raising accordingly
|
||||
|
||||
"""
|
||||
result = None
|
||||
while result is None:
|
||||
q_worker = mp.Queue()
|
||||
proc = mp.Process(
|
||||
target=multiprocess_queuer,
|
||||
args=(dill.dumps(fn), q_worker, *args), # use dill to pickle the function for Windows-compatibility
|
||||
)
|
||||
proc.start()
|
||||
try:
|
||||
# this attempts to get return value from function with our specified timeout up to max
|
||||
result, potential_error = q_worker.get(timeout=min(timeout, max_timeout))
|
||||
except mp.queues.Empty:
|
||||
if timeout >= max_timeout: # if we've got to max_timeout and tried once with that value
|
||||
raise TimeoutError(
|
||||
f"Timed out too many times while running {fn.__name__}, max timeout of {max_timeout} seconds reached."
|
||||
)
|
||||
self.logger.info(f"timed out while running {fn.__name__} after {timeout} seconds, retrying...")
|
||||
timeout *= 2 # double timeout and try again
|
||||
else:
|
||||
if potential_error is not None:
|
||||
raise potential_error
|
||||
else:
|
||||
return result
|
||||
finally:
|
||||
try:
|
||||
proc.terminate()
|
||||
except Exception as e:
|
||||
self.logger.info(f"'{fn.__name__}' proc unterminated, error: {e}")
|
||||
|
||||
def get_inferred_schema(self, file: Union[TextIO, BinaryIO]) -> dict:
|
||||
"""
|
||||
https://arrow.apache.org/docs/python/generated/pyarrow.csv.open_csv.html
|
||||
Note: this reads just the first block (as defined in _read_options() block_size) to infer the schema
|
||||
|
||||
This now uses multiprocessing in order to timeout the schema inference as it can hang.
|
||||
Since the hanging code is resistant to signal interrupts, threading/futures doesn't help so needed to multiprocess.
|
||||
https://issues.apache.org/jira/browse/ARROW-11853?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel
|
||||
"""
|
||||
streaming_reader = pa_csv.open_csv(file, self._read_options(), self._parse_options(), self._convert_options())
|
||||
schema_dict = {field.name: field.type for field in streaming_reader.schema}
|
||||
|
||||
def infer_schema_process(
|
||||
file_sample: str, read_opts: dict, parse_opts: dict, convert_opts: dict
|
||||
) -> Tuple[dict, Optional[Exception]]:
|
||||
"""
|
||||
we need to reimport here to be functional on Windows systems since it doesn't have fork()
|
||||
https://docs.python.org/3.7/library/multiprocessing.html#contexts-and-start-methods
|
||||
|
||||
This returns a tuple of (schema_dict, None OR Exception).
|
||||
If return[1] is not None and holds an exception we then raise this in the main process.
|
||||
This lets us propagate up any errors (that aren't timeouts) and raise correctly.
|
||||
"""
|
||||
try:
|
||||
import tempfile
|
||||
|
||||
import pyarrow as pa
|
||||
|
||||
# writing our file_sample to a temporary file to then read in and schema infer as before
|
||||
with tempfile.TemporaryFile() as fp:
|
||||
fp.write(file_sample)
|
||||
fp.seek(0)
|
||||
streaming_reader = pa.csv.open_csv(
|
||||
fp, pa.csv.ReadOptions(**read_opts), pa.csv.ParseOptions(**parse_opts), pa.csv.ConvertOptions(**convert_opts)
|
||||
)
|
||||
schema_dict = {field.name: field.type for field in streaming_reader.schema}
|
||||
|
||||
except Exception as e:
|
||||
# we pass the traceback up otherwise the main process won't know the exact method+line of error
|
||||
return (None, e)
|
||||
else:
|
||||
return (schema_dict, None)
|
||||
|
||||
# boto3 objects can't be pickled (https://github.com/boto/boto3/issues/678)
|
||||
# and so we can't multiprocess with the actual fileobject on Windows systems
|
||||
# we're reading block_size*2 bytes here, which we can then pass in and infer schema from block_size bytes
|
||||
# the *2 is to give us a buffer as pyarrow figures out where lines actually end so it gets schema correct
|
||||
file_sample = file.read(self._read_options()["block_size"] * 2)
|
||||
schema_dict = self._run_in_external_process(
|
||||
infer_schema_process, 4, 60, file_sample, self._read_options(), self._parse_options(), self._convert_options()
|
||||
)
|
||||
return self.json_schema_to_pyarrow_schema(schema_dict, reverse=True)
|
||||
|
||||
def stream_records(self, file: Union[TextIO, BinaryIO]) -> Iterator[Mapping[str, Any]]:
|
||||
@@ -174,7 +264,12 @@ class CsvParser(FileFormatParser):
|
||||
https://arrow.apache.org/docs/python/generated/pyarrow.csv.open_csv.html
|
||||
PyArrow returns lists of values for each column so we zip() these up into records which we then yield
|
||||
"""
|
||||
streaming_reader = pa_csv.open_csv(file, self._read_options(), self._parse_options(), self._convert_options(self._master_schema))
|
||||
streaming_reader = pa_csv.open_csv(
|
||||
file,
|
||||
pa.csv.ReadOptions(**self._read_options()),
|
||||
pa.csv.ParseOptions(**self._parse_options()),
|
||||
pa.csv.ConvertOptions(**self._convert_options(self._master_schema)),
|
||||
)
|
||||
still_reading = True
|
||||
while still_reading:
|
||||
try:
|
||||
|
||||
@@ -94,6 +94,10 @@ class CsvFormat(BaseModel):
|
||||
default=False,
|
||||
description="Whether newline characters are allowed in CSV values. Turning this on may affect performance. Leave blank to default to False.",
|
||||
)
|
||||
block_size: int = Field(
|
||||
default=10000,
|
||||
description="The chunk size in bytes to process at a time in memory from each file. If your data is particularly wide and failing during schema detection, increasing this should solve it. Beware of raising this too high as you could hit OOM errors.",
|
||||
)
|
||||
additional_reader_options: str = Field(
|
||||
default="{}",
|
||||
description='Optionally add a valid JSON string here to provide additional options to the csv reader. Mappings must correspond to options <a href="https://arrow.apache.org/docs/python/generated/pyarrow.csv.ConvertOptions.html#pyarrow.csv.ConvertOptions" target="_blank">detailed here</a>. \'column_types\' is used internally to handle schema so overriding that would likely cause problems.',
|
||||
|
||||
Reference in New Issue
Block a user