refactor: replace bare dict with dict[str, Any] in dataset and external_knowledge services (#35073)

This commit is contained in:
wdeveloper16
2026-04-13 19:08:45 +02:00
committed by GitHub
parent 57c5f0ec87
commit d8fbc00cb9
2 changed files with 15 additions and 13 deletions

View File

@@ -233,7 +233,7 @@ class DatasetService:
embedding_model_provider: str | None = None,
embedding_model_name: str | None = None,
retrieval_model: RetrievalModel | None = None,
summary_index_setting: dict | None = None,
summary_index_setting: dict[str, Any] | None = None,
):
# check if dataset name already exists
if db.session.scalar(select(Dataset).where(Dataset.name == name, Dataset.tenant_id == tenant_id).limit(1)):
@@ -2493,7 +2493,7 @@ class DocumentService:
data_source_type: str,
document_form: str,
document_language: str,
data_source_info: dict,
data_source_info: dict[str, Any],
created_from: str,
position: int,
account: Account,
@@ -2850,7 +2850,7 @@ class DocumentService:
raise ValueError("Process rule segmentation max_tokens is invalid")
@classmethod
def estimate_args_validate(cls, args: dict):
def estimate_args_validate(cls, args: dict[str, Any]):
if "info_list" not in args or not args["info_list"]:
raise ValueError("Data source info is required")
@@ -3132,7 +3132,7 @@ class DocumentService:
class SegmentService:
@classmethod
def segment_create_args_validate(cls, args: dict, document: Document):
def segment_create_args_validate(cls, args: dict[str, Any], document: Document):
if document.doc_form == IndexStructureType.QA_INDEX:
if "answer" not in args or not args["answer"]:
raise ValueError("Answer is required")
@@ -3149,7 +3149,7 @@ class SegmentService:
raise ValueError(f"Exceeded maximum attachment limit of {single_chunk_attachment_limit}")
@classmethod
def create_segment(cls, args: dict, document: Document, dataset: Dataset):
def create_segment(cls, args: dict[str, Any], document: Document, dataset: Dataset):
assert isinstance(current_user, Account)
assert current_user.current_tenant_id is not None

View File

@@ -47,7 +47,7 @@ class ExternalDatasetService:
return external_knowledge_apis.items, external_knowledge_apis.total
@classmethod
def validate_api_list(cls, api_settings: dict):
def validate_api_list(cls, api_settings: dict[str, Any]):
if not api_settings:
raise ValueError("api list is empty")
if not api_settings.get("endpoint"):
@@ -56,7 +56,7 @@ class ExternalDatasetService:
raise ValueError("api_key is required")
@staticmethod
def create_external_knowledge_api(tenant_id: str, user_id: str, args: dict) -> ExternalKnowledgeApis:
def create_external_knowledge_api(tenant_id: str, user_id: str, args: dict[str, Any]) -> ExternalKnowledgeApis:
settings = args.get("settings")
if settings is None:
raise ValueError("settings is required")
@@ -75,7 +75,7 @@ class ExternalDatasetService:
return external_knowledge_api
@staticmethod
def check_endpoint_and_api_key(settings: dict):
def check_endpoint_and_api_key(settings: dict[str, Any]):
if "endpoint" not in settings or not settings["endpoint"]:
raise ValueError("endpoint is required")
if "api_key" not in settings or not settings["api_key"]:
@@ -178,7 +178,9 @@ class ExternalDatasetService:
return external_knowledge_binding
@staticmethod
def document_create_args_validate(tenant_id: str, external_knowledge_api_id: str, process_parameter: dict):
def document_create_args_validate(
tenant_id: str, external_knowledge_api_id: str, process_parameter: dict[str, Any]
):
external_knowledge_api = db.session.scalar(
select(ExternalKnowledgeApis)
.where(ExternalKnowledgeApis.id == external_knowledge_api_id, ExternalKnowledgeApis.tenant_id == tenant_id)
@@ -222,7 +224,7 @@ class ExternalDatasetService:
return response
@staticmethod
def assembling_headers(authorization: Authorization, headers: dict | None = None) -> dict[str, Any]:
def assembling_headers(authorization: Authorization, headers: dict[str, Any] | None = None) -> dict[str, Any]:
authorization = deepcopy(authorization)
if headers:
headers = deepcopy(headers)
@@ -248,11 +250,11 @@ class ExternalDatasetService:
return headers
@staticmethod
def get_external_knowledge_api_settings(settings: dict) -> ExternalKnowledgeApiSetting:
def get_external_knowledge_api_settings(settings: dict[str, Any]) -> ExternalKnowledgeApiSetting:
return ExternalKnowledgeApiSetting.model_validate(settings)
@staticmethod
def create_external_dataset(tenant_id: str, user_id: str, args: dict) -> Dataset:
def create_external_dataset(tenant_id: str, user_id: str, args: dict[str, Any]) -> Dataset:
# check if dataset name already exists
if db.session.scalar(
select(Dataset).where(Dataset.name == args.get("name"), Dataset.tenant_id == tenant_id).limit(1)
@@ -304,7 +306,7 @@ class ExternalDatasetService:
tenant_id: str,
dataset_id: str,
query: str,
external_retrieval_parameters: dict,
external_retrieval_parameters: dict[str, Any],
metadata_condition: MetadataFilteringCondition | None = None,
):
external_knowledge_binding = db.session.scalar(