diff --git a/api/fields/workflow_app_log_fields.py b/api/fields/workflow_app_log_fields.py index d0e762f62b..195b720285 100644 --- a/api/fields/workflow_app_log_fields.py +++ b/api/fields/workflow_app_log_fields.py @@ -14,6 +14,7 @@ workflow_app_log_partial_fields = { "id": fields.String, "workflow_run": fields.Nested(workflow_run_for_log_fields, attribute="workflow_run", allow_null=True), "details": fields.Raw(attribute="details"), + "evaluation": fields.Raw(attribute="evaluation", default=None), "created_from": fields.String, "created_by_role": fields.String, "created_by_account": fields.Nested(simple_account_fields, attribute="created_by_account", allow_null=True), diff --git a/api/migrations/versions/2026_03_03_0001-a1b2c3d4e5f6_add_evaluation_tables.py b/api/migrations/versions/2026_03_03_0001-a1b2c3d4e5f6_add_evaluation_tables.py index 9138256f36..986b4ca8bd 100644 --- a/api/migrations/versions/2026_03_03_0001-a1b2c3d4e5f6_add_evaluation_tables.py +++ b/api/migrations/versions/2026_03_03_0001-a1b2c3d4e5f6_add_evaluation_tables.py @@ -78,6 +78,7 @@ def upgrade(): "evaluation_run_items", sa.Column("id", models.types.StringUUID(), nullable=False), sa.Column("evaluation_run_id", models.types.StringUUID(), nullable=False), + sa.Column("workflow_run_id", models.types.StringUUID(), nullable=True), sa.Column("item_index", sa.Integer(), nullable=False), sa.Column("inputs", models.types.LongText(), nullable=True), sa.Column("expected_output", models.types.LongText(), nullable=True), @@ -95,10 +96,12 @@ def upgrade(): batch_op.create_index( "evaluation_run_item_index_idx", ["evaluation_run_id", "item_index"], unique=False ) + batch_op.create_index("evaluation_run_item_workflow_run_idx", ["workflow_run_id"], unique=False) def downgrade(): with op.batch_alter_table("evaluation_run_items", schema=None) as batch_op: + batch_op.drop_index("evaluation_run_item_workflow_run_idx") batch_op.drop_index("evaluation_run_item_index_idx") batch_op.drop_index("evaluation_run_item_run_idx") op.drop_table("evaluation_run_items") diff --git a/api/models/evaluation.py b/api/models/evaluation.py index 9e92df73a0..6932d6a346 100644 --- a/api/models/evaluation.py +++ b/api/models/evaluation.py @@ -28,6 +28,7 @@ class EvaluationTargetType(StrEnum): SNIPPETS = "snippets" KNOWLEDGE_BASE = "knowledge_base" + class EvaluationConfiguration(Base): """Stores evaluation configuration for each target (App or Snippet).""" @@ -132,10 +133,12 @@ class EvaluationRunItem(Base): sa.PrimaryKeyConstraint("id", name="evaluation_run_item_pkey"), sa.Index("evaluation_run_item_run_idx", "evaluation_run_id"), sa.Index("evaluation_run_item_index_idx", "evaluation_run_id", "item_index"), + sa.Index("evaluation_run_item_workflow_run_idx", "workflow_run_id"), ) id: Mapped[str] = mapped_column(StringUUID, default=lambda: str(uuidv7())) evaluation_run_id: Mapped[str] = mapped_column(StringUUID, nullable=False) + workflow_run_id: Mapped[str | None] = mapped_column(StringUUID, nullable=True) item_index: Mapped[int] = mapped_column(Integer, nullable=False) inputs: Mapped[str | None] = mapped_column(LongText, nullable=True) diff --git a/api/services/evaluation_service.py b/api/services/evaluation_service.py index 87a4a72044..f7993f2333 100644 --- a/api/services/evaluation_service.py +++ b/api/services/evaluation_service.py @@ -571,7 +571,7 @@ class EvaluationService: target_id: str, input_list: list[EvaluationDatasetInput], max_workers: int = 5, - ) -> list[dict[str, NodeRunResult]]: + ) -> tuple[list[dict[str, NodeRunResult]], list[str | None]]: """Execute the evaluation target for every test-data item in parallel. :param tenant_id: Workspace / tenant ID. @@ -579,9 +579,11 @@ class EvaluationService: :param target_id: ID of the App or CustomizedSnippet. :param input_list: All test-data items parsed from the dataset. :param max_workers: Maximum number of parallel worker threads. - :return: Ordered list of ``{node_id: NodeRunResult}`` mappings. The - *i*-th element corresponds to ``input_list[i]``. If a target - execution fails, the corresponding element is an empty dict. + :return: Tuple of (node_results, workflow_run_ids). + node_results: ordered list of ``{node_id: NodeRunResult}`` mappings; + the *i*-th element corresponds to ``input_list[i]``. + workflow_run_ids: ordered list of workflow_run_id strings (or None) + for each input item. """ from concurrent.futures import ThreadPoolExecutor @@ -589,13 +591,12 @@ class EvaluationService: flask_app: Flask = current_app._get_current_object() # type: ignore - def _worker(item: EvaluationDatasetInput) -> dict[str, NodeRunResult]: + def _worker(item: EvaluationDatasetInput) -> tuple[dict[str, NodeRunResult], str | None]: with flask_app.app_context(): from models.engine import db with Session(db.engine, expire_on_commit=False) as thread_session: try: - # 1. Execute target (workflow app / snippet) response = cls._run_single_target( session=thread_session, target_type=target_type, @@ -603,7 +604,6 @@ class EvaluationService: item=item, ) - # 2. Extract workflow_run_id from the blocking response workflow_run_id = cls._extract_workflow_run_id(response) if not workflow_run_id: logger.warning( @@ -611,34 +611,38 @@ class EvaluationService: item.index, target_id, ) - return {} + return {}, None - # 3. Query per-node execution results from DB - return cls._query_node_run_results( + node_results = cls._query_node_run_results( session=thread_session, tenant_id=tenant_id, app_id=target_id, workflow_run_id=workflow_run_id, ) + return node_results, workflow_run_id except Exception: logger.exception( "Target execution failed for item %d (target=%s)", item.index, target_id, ) - return {} + return {}, None with ThreadPoolExecutor(max_workers=max_workers) as executor: futures = [executor.submit(_worker, item) for item in input_list] ordered_results: list[dict[str, NodeRunResult]] = [] + ordered_workflow_run_ids: list[str | None] = [] for future in futures: try: - ordered_results.append(future.result()) + node_result, wf_run_id = future.result() + ordered_results.append(node_result) + ordered_workflow_run_ids.append(wf_run_id) except Exception: logger.exception("Unexpected error collecting target execution result") ordered_results.append({}) + ordered_workflow_run_ids.append(None) - return ordered_results + return ordered_results, ordered_workflow_run_ids @classmethod def _run_single_target( diff --git a/api/services/workflow_app_service.py b/api/services/workflow_app_service.py index efc76c33bc..04eaeba9f3 100644 --- a/api/services/workflow_app_service.py +++ b/api/services/workflow_app_service.py @@ -19,17 +19,28 @@ class LogView: """Lightweight wrapper for WorkflowAppLog with computed details. - Exposes `details_` for marshalling to `details` in API response + - Exposes `evaluation_` for marshalling evaluation metrics in API response - Proxies all other attributes to the underlying `WorkflowAppLog` """ - def __init__(self, log: WorkflowAppLog, details: dict | None): + def __init__( + self, + log: WorkflowAppLog, + details: dict | None, + evaluation: list[dict] | None = None, + ): self.log = log self.details_ = details + self.evaluation_ = evaluation @property def details(self) -> dict | None: return self.details_ + @property + def evaluation(self) -> list[dict] | None: + return self.evaluation_ + def __getattr__(self, name): return getattr(self.log, name) @@ -159,12 +170,20 @@ class WorkflowAppService: # Execute query and get items if detail: rows = session.execute(offset_stmt).all() - items = [ - LogView(log, {"trigger_metadata": self.handle_trigger_metadata(app_model.tenant_id, meta_val)}) + logs_with_details = [ + (log, {"trigger_metadata": self.handle_trigger_metadata(app_model.tenant_id, meta_val)}) for log, meta_val in rows ] else: - items = [LogView(log, None) for log in session.scalars(offset_stmt).all()] + logs_with_details = [(log, None) for log in session.scalars(offset_stmt).all()] + + workflow_run_ids = [log.workflow_run_id for log, _ in logs_with_details] + eval_map = self._batch_query_evaluation_metrics(session, workflow_run_ids) + + items = [ + LogView(log, details, evaluation=eval_map.get(log.workflow_run_id)) + for log, details in logs_with_details + ] return { "page": page, "limit": limit, @@ -246,6 +265,45 @@ class WorkflowAppService: "data": items, } + @staticmethod + def _batch_query_evaluation_metrics( + session: Session, + workflow_run_ids: list[str], + ) -> dict[str, list[dict[str, Any]]]: + """Return evaluation metrics keyed by workflow_run_id. + + Only returns metrics from completed evaluation runs. If a workflow + run was not part of any evaluation (or the evaluation has not + completed), it will be absent from the result dict. + """ + from models.evaluation import EvaluationRun, EvaluationRunItem, EvaluationRunStatus + + if not workflow_run_ids: + return {} + + non_null_ids = [wid for wid in workflow_run_ids if wid] + if not non_null_ids: + return {} + + stmt = ( + select(EvaluationRunItem.workflow_run_id, EvaluationRunItem.metrics) + .join(EvaluationRun, EvaluationRun.id == EvaluationRunItem.evaluation_run_id) + .where( + EvaluationRunItem.workflow_run_id.in_(non_null_ids), + EvaluationRun.status == EvaluationRunStatus.COMPLETED, + ) + ) + rows = session.execute(stmt).all() + + result: dict[str, list[dict[str, Any]]] = {} + for wf_run_id, metrics_json in rows: + if wf_run_id and metrics_json: + parsed: list[dict[str, Any]] = json.loads(metrics_json) + existing = result.get(wf_run_id, []) + existing.extend(parsed) + result[wf_run_id] = existing + return result + def handle_trigger_metadata(self, tenant_id: str, meta_val: str | None) -> dict[str, Any]: metadata: dict[str, Any] | None = self._safe_json_loads(meta_val) if not metadata: diff --git a/api/tasks/evaluation_task.py b/api/tasks/evaluation_task.py index 87d1a7ad9b..6e3aee5940 100644 --- a/api/tasks/evaluation_task.py +++ b/api/tasks/evaluation_task.py @@ -89,7 +89,7 @@ def _execute_evaluation(session: Any, run_data: EvaluationRunData) -> None: ) else: evaluation_service = EvaluationService() - node_run_result_mapping_list: list[dict[str, NodeRunResult]] = evaluation_service.execute_targets( + node_run_result_mapping_list, workflow_run_ids = evaluation_service.execute_targets( tenant_id=run_data.tenant_id, target_type=run_data.target_type, target_id=run_data.target_id, @@ -102,6 +102,13 @@ def _execute_evaluation(session: Any, run_data: EvaluationRunData) -> None: node_run_result_mapping_list=node_run_result_mapping_list, ) + _backfill_workflow_run_ids( + session=session, + evaluation_run_id=run_data.evaluation_run_id, + input_list=run_data.input_list, + workflow_run_ids=workflow_run_ids, + ) + # Compute summary metrics metrics_summary = _compute_metrics_summary(results, run_data.judgment_config) @@ -235,6 +242,28 @@ def _execute_retrieval_test( return results +def _backfill_workflow_run_ids( + session: Any, + evaluation_run_id: str, + input_list: list[EvaluationDatasetInput], + workflow_run_ids: list[str | None], +) -> None: + """Set ``workflow_run_id`` on items that were created by the runner.""" + from models.evaluation import EvaluationRunItem + + for item, wf_run_id in zip(input_list, workflow_run_ids): + if not wf_run_id: + continue + run_item = ( + session.query(EvaluationRunItem) + .filter_by(evaluation_run_id=evaluation_run_id, item_index=item.index) + .first() + ) + if run_item: + run_item.workflow_run_id = wf_run_id + session.commit() + + def _mark_run_failed(session: Any, run_id: str, error: str) -> None: """Mark an evaluation run as failed.""" try: