Files
dify/api/tests/unit_tests/tasks/test_clean_document_task.py

292 lines
11 KiB
Python

"""
Unit tests for clean_document_task.
Focuses on the resilience contract added by the billing-failure fix:
``index_processor.clean()`` is wrapped in ``try/except`` so that a transient
failure inside the vector / keyword cleanup (e.g. ``ValueError("Unable to
retrieve billing information...")`` raised by ``BillingService._send_request``
when ``Vector(dataset)`` transitively triggers ``FeatureService.get_features``)
does not abort the entire task and leave PG with stranded ``DocumentSegment``
/ ``ChildChunk`` / ``UploadFile`` / ``DatasetMetadataBinding`` rows.
"""
import uuid
from unittest.mock import MagicMock, patch
import pytest
from tasks.clean_document_task import clean_document_task
@pytest.fixture
def document_id():
    """Random document UUID rendered in its canonical string form."""
    return f"{uuid.uuid4()}"
@pytest.fixture
def dataset_id():
    """Random dataset UUID rendered in its canonical string form."""
    generated = uuid.uuid4()
    return str(generated)
@pytest.fixture
def tenant_id():
    """Random tenant UUID rendered in its canonical string form."""
    return f"{uuid.uuid4()}"
@pytest.fixture
def mock_session_factory():
    """Replace the task's ``session_factory`` with an autospec mock.

    Every ``create_session()`` call returns a new mock context manager whose
    ``__enter__`` produces a new ``MagicMock`` session with empty query results.
    Each session is recorded so a test can inspect ``execute()`` calls made in
    any of the short-lived transactions opened by ``clean_document_task``.

    Yields:
        ``(patched_factory, created_sessions)``.
    """
    with patch("tasks.clean_document_task.session_factory", autospec=True) as patched_factory:
        created_sessions: list[MagicMock] = []

        def _next_session_cm():
            fresh = MagicMock()
            fresh.scalar.return_value = None
            fresh.execute.return_value.all.return_value = []
            fresh.scalars.return_value.all.return_value = []
            created_sessions.append(fresh)

            ctx = MagicMock()
            ctx.__exit__.return_value = None
            ctx.__enter__.return_value = fresh
            return ctx

        patched_factory.create_session.side_effect = _next_session_cm
        yield patched_factory, created_sessions
@pytest.fixture
def mock_storage():
    """Patch the task module's ``storage`` so ``delete`` is a no-op returning ``None``."""
    with patch("tasks.clean_document_task.storage", autospec=True) as patched_storage:
        patched_storage.delete.return_value = None
        yield patched_storage
@pytest.fixture
def mock_index_processor_factory():
    """Patch ``IndexProcessorFactory`` so tests can control ``processor.clean``.

    Yields a dict with three entries:
    - ``factory_cls``: the patched class.
    - ``factory_instance``: the object the patched class returns when called.
    - ``processor``: what ``factory_instance.init_index_processor()`` returns.
      By default its ``clean`` returns ``None``.
    """
    processor = MagicMock()
    processor.clean.return_value = None

    factory_instance = MagicMock()
    factory_instance.init_index_processor.return_value = processor

    with patch("tasks.clean_document_task.IndexProcessorFactory", autospec=True) as factory_cls:
        factory_cls.return_value = factory_instance
        yield dict(
            factory_cls=factory_cls,
            factory_instance=factory_instance,
            processor=processor,
        )
def _build_segment(segment_id: str, content: str = "segment content") -> MagicMock:
    """Return a stand-in ``DocumentSegment`` with ``id``, ``index_node_id`` and ``content`` set."""
    return MagicMock(
        id=segment_id,
        index_node_id=f"node-{segment_id}",
        content=content,
    )
def _build_dataset(dataset_id: str, tenant_id: str) -> MagicMock:
    """Return a stand-in ``Dataset`` carrying only ``id`` and ``tenant_id``."""
    return MagicMock(id=dataset_id, tenant_id=tenant_id)
def _make_session(*, scalars_rows=(), scalar_value=None) -> MagicMock:
    """Return a mock session whose query helpers yield canned results.

    ``scalars().all()`` returns ``scalars_rows`` as a list.
    ``execute().all()`` returns ``[]``.
    ``scalar()`` returns ``scalar_value``.
    """
    session = MagicMock()
    session.scalars.return_value.all.return_value = list(scalars_rows)
    session.execute.return_value.all.return_value = []
    session.scalar.return_value = scalar_value
    return session


def _as_context_manager(session: MagicMock) -> MagicMock:
    """Wrap ``session`` in a mock context manager that yields it from ``__enter__``."""
    cm = MagicMock()
    cm.__enter__.return_value = session
    cm.__exit__.return_value = None
    return cm


def _install_session_sequence(mock_sf, sessions: list[MagicMock], *leading: MagicMock) -> None:
    """Make ``create_session()`` hand out ``leading`` first, then default sessions.

    Default sessions (empty results) are created lazily and appended to
    ``sessions`` so tests can inspect calls made in the later transactions.
    A generator is used instead of a fixed-length ``side_effect`` list. The
    generator never runs dry, so a task that opens more transactions than
    expected cannot fail with a spurious ``StopIteration`` from the mock.
    """

    def _session_cms():
        for session in leading:
            yield _as_context_manager(session)
        while True:
            session = _make_session()
            sessions.append(session)
            yield _as_context_manager(session)

    mock_sf.create_session.side_effect = _session_cms()


