From ce0c2ea3bd1a20f34672dbb9fa9e42cd1dd461e1 Mon Sep 17 00:00:00 2001 From: FFXN Date: Thu, 5 Mar 2026 13:30:26 +0800 Subject: [PATCH] feat: Implement customized evaluation in BaseEvaluationInstance. --- .../evaluation/base_evaluation_instance.py | 178 ++++++++++- api/core/evaluation/entities/config_entity.py | 6 - .../evaluation/entities/judgment_entity.py | 96 +++++- api/core/evaluation/evaluation_manager.py | 8 - .../frameworks/customized/__init__.py | 0 .../customized/customized_evaluator.py | 267 ----------------- api/core/evaluation/judgment/processor.py | 278 ++++++++++++++++-- .../runners/base_evaluation_runner.py | 143 +++++++-- 8 files changed, 636 insertions(+), 340 deletions(-) delete mode 100644 api/core/evaluation/frameworks/customized/__init__.py delete mode 100644 api/core/evaluation/frameworks/customized/customized_evaluator.py diff --git a/api/core/evaluation/base_evaluation_instance.py b/api/core/evaluation/base_evaluation_instance.py index c5fc3f946e..4b714d2017 100644 --- a/api/core/evaluation/base_evaluation_instance.py +++ b/api/core/evaluation/base_evaluation_instance.py @@ -1,14 +1,21 @@ +import json +import logging from abc import ABC, abstractmethod +from collections.abc import Mapping +from typing import Any from core.evaluation.entities.evaluation_entity import ( EvaluationCategory, EvaluationItemInput, EvaluationItemResult, + EvaluationMetric, ) +logger = logging.getLogger(__name__) + class BaseEvaluationInstance(ABC): - """Abstract base class for evaluation framework adapters.""" + """Abstract base class for evaluation framework adapters. """ @abstractmethod def evaluate_llm( @@ -62,3 +69,172 @@ class BaseEvaluationInstance(ABC): def get_supported_metrics(self, category: EvaluationCategory) -> list[str]: """Return the list of supported metric names for a given evaluation category.""" ... + + def evaluate_with_customized_workflow( + self, + items: list[EvaluationItemInput], + results: list[EvaluationItemResult], + metrics_config: dict, + tenant_id: str, + ) -> list[EvaluationItemResult]: + """Evaluate using a published workflow as the evaluator. + + The evaluator workflow's output variables are treated as metrics: + each output variable name becomes a metric name, and its value + becomes the score. + + Args: + items: Evaluation items with inputs, expected_output, context. + results: Results from Phase 1 (with actual_output populated). + metrics_config: Must contain ``workflow_id`` pointing to a + published WORKFLOW-type App. + tenant_id: Tenant scope. + + Returns: + A list of ``EvaluationItemResult`` with metrics extracted from + the workflow outputs. + """ + from sqlalchemy.orm import Session + + from core.app.apps.workflow.app_generator import WorkflowAppGenerator + from core.app.entities.app_invoke_entities import InvokeFrom + from core.evaluation.runners import get_service_account_for_app + from models.engine import db + from models.model import App + from services.workflow_service import WorkflowService + + workflow_id = metrics_config.get("workflow_id") + if not workflow_id: + raise ValueError( + "metrics_config must contain 'workflow_id' for customized evaluator" + ) + + # Load the evaluator workflow resources using a dedicated session + with Session(db.engine, expire_on_commit=False) as session, session.begin(): + app = session.query(App).filter_by( + id=workflow_id, tenant_id=tenant_id + ).first() + if not app: + raise ValueError( + f"Evaluation workflow app {workflow_id} not found in tenant {tenant_id}" + ) + service_account = get_service_account_for_app(session, workflow_id) + + workflow_service = WorkflowService() + published_workflow = workflow_service.get_published_workflow(app_model=app) + if not published_workflow: + raise ValueError( + f"No published workflow found for evaluation app {workflow_id}" + ) + + result_by_index = {r.index: r for r in results} + eval_results: list[EvaluationItemResult] = [] + for item in items: + result = result_by_index.get(item.index) + try: + workflow_inputs = self._build_workflow_inputs(item, result) + + generator = WorkflowAppGenerator() + response: Mapping[str, Any] = generator.generate( + app_model=app, + workflow=published_workflow, + user=service_account, + args={"inputs": workflow_inputs}, + invoke_from=InvokeFrom.SERVICE_API, + streaming=False, + ) + + metrics = self._extract_workflow_metrics(response) + eval_results.append( + EvaluationItemResult( + index=item.index, + metrics=metrics, + metadata={ + "workflow_response": _safe_serialize(response), + }, + ) + ) + except Exception: + logger.exception( + "Customized evaluator failed for item %d with workflow %s", + item.index, + workflow_id, + ) + eval_results.append(EvaluationItemResult(index=item.index)) + + return eval_results + + @staticmethod + def _build_workflow_inputs( + item: EvaluationItemInput, + result: EvaluationItemResult | None, + ) -> dict[str, Any]: + """Build workflow input dict from evaluation data. + + Maps evaluation data to conventional workflow input variable names: + - ``actual_output``: The target's actual output (from ``result``). + - ``expected_output``: The expected/reference output. + - ``inputs``: The original evaluation inputs as a JSON string. + - ``context``: All context strings joined by newlines. + """ + workflow_inputs: dict[str, Any] = {} + + if result and result.actual_output: + workflow_inputs["actual_output"] = result.actual_output + + if item.expected_output: + workflow_inputs["expected_output"] = item.expected_output + + if item.inputs: + workflow_inputs["inputs"] = json.dumps(item.inputs, ensure_ascii=False) + + if item.context: + workflow_inputs["context"] = "\n\n".join(item.context) + + return workflow_inputs + + @staticmethod + def _extract_workflow_metrics( + response: Mapping[str, Any], + ) -> list[EvaluationMetric]: + """Extract evaluation metrics from workflow output variables. + + Each output variable is treated as a metric. The variable name + becomes the metric name, and its value becomes the score. + Non-numeric values are recorded with ``score=0.0`` and the raw + value stored in ``details``. + """ + metrics: list[EvaluationMetric] = [] + + data = response.get("data", {}) + if not isinstance(data, Mapping): + logger.warning("Unexpected workflow response format: missing 'data' dict") + return metrics + + outputs = data.get("outputs", {}) + if not isinstance(outputs, Mapping): + logger.warning( + "Unexpected workflow response format: 'outputs' is not a dict" + ) + return metrics + + for key, value in outputs.items(): + try: + score = float(value) + metrics.append(EvaluationMetric(name=key, score=score)) + except (TypeError, ValueError): + metrics.append( + EvaluationMetric( + name=key, score=0.0, details={"raw_value": value} + ) + ) + + return metrics + + +def _safe_serialize(response: Mapping[str, Any]) -> dict[str, Any]: + """Safely serialize workflow response for metadata storage.""" + try: + return dict(response) + except Exception: + return {"raw": str(response)} diff --git a/api/core/evaluation/entities/config_entity.py b/api/core/evaluation/entities/config_entity.py index dc36f435b3..b1a48b894d 100644 --- a/api/core/evaluation/entities/config_entity.py +++ b/api/core/evaluation/entities/config_entity.py @@ -6,7 +6,6 @@ from pydantic import BaseModel class EvaluationFrameworkEnum(StrEnum): RAGAS = "ragas" DEEPEVAL = "deepeval" - CUSTOMIZED = "customized" NONE = "none" @@ -18,8 +17,3 @@ class BaseEvaluationConfig(BaseModel): class RagasConfig(BaseEvaluationConfig): """RAGAS-specific configuration.""" pass - - -class CustomizedEvaluatorConfig(BaseEvaluationConfig): - """Configuration for the customized workflow-based evaluator.""" - pass diff --git a/api/core/evaluation/entities/judgment_entity.py b/api/core/evaluation/entities/judgment_entity.py index 4970938c78..58e5fe91e3 100644 --- a/api/core/evaluation/entities/judgment_entity.py +++ b/api/core/evaluation/entities/judgment_entity.py @@ -1,40 +1,112 @@ """Judgment condition entities for evaluation metric assessment. +Key concepts: + - **value_source**: Where the comparison target comes from. + - "constant": a literal value supplied by the user (e.g. threshold "0.8"). + - "variable": a named field from the evaluation target's runtime data + (inputs, actual_output, expected_output). The ``value`` field holds the + variable key; the actual comparison value is resolved at evaluation time. + - **condition_type**: Determines operator semantics and type coercion. + - "string": string operators (contains, is, start with, …). + - "number": numeric operators (>, <, =, ≠, ≥, ≤). + - "datetime": temporal operators (before, after). + Typical usage: judgment_config = JudgmentConfig( logical_operator="and", conditions=[ - JudgmentCondition(metric_name="faithfulness", comparison_operator=">", value="0.8"), - JudgmentCondition(metric_name="answer_relevancy", comparison_operator="≥", value="0.7"), + JudgmentCondition( + metric_name="faithfulness", + comparison_operator=">", + value="0.8", + condition_type="number", + ), + JudgmentCondition( + metric_name="output", + comparison_operator="contains", + value="expected_output", + value_source="variable", + condition_type="string", + ), ], ) """ from collections.abc import Sequence +from enum import StrEnum from typing import Any, Literal from pydantic import BaseModel, Field -from core.workflow.utils.condition.entities import SupportedComparisonOperator + +class JudgmentValueSource(StrEnum): + """Where the comparison target value comes from.""" + + CONSTANT = "constant" + VARIABLE = "variable" + + +class JudgmentConditionType(StrEnum): + """Category of the condition, controls operator semantics and type coercion.""" + + STRING = "string" + NUMBER = "number" + DATETIME = "datetime" + + +# Supported comparison operators for judgment conditions. +JudgmentComparisonOperator = Literal[ + # string + "contains", + "not contains", + "start with", + "end with", + "is", + "is not", + "empty", + "not empty", + "in", + "not in", + # number + "=", + "≠", + ">", + "<", + "≥", + "≤", + # datetime + "before", + "after", + # universal + "null", + "not null", +] class JudgmentCondition(BaseModel): """A single judgment condition that checks one metric value. Attributes: - metric_name: The name of the evaluation metric to check - (must match an EvaluationMetric.name in the results). - comparison_operator: The comparison operator to apply - (reuses the same operator set as workflow condition branches). - value: The expected/threshold value to compare against. - For numeric operators (>, <, =, etc.), this should be a numeric string. - For string operators (contains, is, etc.), this should be a string. + metric_name: The name of the evaluation metric to check (left side). + Must match an EvaluationMetric.name in the results. + comparison_operator: The comparison operator to apply. + value: The comparison target (right side). + - When value_source is "constant": the literal threshold/expected value. + - When value_source is "variable": the variable key name to look up + from the evaluation target's runtime data. For unary operators (empty, null, etc.), this can be None. + value_source: Where the comparison value comes from. + "constant" (default) for user-supplied literals, + "variable" for references to evaluation target data. + condition_type: Controls type coercion and which operators are valid. + "string" (default), "number", or "datetime". """ metric_name: str - comparison_operator: SupportedComparisonOperator + comparison_operator: JudgmentComparisonOperator value: str | Sequence[str] | None = None + value_source: JudgmentValueSource = JudgmentValueSource.CONSTANT + condition_type: JudgmentConditionType = JudgmentConditionType.STRING class JudgmentConfig(BaseModel): @@ -56,7 +128,7 @@ class JudgmentConditionResult(BaseModel): Attributes: metric_name: Which metric was checked. comparison_operator: The operator that was applied. - expected_value: The threshold/expected value from the condition config. + expected_value: The resolved comparison value (after variable resolution). actual_value: The actual metric value that was evaluated. passed: Whether this individual condition passed. error: Error message if the condition evaluation failed. diff --git a/api/core/evaluation/evaluation_manager.py b/api/core/evaluation/evaluation_manager.py index 06a3bed221..5499b96cad 100644 --- a/api/core/evaluation/evaluation_manager.py +++ b/api/core/evaluation/evaluation_manager.py @@ -25,14 +25,6 @@ class EvaluationFrameworkConfigMap(collections.UserDict[str, dict[str, Any]]): } case EvaluationFrameworkEnum.DEEPEVAL: raise NotImplementedError("DeepEval adapter is not yet implemented.") - case EvaluationFrameworkEnum.CUSTOMIZED: - from core.evaluation.entities.config_entity import CustomizedEvaluatorConfig - from core.evaluation.frameworks.customized.customized_evaluator import CustomizedEvaluator - - return { - "config_class": CustomizedEvaluatorConfig, - "evaluator_class": CustomizedEvaluator, - } case _: raise ValueError(f"Unknown evaluation framework: {framework}") diff --git a/api/core/evaluation/frameworks/customized/__init__.py b/api/core/evaluation/frameworks/customized/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/api/core/evaluation/frameworks/customized/customized_evaluator.py b/api/core/evaluation/frameworks/customized/customized_evaluator.py deleted file mode 100644 index 501af17833..0000000000 --- a/api/core/evaluation/frameworks/customized/customized_evaluator.py +++ /dev/null @@ -1,267 +0,0 @@ -"""Customized workflow-based evaluator. - -Uses a published workflow as the evaluation strategy. The target's actual output, -expected output, original inputs, and context are passed as workflow inputs. -The workflow's output variables are treated as evaluation metrics. - -The evaluation workflow_id is provided per evaluation run via -metrics_config["workflow_id"]. - -""" - -import json -import logging -from collections.abc import Mapping -from typing import Any - -from core.evaluation.base_evaluation_instance import BaseEvaluationInstance -from core.evaluation.entities.config_entity import CustomizedEvaluatorConfig -from core.evaluation.entities.evaluation_entity import ( - EvaluationCategory, - EvaluationItemInput, - EvaluationItemResult, - EvaluationMetric, -) - -logger = logging.getLogger(__name__) - - -class CustomizedEvaluator(BaseEvaluationInstance): - """Evaluate using a published workflow.""" - - def __init__(self, config: CustomizedEvaluatorConfig): - self.config = config - - def evaluate_llm( - self, - items: list[EvaluationItemInput], - metrics_config: dict, - model_provider: str, - model_name: str, - tenant_id: str, - ) -> list[EvaluationItemResult]: - return self._evaluate_with_workflow(items, metrics_config, tenant_id) - - def evaluate_retrieval( - self, - items: list[EvaluationItemInput], - metrics_config: dict, - model_provider: str, - model_name: str, - tenant_id: str, - ) -> list[EvaluationItemResult]: - return self._evaluate_with_workflow(items, metrics_config, tenant_id) - - def evaluate_agent( - self, - items: list[EvaluationItemInput], - metrics_config: dict, - model_provider: str, - model_name: str, - tenant_id: str, - ) -> list[EvaluationItemResult]: - return self._evaluate_with_workflow(items, metrics_config, tenant_id) - - def evaluate_workflow( - self, - items: list[EvaluationItemInput], - metrics_config: dict, - model_provider: str, - model_name: str, - tenant_id: str, - ) -> list[EvaluationItemResult]: - return self._evaluate_with_workflow(items, metrics_config, tenant_id) - - def get_supported_metrics(self, category: EvaluationCategory) -> list[str]: - """Metrics are dynamic and defined by the evaluation workflow outputs. - - Return an empty list since available metrics depend on the specific - workflow chosen at runtime. - """ - return [] - - def _evaluate_with_workflow( - self, - items: list[EvaluationItemInput], - metrics_config: dict, - tenant_id: str, - ) -> list[EvaluationItemResult]: - """Run the evaluation workflow for each item and extract metric scores. - - Args: - items: Evaluation items with inputs, expected_output, and context - (context typically contains the target's actual_output, merged - by the Runner's evaluate_metrics method). - metrics_config: Must contain "workflow_id" pointing to a published - WORKFLOW-type App. - tenant_id: Tenant scope for database and workflow execution. - - Returns: - List of EvaluationItemResult with metrics extracted from workflow outputs. - - Raises: - ValueError: If workflow_id is missing from metrics_config or the - workflow/app cannot be found. - """ - workflow_id = metrics_config.get("workflow_id") - if not workflow_id: - raise ValueError( - "metrics_config must contain 'workflow_id' for customized evaluator" - ) - - app, workflow, service_account = self._load_workflow_resources(workflow_id, tenant_id) - - results: list[EvaluationItemResult] = [] - for item in items: - try: - result = self._evaluate_single_item(app, workflow, service_account, item) - results.append(result) - except Exception: - logger.exception( - "Customized evaluator failed for item %d with workflow %s", - item.index, - workflow_id, - ) - results.append(EvaluationItemResult(index=item.index)) - return results - - def _evaluate_single_item( - self, - app: Any, - workflow: Any, - service_account: Any, - item: EvaluationItemInput, - ) -> EvaluationItemResult: - """Run the evaluation workflow for a single item. - - Builds workflow inputs from the item data and executes the workflow - in non-streaming mode. Extracts metrics from the workflow's output - variables. - """ - from core.app.apps.workflow.app_generator import WorkflowAppGenerator - from core.app.entities.app_invoke_entities import InvokeFrom - - workflow_inputs = self._build_workflow_inputs(item) - - generator = WorkflowAppGenerator() - response: Mapping[str, Any] = generator.generate( - app_model=app, - workflow=workflow, - user=service_account, - args={"inputs": workflow_inputs}, - invoke_from=InvokeFrom.SERVICE_API, - streaming=False, - ) - - metrics = self._extract_metrics(response) - return EvaluationItemResult( - index=item.index, - metrics=metrics, - metadata={"workflow_response": self._safe_serialize(response)}, - ) - - def _load_workflow_resources( - self, workflow_id: str, tenant_id: str - ) -> tuple[Any, Any, Any]: - """Load the evaluation workflow App, its published workflow, and a service account. - - Args: - workflow_id: The App ID of the evaluation workflow. - tenant_id: Tenant scope. - - Returns: - Tuple of (app, workflow, service_account). - - Raises: - ValueError: If the app or published workflow cannot be found. - """ - from sqlalchemy.orm import Session - - from core.evaluation.runners import get_service_account_for_app - from models.engine import db - from models.model import App - from services.workflow_service import WorkflowService - - with Session(db.engine, expire_on_commit=False) as session, session.begin(): - app = session.query(App).filter_by(id=workflow_id, tenant_id=tenant_id).first() - if not app: - raise ValueError( - f"Evaluation workflow app {workflow_id} not found in tenant {tenant_id}" - ) - - service_account = get_service_account_for_app(session, workflow_id) - - workflow_service = WorkflowService() - published_workflow = workflow_service.get_published_workflow(app_model=app) - if not published_workflow: - raise ValueError( - f"No published workflow found for evaluation app {workflow_id}" - ) - - return app, published_workflow, service_account - - @staticmethod - def _build_workflow_inputs(item: EvaluationItemInput) -> dict[str, Any]: - """Build workflow input dict from an evaluation item. - - Maps evaluation data to conventional workflow input variable names: - - actual_output: The target's actual output (from context[0] if available) - - expected_output: The expected/reference output - - inputs: The original evaluation inputs as JSON string - - context: All context strings joined by newlines - - """ - workflow_inputs: dict[str, Any] = {} - - # The actual_output is typically the first element in context - # (merged by the Runner's evaluate_metrics method) - if item.context: - workflow_inputs["actual_output"] = item.context[0] if len(item.context) == 1 else "\n\n".join(item.context) - - if item.expected_output: - workflow_inputs["expected_output"] = item.expected_output - - if item.inputs: - workflow_inputs["inputs"] = json.dumps(item.inputs, ensure_ascii=False) - - if item.context and len(item.context) > 1: - workflow_inputs["context"] = "\n\n".join(item.context) - - return workflow_inputs - - @staticmethod - def _extract_metrics(response: Mapping[str, Any]) -> list[EvaluationMetric]: - """Extract evaluation metrics from workflow output variables. - - Each output variable is treated as a metric. - """ - metrics: list[EvaluationMetric] = [] - - data = response.get("data", {}) - if not isinstance(data, Mapping): - logger.warning("Unexpected workflow response format: missing 'data' dict") - return metrics - - outputs = data.get("outputs", {}) - if not isinstance(outputs, Mapping): - logger.warning("Unexpected workflow response format: 'outputs' is not a dict") - return metrics - - for key, value in outputs.items(): - try: - score = float(value) - metrics.append(EvaluationMetric(name=key, score=score)) - except (TypeError, ValueError): - metrics.append( - EvaluationMetric(name=key, score=0.0, details={"raw_value": value}) - ) - - return metrics - - @staticmethod - def _safe_serialize(response: Mapping[str, Any]) -> dict[str, Any]: - """Safely serialize workflow response for metadata storage.""" - try: - return dict(response) - except Exception: - return {"raw": str(response)} diff --git a/api/core/evaluation/judgment/processor.py b/api/core/evaluation/judgment/processor.py index 0345110ca1..d28c1e10f0 100644 --- a/api/core/evaluation/judgment/processor.py +++ b/api/core/evaluation/judgment/processor.py @@ -5,21 +5,47 @@ Reuses the core comparison engine from the workflow condition system (core.workflow.utils.condition.processor._evaluate_condition) to ensure consistent operator semantics across the platform. +The processor is intentionally decoupled from evaluation frameworks +(RAGAS / Customized) and runners. It operates on plain ``dict`` mappings +and can be invoked from any context. + +Typical usage:: + + metrics = {"faithfulness": 0.85, "answer_relevancy": 0.6} + variables = {"expected_output": "Hello World", "created_at": "2025-01-01T00:00:00"} + config = JudgmentConfig( + logical_operator="and", + conditions=[ + JudgmentCondition(metric_name="faithfulness", comparison_operator=">", + value="0.8", condition_type="number"), + JudgmentCondition(metric_name="output", comparison_operator="contains", + value="expected_output", value_source="variable", + condition_type="string"), + ], + ) + result = JudgmentProcessor.evaluate(metrics, config, variable_values=variables) """ import logging +from collections.abc import Sequence +from datetime import datetime from typing import Any from core.evaluation.entities.judgment_entity import ( JudgmentCondition, JudgmentConditionResult, + JudgmentConditionType, JudgmentConfig, JudgmentResult, + JudgmentValueSource, ) from core.workflow.utils.condition.processor import _evaluate_condition logger = logging.getLogger(__name__) +# Operators that do not need a comparison value (unary operators). +_UNARY_OPERATORS = frozenset({"null", "not null", "empty", "not empty"}) + class JudgmentProcessor: @@ -27,13 +53,17 @@ class JudgmentProcessor: def evaluate( metric_values: dict[str, Any], config: JudgmentConfig, + variable_values: dict[str, Any] | None = None, ) -> JudgmentResult: """Evaluate all judgment conditions against the given metric values. Args: - metric_values: Mapping of metric name to its value - (e.g. {"faithfulness": 0.85, "status": "success"}). + metric_values: Mapping of metric name → metric value + (e.g. ``{"faithfulness": 0.85, "status": "success"}``). config: The judgment configuration with logical_operator and conditions. + variable_values: Optional mapping of variable name → value, used when + a condition's ``value_source`` is ``"variable"``. Typically built + from the evaluation target's inputs / outputs. Returns: JudgmentResult with overall pass/fail and per-condition details. @@ -49,7 +79,7 @@ class JudgmentProcessor: for condition in config.conditions: result = JudgmentProcessor._evaluate_single_condition( - metric_values, condition + metric_values, condition, variable_values ) condition_results.append(result) @@ -66,6 +96,7 @@ class JudgmentProcessor: condition_results=condition_results, ) + # All conditions evaluated if config.logical_operator == "and": final_passed = all(r.passed for r in condition_results) else: @@ -81,31 +112,21 @@ class JudgmentProcessor: def _evaluate_single_condition( metric_values: dict[str, Any], condition: JudgmentCondition, + variable_values: dict[str, Any] | None = None, ) -> JudgmentConditionResult: - """Evaluate a single judgment condition against the metric values. + """Evaluate a single judgment condition. - Looks up the metric by name, then delegates to the workflow condition - engine for the actual comparison. - - Args: - metric_values: Mapping of metric name to its value. - condition: The condition to evaluate. - - Returns: - JudgmentConditionResult with pass/fail and details. + Steps: + 1. Look up the metric value (left side) by ``metric_name``. + 2. Resolve the comparison value (right side) — either a constant + or a variable reference. + 3. Dispatch to the correct type handler (string / number / datetime). """ metric_name = condition.metric_name actual_value = metric_values.get(metric_name) - # Handle metric not found - if actual_value is None and condition.comparison_operator not in ( - "null", - "not null", - "empty", - "not empty", - "exists", - "not exists", - ): + # Handle metric not found — skip for unary operators that work on None + if actual_value is None and condition.comparison_operator not in _UNARY_OPERATORS: return JudgmentConditionResult( metric_name=metric_name, comparison_operator=condition.comparison_operator, @@ -115,17 +136,44 @@ class JudgmentProcessor: error=f"Metric '{metric_name}' not found in evaluation results", ) + # Resolve the comparison value (right side) try: - passed = _evaluate_condition( - operator=condition.comparison_operator, - value=actual_value, - expected=condition.value, + resolved_value = JudgmentProcessor._resolve_comparison_value( + condition, variable_values ) + except ValueError as e: return JudgmentConditionResult( metric_name=metric_name, comparison_operator=condition.comparison_operator, expected_value=condition.value, actual_value=actual_value, + passed=False, + error=str(e), + ) + + # Dispatch to the appropriate type handler + try: + match condition.condition_type: + case JudgmentConditionType.DATETIME: + passed = _evaluate_datetime_condition( + actual_value, condition.comparison_operator, resolved_value + ) + case JudgmentConditionType.NUMBER: + passed = _evaluate_number_condition( + actual_value, condition.comparison_operator, resolved_value + ) + case _: # STRING (default) — delegate to workflow engine + passed = _evaluate_condition( + operator=condition.comparison_operator, + value=actual_value, + expected=resolved_value, + ) + + return JudgmentConditionResult( + metric_name=metric_name, + comparison_operator=condition.comparison_operator, + expected_value=resolved_value, + actual_value=actual_value, passed=passed, ) except Exception as e: @@ -137,8 +185,184 @@ class JudgmentProcessor: return JudgmentConditionResult( metric_name=metric_name, comparison_operator=condition.comparison_operator, - expected_value=condition.value, + expected_value=resolved_value, actual_value=actual_value, passed=False, error=str(e), ) + + @staticmethod + def _resolve_comparison_value( + condition: JudgmentCondition, + variable_values: dict[str, Any] | None, + ) -> str | Sequence[str] | None: + """Resolve the right-side comparison value. + + For ``value_source == "constant"``, returns ``condition.value`` as-is. + For ``value_source == "variable"``, looks up ``condition.value`` (as a key) + in ``variable_values`` and returns the resolved value (converted to string + for compatibility with the comparison engine). + + Raises: + ValueError: If the variable cannot be resolved. + """ + if condition.value_source == JudgmentValueSource.CONSTANT: + return condition.value + + # Variable resolution + if condition.value is None: + raise ValueError("Variable name (value) must be provided when value_source is 'variable'") + + if not variable_values: + raise ValueError( + f"Cannot resolve variable '{condition.value}': no variable values provided" + ) + + var_key = condition.value if isinstance(condition.value, str) else str(condition.value) + if var_key not in variable_values: + raise ValueError( + f"Variable '{var_key}' not found in evaluation target data. " + f"Available variables: {list(variable_values.keys())}" + ) + + resolved = variable_values[var_key] + # Convert to string for the comparison engine, unless it's already + # a str/Sequence[str]/None which the engine expects. + if resolved is None: + return None + if isinstance(resolved, str): + return resolved + if isinstance(resolved, Sequence) and all(isinstance(v, str) for v in resolved): + return resolved + return str(resolved) + + +_DATETIME_FORMATS = [ + "%Y-%m-%dT%H:%M:%S", + "%Y-%m-%dT%H:%M:%S.%f", + "%Y-%m-%dT%H:%M:%SZ", + "%Y-%m-%dT%H:%M:%S.%fZ", + "%Y-%m-%dT%H:%M:%S%z", + "%Y-%m-%d %H:%M:%S", + "%Y-%m-%d", +] + + +def _parse_datetime(value: object) -> datetime: + """Parse a value into a datetime object. + + Accepts datetime instances, numeric timestamps (int/float), and common + ISO 8601 string formats. + + Raises: + ValueError: If the value cannot be parsed as a datetime. + """ + if isinstance(value, datetime): + return value + if isinstance(value, (int, float)): + return datetime.fromtimestamp(value) + if not isinstance(value, str): + raise ValueError(f"Cannot parse '{value}' (type={type(value).__name__}) as datetime") + + for fmt in _DATETIME_FORMATS: + try: + return datetime.strptime(value, fmt) + except ValueError: + continue + + raise ValueError( + f"Cannot parse datetime string '{value}'. " + f"Supported formats: ISO 8601, 'YYYY-MM-DD HH:MM:SS', 'YYYY-MM-DD', or numeric timestamp." + ) + + +def _evaluate_datetime_condition( + actual: object, + operator: str, + expected: object, +) -> bool: + """Evaluate a datetime comparison condition. + + Also supports the universal unary operators (null, not null, empty, not empty) + and the numeric-style operators (=, ≠, >, <, ≥, ≤) for datetime values. + + Args: + actual: The actual metric value (left side). + operator: The comparison operator. + expected: The expected/threshold value (right side). + + Returns: + True if the condition passes. + + Raises: + ValueError: If values cannot be parsed or operator is unsupported. + """ + # Handle unary operators first + if operator == "null": + return actual is None + if operator == "not null": + return actual is not None + if operator == "empty": + return not actual + if operator == "not empty": + return bool(actual) + + if actual is None: + return False + + actual_dt = _parse_datetime(actual) + expected_dt = _parse_datetime(expected) if expected is not None else None + + if expected_dt is None: + raise ValueError(f"Expected datetime value is required for operator '{operator}'") + + match operator: + case "before" | "<": + return actual_dt < expected_dt + case "after" | ">": + return actual_dt > expected_dt + case "=" | "is": + return actual_dt == expected_dt + case "≠" | "is not": + return actual_dt != expected_dt + case "≥": + return actual_dt >= expected_dt + case "≤": + return actual_dt <= expected_dt + case _: + raise ValueError(f"Unsupported datetime operator: '{operator}'") + + +def _evaluate_number_condition( + actual: object, + operator: str, + expected: object, +) -> bool: + """Evaluate a numeric comparison condition. + + Ensures proper numeric type coercion before delegating to the workflow + condition engine. This avoids string-vs-number comparison pitfalls + (e.g. comparing float metric 0.85 against string threshold "0.8"). + + For unary operators (null, not null, empty, not empty), delegates directly. + """ + # Unary operators — delegate to workflow engine as-is + if operator in _UNARY_OPERATORS: + return _evaluate_condition(operator=operator, value=actual, expected=expected) + + if actual is None: + return False + + # Coerce actual to numeric + if not isinstance(actual, (int, float)): + try: + actual = float(actual) + except (TypeError, ValueError) as e: + raise ValueError(f"Cannot convert actual value '{actual}' to number") from e + + # Coerce expected to numeric string for the workflow engine + # (the workflow engine's _normalize_numeric_values handles str → float) + if expected is not None and not isinstance(expected, str): + expected = str(expected) + + return _evaluate_condition(operator=operator, value=actual, expected=expected) diff --git a/api/core/evaluation/runners/base_evaluation_runner.py b/api/core/evaluation/runners/base_evaluation_runner.py index d5d93a9808..934ef7eeb9 100644 --- a/api/core/evaluation/runners/base_evaluation_runner.py +++ b/api/core/evaluation/runners/base_evaluation_runner.py @@ -1,6 +1,17 @@ +"""Base evaluation runner. + +Orchestrates the evaluation lifecycle in four phases: + 1. execute_target — run the target and collect actual outputs (abstract) + 2. evaluate_metrics — compute metrics via framework or customized workflow + 3. apply_judgment — evaluate pass/fail judgment conditions on metrics + 4. persist — save results to the database + +""" + import json import logging from abc import ABC, abstractmethod +from typing import Any from sqlalchemy.orm import Session @@ -21,14 +32,16 @@ class BaseEvaluationRunner(ABC): """Abstract base class for evaluation runners. Runners are responsible for executing the target (App/Snippet/Retrieval) - to collect actual outputs, then delegating to the evaluation instance - for metric computation, and optionally applying judgment conditions. + to collect actual outputs, then computing evaluation metrics, optionally + applying judgment conditions, and persisting results. - Execution phases: - 1. execute_target — run the target and collect actual outputs - 2. evaluate_metrics — compute evaluation metrics via the framework - 3. apply_judgment — evaluate pass/fail judgment conditions on metrics - 4. persist — save results to the database + Built-in capabilities (implemented in this base class): + - Customized workflow dispatch (``_evaluate_customized``) + - Judgment condition evaluation (``_apply_judgment``) + + Subclass responsibilities: + - ``execute_target`` — target-specific execution logic + - ``evaluate_metrics`` — framework-specific metric computation (RAGAS etc.) """ def __init__(self, evaluation_instance: BaseEvaluationInstance, session: Session): @@ -56,7 +69,18 @@ class BaseEvaluationRunner(ABC): model_name: str, tenant_id: str, ) -> list[EvaluationItemResult]: - """Compute evaluation metrics on the collected results.""" + """Compute evaluation metrics on the collected results. + + Called only when the evaluation is NOT using a customized workflow + (i.e. ``metrics_config`` does not contain ``workflow_id``). + + Implementations should: + 1. Merge ``actual_output`` from ``results`` into the ``context`` + field of each ``EvaluationItemInput``. + 2. Call ``self.evaluation_instance.evaluate_xxx()`` with the + merged items. + 3. Return updated results with metrics populated. + """ ... def run( @@ -71,9 +95,7 @@ class BaseEvaluationRunner(ABC): model_name: str, judgment_config: JudgmentConfig | None = None, ) -> list[EvaluationItemResult]: - """Orchestrate target execution + metric evaluation + judgment for all items. - - """ + """Orchestrate target execution + metric evaluation + judgment for all items.""" evaluation_run = self.session.query(EvaluationRun).filter_by(id=evaluation_run_id).first() if not evaluation_run: raise ValueError(f"EvaluationRun {evaluation_run_id} not found") @@ -108,9 +130,18 @@ class BaseEvaluationRunner(ABC): if successful_items and successful_results: try: - evaluated_results = self.evaluate_metrics( - successful_items, successful_results, metrics_config, model_provider, model_name, tenant_id - ) + if _is_customized_evaluation(metrics_config): + # Customized workflow evaluation — target-type agnostic, + # handled via BaseEvaluationInstance.evaluate_with_customized_workflow(). + evaluated_results = self._evaluate_customized( + successful_items, successful_results, metrics_config, tenant_id, + ) + else: + # Framework-specific evaluation — delegate to subclass + evaluated_results = self.evaluate_metrics( + successful_items, successful_results, metrics_config, + model_provider, model_name, tenant_id, + ) # Merge evaluated metrics back into results evaluated_by_index = {r.index: r for r in evaluated_results} for i, result in enumerate(results): @@ -121,7 +152,7 @@ class BaseEvaluationRunner(ABC): # Phase 3: Apply judgment conditions on metrics if judgment_config and judgment_config.conditions: - results = self._apply_judgment(results, judgment_config) + results = self._apply_judgment(results, items, judgment_config) # Phase 4: Persist individual items for result in results: @@ -145,27 +176,101 @@ class BaseEvaluationRunner(ABC): return results + # ------------------------------------------------------------------ + # Customized workflow evaluation dispatch + # ------------------------------------------------------------------ + + def _evaluate_customized( + self, + items: list[EvaluationItemInput], + results: list[EvaluationItemResult], + metrics_config: dict, + tenant_id: str, + ) -> list[EvaluationItemResult]: + """Delegate to the instance's customized workflow evaluator. + + Unlike the framework path (which merges ``actual_output`` into + ``context``), here we pass ``results`` directly — the instance's + ``evaluate_with_customized_workflow()`` reads ``actual_output`` + from each ``EvaluationItemResult``. + """ + evaluated = self.evaluation_instance.evaluate_with_customized_workflow( + items, results, metrics_config, tenant_id, + ) + + # Merge metrics back preserving actual_output and metadata from Phase 1 + eval_by_index = {r.index: r for r in evaluated} + final_results: list[EvaluationItemResult] = [] + for result in results: + if result.index in eval_by_index: + eval_result = eval_by_index[result.index] + final_results.append( + EvaluationItemResult( + index=result.index, + actual_output=result.actual_output, + metrics=eval_result.metrics, + metadata={**result.metadata, **eval_result.metadata}, + error=eval_result.error, + ) + ) + else: + final_results.append(result) + return final_results + + # ------------------------------------------------------------------ + # Judgment (target-type agnostic) + # ------------------------------------------------------------------ + @staticmethod def _apply_judgment( results: list[EvaluationItemResult], + items: list[EvaluationItemInput], judgment_config: JudgmentConfig, ) -> list[EvaluationItemResult]: """Apply judgment conditions to each result's metrics. - Builds a metric_name → score mapping from each result's metrics, - then delegates to JudgmentProcessor for condition evaluation. + Builds a metric_name → value mapping from each result's metrics, + and a variable_values dict from the evaluation target's runtime data + (inputs, actual_output, expected_output) for variable-type conditions. Results with errors are skipped. """ + items_by_index = {item.index: item for item in items} judged_results: list[EvaluationItemResult] = [] + for result in results: if result.error is not None or not result.metrics: judged_results.append(result) continue - metric_values = {m.name: m.score for m in result.metrics} - judgment_result = JudgmentProcessor.evaluate(metric_values, judgment_config) + metric_values: dict[str, object] = {m.name: m.score for m in result.metrics} + + # Build variable pool from the evaluation target's runtime data. + # These variables can be referenced in conditions with value_source="variable". + item_input = items_by_index.get(result.index) + variable_values: dict[str, object] = {} + if item_input: + variable_values.update(item_input.inputs) + if item_input.expected_output is not None: + variable_values["expected_output"] = item_input.expected_output + if item_input.context: + variable_values["context"] = "; ".join(item_input.context) + if result.actual_output is not None: + variable_values["actual_output"] = result.actual_output + + judgment_result = JudgmentProcessor.evaluate( + metric_values, judgment_config, variable_values=variable_values + ) judged_results.append( result.model_copy(update={"judgment": judgment_result}) ) return judged_results + + +def _is_customized_evaluation(metrics_config: dict[str, Any]) -> bool: + """Check if metrics_config indicates a customized workflow evaluation. + + The convention is that ``metrics_config["workflow_id"]`` is present + when a user-defined workflow should be used for evaluation. + """ + return bool(metrics_config.get("workflow_id"))