connectors-ci: re-enable publish tests (#29149)
This commit is contained in:
@@ -15,7 +15,17 @@ pytestmark = [
|
||||
]
|
||||
|
||||
|
||||
@pytest.mark.skip(reason="Currently failing, should be fixed in the future")
|
||||
@pytest.fixture
|
||||
def publish_context(mocker, dagger_client, tmpdir):
|
||||
return mocker.MagicMock(
|
||||
dagger_client=dagger_client,
|
||||
get_connector_dir=mocker.MagicMock(return_value=dagger_client.host().directory(str(tmpdir))),
|
||||
docker_hub_username_secret=None,
|
||||
docker_hub_password_secret=None,
|
||||
docker_image="hello-world:latest",
|
||||
)
|
||||
|
||||
|
||||
class TestCheckConnectorImageDoesNotExists:
|
||||
@pytest.fixture(scope="class")
|
||||
def three_random_connectors_image_names(self, oss_registry: dict) -> List[str]:
|
||||
@@ -23,33 +33,28 @@ class TestCheckConnectorImageDoesNotExists:
|
||||
random.shuffle(connectors)
|
||||
return [f"{connector['dockerRepository']}:{connector['dockerImageTag']}" for connector in connectors[:3]]
|
||||
|
||||
async def test_run(self, mocker, dagger_client, three_random_connectors_image_names):
|
||||
"""We pick the first three connectors from the OSS registry and check that they are already published."""
|
||||
async def test_run_skipped_when_already_published(self, three_random_connectors_image_names, publish_context):
|
||||
"""We pick three random connectors from the OSS registry. They should be published. We check that the step is skipped."""
|
||||
for image_name in three_random_connectors_image_names:
|
||||
context = mocker.MagicMock(dagger_client=dagger_client, docker_image_name=image_name)
|
||||
step = publish.CheckConnectorImageDoesNotExist(context)
|
||||
publish_context.docker_image = image_name
|
||||
step = publish.CheckConnectorImageDoesNotExist(publish_context)
|
||||
step_result = await step.run()
|
||||
assert step_result.status == StepStatus.SKIPPED
|
||||
image_name = "airbyte/source-pokeapi:0.0.0"
|
||||
context = mocker.MagicMock(dagger_client=dagger_client, docker_image_name=image_name)
|
||||
step = publish.CheckConnectorImageDoesNotExist(context)
|
||||
|
||||
async def test_run_success_when_already_published(self, publish_context):
|
||||
|
||||
publish_context.docker_image = "airbyte/source-pokeapi:0.0.0"
|
||||
step = publish.CheckConnectorImageDoesNotExist(publish_context)
|
||||
step_result = await step.run()
|
||||
assert step_result.status == StepStatus.SUCCESS
|
||||
|
||||
|
||||
@pytest.mark.skip(reason="Currently failing, should be fixed in the future")
|
||||
class TestUploadSpecToCache:
|
||||
@pytest.fixture(scope="class")
|
||||
def random_connector(self, oss_registry):
|
||||
return random.choice(oss_registry["sources"] + oss_registry["destinations"])
|
||||
|
||||
@pytest.fixture
|
||||
def context(self, mocker, dagger_client, random_connector, tmpdir):
|
||||
image_name = f"{random_connector['dockerRepository']}:{random_connector['dockerImageTag']}"
|
||||
tmp_dir = dagger_client.host().directory(str(tmpdir))
|
||||
return mocker.MagicMock(
|
||||
dagger_client=dagger_client, get_connector_dir=mocker.MagicMock(return_value=tmp_dir), docker_image_name=image_name
|
||||
)
|
||||
def random_connector(self, oss_registry: dict) -> dict:
|
||||
connectors = oss_registry["sources"] + oss_registry["destinations"]
|
||||
random.shuffle(connectors)
|
||||
return connectors[0]
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"valid_spec, successful_upload",
|
||||
@@ -60,14 +65,15 @@ class TestUploadSpecToCache:
|
||||
[False, False],
|
||||
],
|
||||
)
|
||||
async def test_run(self, mocker, dagger_client, valid_spec, successful_upload, random_connector, context):
|
||||
async def test_run(self, mocker, dagger_client, valid_spec, successful_upload, random_connector, publish_context):
|
||||
"""Test that the spec is correctly uploaded to the spec cache bucket.
|
||||
We pick a random connector from the oss registry, by nature this connector should have a valid spec and be published.
|
||||
We use load this connector as a Dagger container and run spec against it.
|
||||
We load this connector as a Dagger container and run spec against it.
|
||||
We validate that the outputted spec is the same as the one in the OSS registry.
|
||||
We also artificially set the spec to be invalid and check that the step fails.
|
||||
"""
|
||||
image_name = f"{random_connector['dockerRepository']}:{random_connector['dockerImageTag']}"
|
||||
publish_context.docker_image = image_name
|
||||
expected_spec = random_connector["spec"]
|
||||
connector_container = dagger_client.container().from_(image_name)
|
||||
|
||||
@@ -80,15 +86,15 @@ class TestUploadSpecToCache:
|
||||
publish.UploadSpecToCache, "_get_connector_spec", mocker.Mock(side_effect=publish.InvalidSpecOutputError("Invalid spec."))
|
||||
)
|
||||
|
||||
step = publish.UploadSpecToCache(context)
|
||||
step = publish.UploadSpecToCache(publish_context)
|
||||
step_result = await step.run(connector_container)
|
||||
if valid_spec:
|
||||
publish.upload_to_gcs.assert_called_once_with(
|
||||
context.dagger_client,
|
||||
publish_context.dagger_client,
|
||||
mocker.ANY,
|
||||
f"specs/{image_name.replace(':', '/')}/spec.json",
|
||||
context.spec_cache_bucket_name,
|
||||
context.spec_cache_gcs_credentials_secret,
|
||||
publish_context.spec_cache_bucket_name,
|
||||
publish_context.spec_cache_gcs_credentials_secret,
|
||||
flags=['--cache-control="no-cache"'],
|
||||
)
|
||||
|
||||
@@ -110,27 +116,27 @@ class TestUploadSpecToCache:
|
||||
assert step_result.stdout is None
|
||||
publish.upload_to_gcs.assert_not_called()
|
||||
|
||||
def test_parse_spec_output_valid(self, context, random_connector):
|
||||
step = publish.UploadSpecToCache(context)
|
||||
def test_parse_spec_output_valid(self, publish_context, random_connector):
|
||||
step = publish.UploadSpecToCache(publish_context)
|
||||
correct_spec_message = json.dumps({"type": "SPEC", "spec": random_connector["spec"]})
|
||||
spec_output = f'random_stuff\n{{"type": "RANDOM_MESSAGE"}}\n{correct_spec_message}'
|
||||
result = step._parse_spec_output(spec_output)
|
||||
assert json.loads(result) == random_connector["spec"]
|
||||
|
||||
def test_parse_spec_output_invalid_json(self, context):
|
||||
step = publish.UploadSpecToCache(context)
|
||||
def test_parse_spec_output_invalid_json(self, publish_context):
|
||||
step = publish.UploadSpecToCache(publish_context)
|
||||
spec_output = "Invalid JSON"
|
||||
with pytest.raises(publish.InvalidSpecOutputError):
|
||||
step._parse_spec_output(spec_output)
|
||||
|
||||
def test_parse_spec_output_invalid_key(self, context):
|
||||
step = publish.UploadSpecToCache(context)
|
||||
def test_parse_spec_output_invalid_key(self, publish_context):
|
||||
step = publish.UploadSpecToCache(publish_context)
|
||||
spec_output = '{"type": "SPEC", "spec": {"invalid_key": "value"}}'
|
||||
with pytest.raises(publish.InvalidSpecOutputError):
|
||||
step._parse_spec_output(spec_output)
|
||||
|
||||
def test_parse_spec_output_no_spec(self, context):
|
||||
step = publish.UploadSpecToCache(context)
|
||||
def test_parse_spec_output_no_spec(self, publish_context):
|
||||
step = publish.UploadSpecToCache(publish_context)
|
||||
spec_output = '{"type": "OTHER"}'
|
||||
with pytest.raises(publish.InvalidSpecOutputError):
|
||||
step._parse_spec_output(spec_output)
|
||||
@@ -175,12 +181,11 @@ async def test_run_connector_publish_pipeline_when_failed_validation(mocker, pre
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.skip(reason="Currently failing, should be fixed in the future")
|
||||
@pytest.mark.parametrize(
|
||||
"check_image_exists_status, pre_release",
|
||||
[(StepStatus.SKIPPED, False), (StepStatus.SKIPPED, True), (StepStatus.FAILURE, True), (StepStatus.FAILURE, False)],
|
||||
"check_image_exists_status",
|
||||
[StepStatus.SKIPPED, StepStatus.FAILURE],
|
||||
)
|
||||
async def test_run_connector_publish_pipeline_when_image_exists_or_failed(mocker, check_image_exists_status, pre_release):
|
||||
async def test_run_connector_publish_pipeline_when_image_exists_or_failed(mocker, check_image_exists_status, publish_context):
|
||||
"""We validate that when the connector image exists or the check fails, we don't run the rest of the pipeline.
|
||||
We also validate that the metadata upload step is called when the image exists (Skipped status).
|
||||
We do this to ensure that the metadata is still updated in the case where the connector image already exists.
|
||||
@@ -202,9 +207,8 @@ async def test_run_connector_publish_pipeline_when_image_exists_or_failed(mocker
|
||||
|
||||
run_metadata_upload = publish.metadata.MetadataUpload.return_value.run
|
||||
|
||||
context = mocker.MagicMock(pre_release=pre_release)
|
||||
semaphore = anyio.Semaphore(1)
|
||||
report = await publish.run_connector_publish_pipeline(context, semaphore)
|
||||
report = await publish.run_connector_publish_pipeline(publish_context, semaphore)
|
||||
run_metadata_validation.assert_called_once()
|
||||
run_check_connector_image_does_not_exist.assert_called_once()
|
||||
|
||||
@@ -213,22 +217,11 @@ async def test_run_connector_publish_pipeline_when_image_exists_or_failed(mocker
|
||||
if to_mock not in ["MetadataValidation", "MetadataUpload", "CheckConnectorImageDoesNotExist", "UploadSpecToCache"]:
|
||||
getattr(module, to_mock).return_value.run.assert_not_called()
|
||||
|
||||
if check_image_exists_status is StepStatus.SKIPPED and pre_release:
|
||||
run_metadata_upload.assert_not_called()
|
||||
assert (
|
||||
report.steps_results
|
||||
== context.report.steps_results
|
||||
== [
|
||||
run_metadata_validation.return_value,
|
||||
run_check_connector_image_does_not_exist.return_value,
|
||||
]
|
||||
)
|
||||
|
||||
if check_image_exists_status is StepStatus.SKIPPED and not pre_release:
|
||||
if check_image_exists_status is StepStatus.SKIPPED:
|
||||
run_metadata_upload.assert_called_once()
|
||||
assert (
|
||||
report.steps_results
|
||||
== context.report.steps_results
|
||||
== publish_context.report.steps_results
|
||||
== [
|
||||
run_metadata_validation.return_value,
|
||||
run_check_connector_image_does_not_exist.return_value,
|
||||
@@ -241,7 +234,7 @@ async def test_run_connector_publish_pipeline_when_image_exists_or_failed(mocker
|
||||
run_metadata_upload.assert_not_called()
|
||||
assert (
|
||||
report.steps_results
|
||||
== context.report.steps_results
|
||||
== publish_context.report.steps_results
|
||||
== [
|
||||
run_metadata_validation.return_value,
|
||||
run_check_connector_image_does_not_exist.return_value,
|
||||
@@ -249,7 +242,6 @@ async def test_run_connector_publish_pipeline_when_image_exists_or_failed(mocker
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.skip(reason="Currently failing, should be fixed in the future")
|
||||
@pytest.mark.parametrize(
|
||||
"pre_release, build_step_status, push_step_status, pull_step_status, upload_to_spec_cache_step_status, metadata_upload_step_status",
|
||||
[
|
||||
@@ -318,15 +310,12 @@ async def test_run_connector_publish_pipeline_when_image_does_not_exist(
|
||||
publish.PullConnectorImageFromRegistry.return_value.run,
|
||||
]
|
||||
|
||||
if not pre_release:
|
||||
steps_to_run += [publish.metadata.MetadataUpload.return_value.run]
|
||||
|
||||
for i, step_to_run in enumerate(steps_to_run):
|
||||
if step_to_run.return_value.status is StepStatus.FAILURE or i == len(steps_to_run) - 1:
|
||||
assert len(report.steps_results) == len(context.report.steps_results)
|
||||
|
||||
previous_steps = steps_to_run[:i]
|
||||
for k, step_ran in enumerate(previous_steps):
|
||||
for _, step_ran in enumerate(previous_steps):
|
||||
step_ran.assert_called_once()
|
||||
step_ran.return_value
|
||||
|
||||
@@ -341,6 +330,3 @@ async def test_run_connector_publish_pipeline_when_image_does_not_exist(
|
||||
publish.PullConnectorImageFromRegistry.return_value.run.assert_not_called()
|
||||
publish.UploadSpecToCache.return_value.run.assert_not_called()
|
||||
publish.metadata.MetadataUpload.return_value.run.assert_not_called()
|
||||
|
||||
if pre_release:
|
||||
publish.metadata.MetadataUpload.return_value.run.assert_not_called()
|
||||
|
||||
Reference in New Issue
Block a user