class TestVectorCleanupResilience:
    """Vector / keyword cleanup must not abort the task on transient failure."""

    def test_billing_failure_during_vector_cleanup_does_not_skip_pg_cleanup(
        self,
        document_id,
        dataset_id,
        tenant_id,
        mock_session_factory,
        mock_storage,
        mock_index_processor_factory,
    ):
        """Reproduces the production incident.

        ``Vector(dataset)`` transitively calls ``FeatureService.get_features``,
        which calls ``BillingService._send_request("GET", ...)``. When billing
        returns non-200, that call raises ``ValueError("Unable to retrieve
        billing information...")``.

        Before the fix, this error propagated out of ``clean_document_task``.
        The ``Document`` row had already been deleted and hard-committed by the
        caller (``dataset_service.delete_document``) before ``.delay()`` was
        invoked. The ``DocumentSegment`` / ``ChildChunk`` / ``UploadFile`` /
        ``DatasetMetadataBinding`` rows were therefore left orphaned.

        Contract: a billing failure inside ``index_processor.clean()`` must be
        caught and logged, and the rest of the task must continue. PG then ends
        up consistent with the deleted ``Document``, even if Qdrant retains
        orphan vectors that can be reaped later.
        """
        mock_sf, sessions = mock_session_factory
        processor = mock_index_processor_factory["processor"]

        # First create_session(): Step 1 (load segments + attachments).
        step1_session = _make_session(
            scalars_rows=[_build_segment("seg-1"), _build_segment("seg-2")],
            scalar_value=_build_dataset(dataset_id, tenant_id),
        )
        # Second create_session(): Step 2 (vector cleanup). Returns dataset.
        step2_session = _make_session(scalar_value=_build_dataset(dataset_id, tenant_id))
        # Subsequent sessions: Step 3+ (image / segment / file / metadata cleanup).
        # Default empty results are fine for these short transactions.
        _install_session_sequence(mock_sf, sessions, step1_session, step2_session)

        # Simulate the production failure: index_processor.clean() raises ValueError,
        # mirroring BillingService._send_request when billing returns non-200.
        processor.clean.side_effect = ValueError(
            "Unable to retrieve billing information. Please try again later or contact support."
        )

        # Act: must not raise out of the task even though clean() raises.
        clean_document_task(
            document_id=document_id,
            dataset_id=dataset_id,
            doc_form="paragraph",
            file_id=None,
        )

        # 1. Vector cleanup was attempted.
        processor.clean.assert_called_once()
        # 2. Despite the failure, the task continued into the later transactions.
        # The total of session.execute calls in Step 3+ sessions is used as a
        # proxy for "Step 3+ executed".
        execute_calls = sum(s.execute.call_count for s in sessions)
        assert execute_calls > 0, (
            "Step 3+ DB cleanup did not run after vector cleanup failure; "
            "this regression would re-introduce the orphan-segment bug."
        )

    def test_vector_cleanup_success_path_remains_unaffected(
        self,
        document_id,
        dataset_id,
        tenant_id,
        mock_session_factory,
        mock_storage,
        mock_index_processor_factory,
    ):
        """Backward-compat: the happy path must still call ``clean()`` exactly
        once with the expected arguments and complete without errors.
        """
        mock_sf, sessions = mock_session_factory
        processor = mock_index_processor_factory["processor"]

        step1_session = _make_session(
            scalars_rows=[_build_segment("seg-1")],
            scalar_value=_build_dataset(dataset_id, tenant_id),
        )
        step2_session = _make_session(scalar_value=_build_dataset(dataset_id, tenant_id))
        _install_session_sequence(mock_sf, sessions, step1_session, step2_session)

        clean_document_task(
            document_id=document_id,
            dataset_id=dataset_id,
            doc_form="paragraph",
            file_id=None,
        )

        processor.clean.assert_called_once()
        # Index cleanup invoked with the expected with_keywords /
        # delete_child_chunks / delete_summaries flags.
        _, kwargs = processor.clean.call_args
        assert kwargs.get("with_keywords") is True
        assert kwargs.get("delete_child_chunks") is True
        assert kwargs.get("delete_summaries") is True

    def test_no_segments_skips_vector_cleanup(
        self,
        document_id,
        dataset_id,
        tenant_id,
        mock_session_factory,
        mock_storage,
        mock_index_processor_factory,
    ):
        """When the document has no segments (e.g. indexing failed before
        producing any), vector cleanup must not be attempted. The new
        try/except wrapper therefore does not change behavior here.
        """
        mock_sf, sessions = mock_session_factory

        step1_session = _make_session(scalar_value=_build_dataset(dataset_id, tenant_id))  # no segments
        _install_session_sequence(mock_sf, sessions, step1_session)

        clean_document_task(
            document_id=document_id,
            dataset_id=dataset_id,
            doc_form="paragraph",
            file_id=None,
        )

        # Vector cleanup is gated on ``index_node_ids``. With no segments, the
        # IndexProcessorFactory path is never entered, so clean() never runs.
        mock_index_processor_factory["factory_cls"].assert_not_called()
        mock_index_processor_factory["processor"].clean.assert_not_called()