#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import json
from pathlib import Path
from typing import Any, Dict, List, Union

import pytest
from airbyte_cdk.entrypoint import launch
from airbyte_cdk.models.airbyte_protocol import SyncMode
from freezegun import freeze_time
from unit_tests.sources.file_based.scenarios.csv_scenarios import (
    csv_multi_stream_scenario,
    csv_single_stream_scenario,
    invalid_csv_scenario,
    multi_csv_scenario,
    multi_csv_stream_n_file_exceeds_limit_for_inference,
    single_csv_scenario,
)
from unit_tests.sources.file_based.scenarios.incremental_scenarios import (
    multi_csv_different_timestamps_scenario,
    multi_csv_include_missing_files_within_history_range,
    multi_csv_per_timestamp_scenario,
    multi_csv_remove_old_files_if_history_is_full_scenario,
    multi_csv_same_timestamp_more_files_than_history_size_scenario,
    multi_csv_same_timestamp_scenario,
    multi_csv_skip_file_if_already_in_history,
    multi_csv_sync_files_within_history_time_window_if_history_is_incomplete_different_timestamps_scenario,
    multi_csv_sync_files_within_time_window_if_history_is_incomplete__different_timestamps_scenario,
    multi_csv_sync_recent_files_if_history_is_incomplete_scenario,
    single_csv_file_is_skipped_if_same_modified_at_as_in_history,
    single_csv_file_is_synced_if_modified_at_is_more_recent_than_in_history,
    single_csv_input_state_is_earlier_scenario,
    single_csv_input_state_is_later_scenario,
    single_csv_no_input_state_scenario,
)

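# Full set of scenarios exercised by both test_discover and test_read below.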
scenarios = [
    csv_multi_stream_scenario,
    csv_single_stream_scenario,
    invalid_csv_scenario,
    single_csv_scenario,
    multi_csv_scenario,
    multi_csv_stream_n_file_exceeds_limit_for_inference,
    single_csv_input_state_is_earlier_scenario,
    single_csv_no_input_state_scenario,
    single_csv_input_state_is_later_scenario,
    multi_csv_same_timestamp_scenario,
    multi_csv_different_timestamps_scenario,
    multi_csv_per_timestamp_scenario,
    multi_csv_skip_file_if_already_in_history,
    multi_csv_include_missing_files_within_history_range,
    multi_csv_remove_old_files_if_history_is_full_scenario,
    multi_csv_same_timestamp_more_files_than_history_size_scenario,
    multi_csv_sync_recent_files_if_history_is_incomplete_scenario,
    multi_csv_sync_files_within_time_window_if_history_is_incomplete__different_timestamps_scenario,
    multi_csv_sync_files_within_history_time_window_if_history_is_incomplete_different_timestamps_scenario,
    single_csv_file_is_skipped_if_same_modified_at_as_in_history,
    single_csv_file_is_synced_if_modified_at_is_more_recent_than_in_history,
]


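# Runs `discover` for each scenario and compares the emitted catalog against the
# scenario's expected catalog, or asserts that the expected discover error is raised.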
@pytest.mark.parametrize("scenario", scenarios, ids=[s.name for s in scenarios])
def test_discover(capsys, tmp_path, json_spec, scenario):
    if scenario.expected_discover_error:
        with pytest.raises(scenario.expected_discover_error):
            discover(capsys, tmp_path, scenario)
    else:
        assert discover(capsys, tmp_path, scenario) == scenario.expected_catalog


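# Dispatches to the incremental or full-refresh read test depending on whether the
# scenario defines an incremental configuration. Time is frozen so that any
# time-dependent cursor behavior is deterministic.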
@pytest.mark.parametrize("scenario", scenarios, ids=[s.name for s in scenarios])
@freeze_time("2023-06-09T00:00:00Z")
def test_read(capsys, tmp_path, json_spec, scenario):
    if scenario.incremental_scenario_config:
        run_test_read_incremental(capsys, tmp_path, scenario)
    else:
        run_test_read_full_refresh(capsys, tmp_path, scenario)


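# Full-refresh read: each emitted record's data and stream name must match the
# scenario's expected records, in order.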
def run_test_read_full_refresh(capsys, tmp_path, scenario):
    if scenario.expected_read_error:
        with pytest.raises(scenario.expected_read_error):
            read(capsys, tmp_path, scenario)
    else:
        output = read(capsys, tmp_path, scenario)
        expected_output = scenario.expected_records
        assert len(output) == len(expected_output)
        for actual, expected in zip(output, expected_output):
            assert actual["record"]["data"] == expected["data"]
            assert actual["record"]["stream"] == expected["stream"]


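# Incremental read: the scenario's expected_records list interleaves record data and
# state data, so each emitted RECORD or STATE message is compared against the
# corresponding expected entry.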
def run_test_read_incremental(capsys, tmp_path, scenario):
    if scenario.expected_read_error:
        with pytest.raises(scenario.expected_read_error):
            read_with_state(capsys, tmp_path, scenario)
    else:
        output = read_with_state(capsys, tmp_path, scenario)
        expected_output = scenario.expected_records
        assert len(output) == len(expected_output)
        for actual, expected in zip(output, expected_output):
            if "record" in actual:
                assert actual["record"]["data"] == expected
            elif "state" in actual:
                assert actual["state"]["data"] == expected


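# Invokes the connector's `discover` command via the CDK entrypoint and returns the
# catalog parsed from the first line of captured stdout.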
def discover(capsys, tmp_path, scenario) -> Dict[str, Any]:
    launch(
        scenario.source,
        ["discover", "--config", make_file(tmp_path / "config.json", scenario.config)],
    )
    captured = capsys.readouterr()
    return json.loads(captured.out.splitlines()[0])["catalog"]


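# Invokes `read` in full-refresh mode and returns only the RECORD messages from stdout.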
def read(capsys, tmp_path, scenario):
    launch(
        scenario.source,
        [
            "read",
            "--config",
            make_file(tmp_path / "config.json", scenario.config),
            "--catalog",
            make_file(tmp_path / "catalog.json", scenario.configured_catalog(SyncMode.full_refresh)),
        ],
    )
    captured = capsys.readouterr()
    return [
        msg
        for msg in (json.loads(line) for line in captured.out.splitlines())
        if msg["type"] == "RECORD"
    ]


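# Invokes `read` in incremental mode with an input state file and returns both RECORD
# and STATE messages from stdout.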
def read_with_state(capsys, tmp_path, scenario):
    launch(
        scenario.source,
        [
            "read",
            "--config",
            make_file(tmp_path / "config.json", scenario.config),
            "--catalog",
            make_file(tmp_path / "catalog.json", scenario.configured_catalog(SyncMode.incremental)),
            "--state",
            make_file(tmp_path / "state.json", scenario.input_state()),
        ],
    )
    captured = capsys.readouterr()
    return [
        msg
        for msg in (json.loads(line) for line in captured.out.splitlines())
        if msg["type"] in ("RECORD", "STATE")
    ]


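# Writes `file_contents` as JSON to `path` and returns the path as a string, suitable
# for passing as a CLI argument.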
def make_file(path: Path, file_contents: Union[Dict, List]) -> str:
    path.write_text(json.dumps(file_contents))
    return str(path)