diff --git a/api/core/extension/api_based_extension_requestor.py b/api/core/extension/api_based_extension_requestor.py index fab9ae44e9..f9e6099049 100644 --- a/api/core/extension/api_based_extension_requestor.py +++ b/api/core/extension/api_based_extension_requestor.py @@ -1,13 +1,13 @@ from typing import cast -import requests +import httpx from configs import dify_config from models.api_based_extension import APIBasedExtensionPoint class APIBasedExtensionRequestor: - timeout: tuple[int, int] = (5, 60) + timeout: httpx.Timeout = httpx.Timeout(60.0, connect=5.0) """timeout for request connect and read""" def __init__(self, api_endpoint: str, api_key: str): @@ -27,25 +27,23 @@ class APIBasedExtensionRequestor: url = self.api_endpoint try: - # proxy support for security - proxies = None + mounts: dict[str, httpx.BaseTransport] | None = None if dify_config.SSRF_PROXY_HTTP_URL and dify_config.SSRF_PROXY_HTTPS_URL: - proxies = { - "http": dify_config.SSRF_PROXY_HTTP_URL, - "https": dify_config.SSRF_PROXY_HTTPS_URL, + mounts = { + "http://": httpx.HTTPTransport(proxy=dify_config.SSRF_PROXY_HTTP_URL), + "https://": httpx.HTTPTransport(proxy=dify_config.SSRF_PROXY_HTTPS_URL), } - response = requests.request( - method="POST", - url=url, - json={"point": point.value, "params": params}, - headers=headers, - timeout=self.timeout, - proxies=proxies, - ) - except requests.Timeout: + with httpx.Client(mounts=mounts, timeout=self.timeout) as client: + response = client.request( + method="POST", + url=url, + json={"point": point.value, "params": params}, + headers=headers, + ) + except httpx.TimeoutException: raise ValueError("request timeout") - except requests.ConnectionError: + except httpx.RequestError: raise ValueError("request connection error") if response.status_code != 200: diff --git a/api/core/plugin/impl/base.py b/api/core/plugin/impl/base.py index 62a5cc535a..c791b35161 100644 --- a/api/core/plugin/impl/base.py +++ b/api/core/plugin/impl/base.py @@ -2,11 +2,10 @@ import inspect import json import logging from collections.abc import Callable, Generator -from typing import TypeVar +from typing import Any, TypeVar -import requests +import httpx from pydantic import BaseModel -from requests.exceptions import HTTPError from yarl import URL from configs import dify_config @@ -47,29 +46,56 @@ class BasePluginClient: data: bytes | dict | str | None = None, params: dict | None = None, files: dict | None = None, - stream: bool = False, - ) -> requests.Response: + ) -> httpx.Response: """ Make a request to the plugin daemon inner API. """ - url = plugin_daemon_inner_api_baseurl / path - headers = headers or {} - headers["X-Api-Key"] = dify_config.PLUGIN_DAEMON_KEY - headers["Accept-Encoding"] = "gzip, deflate, br" + url, headers, prepared_data, params, files = self._prepare_request(path, headers, data, params, files) - if headers.get("Content-Type") == "application/json" and isinstance(data, dict): - data = json.dumps(data) + request_kwargs: dict[str, Any] = { + "method": method, + "url": url, + "headers": headers, + "params": params, + "files": files, + } + if isinstance(prepared_data, dict): + request_kwargs["data"] = prepared_data + elif prepared_data is not None: + request_kwargs["content"] = prepared_data try: - response = requests.request( - method=method, url=str(url), headers=headers, data=data, params=params, stream=stream, files=files - ) - except requests.ConnectionError: + response = httpx.request(**request_kwargs) + except httpx.RequestError: logger.exception("Request to Plugin Daemon Service failed") raise PluginDaemonInnerError(code=-500, message="Request to Plugin Daemon Service failed") return response + def _prepare_request( + self, + path: str, + headers: dict | None, + data: bytes | dict | str | None, + params: dict | None, + files: dict | None, + ) -> tuple[str, dict, bytes | dict | str | None, dict | None, dict | None]: + url = plugin_daemon_inner_api_baseurl / path + prepared_headers = dict(headers or {}) + prepared_headers["X-Api-Key"] = dify_config.PLUGIN_DAEMON_KEY + prepared_headers.setdefault("Accept-Encoding", "gzip, deflate, br") + + prepared_data: bytes | dict | str | None = ( + data if isinstance(data, (bytes, str, dict)) or data is None else None + ) + if isinstance(data, dict): + if prepared_headers.get("Content-Type") == "application/json": + prepared_data = json.dumps(data) + else: + prepared_data = data + + return str(url), prepared_headers, prepared_data, params, files + def _stream_request( self, method: str, @@ -78,17 +104,38 @@ class BasePluginClient: headers: dict | None = None, data: bytes | dict | None = None, files: dict | None = None, - ) -> Generator[bytes, None, None]: + ) -> Generator[str, None, None]: """ Make a stream request to the plugin daemon inner API """ - response = self._request(method, path, headers, data, params, files, stream=True) - for line in response.iter_lines(chunk_size=1024 * 8): - line = line.decode("utf-8").strip() - if line.startswith("data:"): - line = line[5:].strip() - if line: - yield line + url, headers, prepared_data, params, files = self._prepare_request(path, headers, data, params, files) + + stream_kwargs: dict[str, Any] = { + "method": method, + "url": url, + "headers": headers, + "params": params, + "files": files, + } + if isinstance(prepared_data, dict): + stream_kwargs["data"] = prepared_data + elif prepared_data is not None: + stream_kwargs["content"] = prepared_data + + try: + with httpx.stream(**stream_kwargs) as response: + for raw_line in response.iter_lines(): + if raw_line is None: + continue + line = raw_line.decode("utf-8") if isinstance(raw_line, bytes) else raw_line + line = line.strip() + if line.startswith("data:"): + line = line[5:].strip() + if line: + yield line + except httpx.RequestError: + logger.exception("Stream request to Plugin Daemon Service failed") + raise PluginDaemonInnerError(code=-500, message="Request to Plugin Daemon Service failed") def _stream_request_with_model( self, @@ -139,7 +186,7 @@ class BasePluginClient: try: response = self._request(method, path, headers, data, params, files) response.raise_for_status() - except HTTPError as e: + except httpx.HTTPStatusError as e: logger.exception("Failed to request plugin daemon, status: %s, url: %s", e.response.status_code, path) raise e except Exception as e: diff --git a/api/core/rag/datasource/vdb/elasticsearch/elasticsearch_vector.py b/api/core/rag/datasource/vdb/elasticsearch/elasticsearch_vector.py index 2c147fa7ca..ecb7a3916e 100644 --- a/api/core/rag/datasource/vdb/elasticsearch/elasticsearch_vector.py +++ b/api/core/rag/datasource/vdb/elasticsearch/elasticsearch_vector.py @@ -4,7 +4,7 @@ import math from typing import Any, cast from urllib.parse import urlparse -import requests +from elasticsearch import ConnectionError as ElasticsearchConnectionError from elasticsearch import Elasticsearch from flask import current_app from packaging.version import parse as parse_version @@ -138,7 +138,7 @@ class ElasticSearchVector(BaseVector): if not client.ping(): raise ConnectionError("Failed to connect to Elasticsearch") - except requests.ConnectionError as e: + except ElasticsearchConnectionError as e: raise ConnectionError(f"Vector database connection error: {str(e)}") except Exception as e: raise ConnectionError(f"Elasticsearch client initialization failed: {str(e)}") diff --git a/api/core/rag/datasource/vdb/tidb_on_qdrant/tidb_on_qdrant_vector.py b/api/core/rag/datasource/vdb/tidb_on_qdrant/tidb_on_qdrant_vector.py index f90a311df4..1ac10209d3 100644 --- a/api/core/rag/datasource/vdb/tidb_on_qdrant/tidb_on_qdrant_vector.py +++ b/api/core/rag/datasource/vdb/tidb_on_qdrant/tidb_on_qdrant_vector.py @@ -5,9 +5,10 @@ from collections.abc import Generator, Iterable, Sequence from itertools import islice from typing import TYPE_CHECKING, Any, Union +import httpx import qdrant_client -import requests from flask import current_app +from httpx import DigestAuth from pydantic import BaseModel from qdrant_client.http import models as rest from qdrant_client.http.models import ( @@ -19,7 +20,6 @@ from qdrant_client.http.models import ( TokenizerType, ) from qdrant_client.local.qdrant_local import QdrantLocal -from requests.auth import HTTPDigestAuth from sqlalchemy import select from configs import dify_config @@ -504,10 +504,10 @@ class TidbOnQdrantVectorFactory(AbstractVectorFactory): } cluster_data = {"displayName": display_name, "region": region_object, "labels": labels} - response = requests.post( + response = httpx.post( f"{tidb_config.api_url}/clusters", json=cluster_data, - auth=HTTPDigestAuth(tidb_config.public_key, tidb_config.private_key), + auth=DigestAuth(tidb_config.public_key, tidb_config.private_key), ) if response.status_code == 200: @@ -527,10 +527,10 @@ class TidbOnQdrantVectorFactory(AbstractVectorFactory): body = {"password": new_password} - response = requests.put( + response = httpx.put( f"{tidb_config.api_url}/clusters/{cluster_id}/password", json=body, - auth=HTTPDigestAuth(tidb_config.public_key, tidb_config.private_key), + auth=DigestAuth(tidb_config.public_key, tidb_config.private_key), ) if response.status_code == 200: diff --git a/api/core/rag/datasource/vdb/tidb_on_qdrant/tidb_service.py b/api/core/rag/datasource/vdb/tidb_on_qdrant/tidb_service.py index e1d4422144..754c149241 100644 --- a/api/core/rag/datasource/vdb/tidb_on_qdrant/tidb_service.py +++ b/api/core/rag/datasource/vdb/tidb_on_qdrant/tidb_service.py @@ -2,8 +2,8 @@ import time import uuid from collections.abc import Sequence -import requests -from requests.auth import HTTPDigestAuth +import httpx +from httpx import DigestAuth from configs import dify_config from extensions.ext_database import db @@ -49,7 +49,7 @@ class TidbService: "rootPassword": password, } - response = requests.post(f"{api_url}/clusters", json=cluster_data, auth=HTTPDigestAuth(public_key, private_key)) + response = httpx.post(f"{api_url}/clusters", json=cluster_data, auth=DigestAuth(public_key, private_key)) if response.status_code == 200: response_data = response.json() @@ -83,7 +83,7 @@ class TidbService: :return: The response from the API. """ - response = requests.delete(f"{api_url}/clusters/{cluster_id}", auth=HTTPDigestAuth(public_key, private_key)) + response = httpx.delete(f"{api_url}/clusters/{cluster_id}", auth=DigestAuth(public_key, private_key)) if response.status_code == 200: return response.json() @@ -102,7 +102,7 @@ class TidbService: :return: The response from the API. """ - response = requests.get(f"{api_url}/clusters/{cluster_id}", auth=HTTPDigestAuth(public_key, private_key)) + response = httpx.get(f"{api_url}/clusters/{cluster_id}", auth=DigestAuth(public_key, private_key)) if response.status_code == 200: return response.json() @@ -127,10 +127,10 @@ class TidbService: body = {"password": new_password, "builtinRole": "role_admin", "customRoles": []} - response = requests.patch( + response = httpx.patch( f"{api_url}/clusters/{cluster_id}/sqlUsers/{account}", json=body, - auth=HTTPDigestAuth(public_key, private_key), + auth=DigestAuth(public_key, private_key), ) if response.status_code == 200: @@ -161,9 +161,7 @@ class TidbService: tidb_serverless_list_map = {item.cluster_id: item for item in tidb_serverless_list} cluster_ids = [item.cluster_id for item in tidb_serverless_list] params = {"clusterIds": cluster_ids, "view": "BASIC"} - response = requests.get( - f"{api_url}/clusters:batchGet", params=params, auth=HTTPDigestAuth(public_key, private_key) - ) + response = httpx.get(f"{api_url}/clusters:batchGet", params=params, auth=DigestAuth(public_key, private_key)) if response.status_code == 200: response_data = response.json() @@ -224,8 +222,8 @@ class TidbService: clusters.append(cluster_data) request_body = {"requests": clusters} - response = requests.post( - f"{api_url}/clusters:batchCreate", json=request_body, auth=HTTPDigestAuth(public_key, private_key) + response = httpx.post( + f"{api_url}/clusters:batchCreate", json=request_body, auth=DigestAuth(public_key, private_key) ) if response.status_code == 200: diff --git a/api/core/rag/datasource/vdb/weaviate/weaviate_vector.py b/api/core/rag/datasource/vdb/weaviate/weaviate_vector.py index 3ec08b93ed..d84ae6010d 100644 --- a/api/core/rag/datasource/vdb/weaviate/weaviate_vector.py +++ b/api/core/rag/datasource/vdb/weaviate/weaviate_vector.py @@ -2,7 +2,6 @@ import datetime import json from typing import Any -import requests import weaviate # type: ignore from pydantic import BaseModel, model_validator @@ -45,8 +44,8 @@ class WeaviateVector(BaseVector): client = weaviate.Client( url=config.endpoint, auth_client_secret=auth_config, timeout_config=(5, 60), startup_period=None ) - except requests.ConnectionError: - raise ConnectionError("Vector database connection error") + except Exception as exc: + raise ConnectionError("Vector database connection error") from exc client.batch.configure( # `batch_size` takes an `int` value to enable auto-batching diff --git a/api/core/rag/extractor/firecrawl/firecrawl_app.py b/api/core/rag/extractor/firecrawl/firecrawl_app.py index e1ba6ef243..c20ecd2b89 100644 --- a/api/core/rag/extractor/firecrawl/firecrawl_app.py +++ b/api/core/rag/extractor/firecrawl/firecrawl_app.py @@ -2,7 +2,7 @@ import json import time from typing import Any, cast -import requests +import httpx from extensions.ext_storage import storage @@ -104,18 +104,18 @@ class FirecrawlApp: def _prepare_headers(self) -> dict[str, Any]: return {"Content-Type": "application/json", "Authorization": f"Bearer {self.api_key}"} - def _post_request(self, url, data, headers, retries=3, backoff_factor=0.5) -> requests.Response: + def _post_request(self, url, data, headers, retries=3, backoff_factor=0.5) -> httpx.Response: for attempt in range(retries): - response = requests.post(url, headers=headers, json=data) + response = httpx.post(url, headers=headers, json=data) if response.status_code == 502: time.sleep(backoff_factor * (2**attempt)) else: return response return response - def _get_request(self, url, headers, retries=3, backoff_factor=0.5) -> requests.Response: + def _get_request(self, url, headers, retries=3, backoff_factor=0.5) -> httpx.Response: for attempt in range(retries): - response = requests.get(url, headers=headers) + response = httpx.get(url, headers=headers) if response.status_code == 502: time.sleep(backoff_factor * (2**attempt)) else: diff --git a/api/core/rag/extractor/notion_extractor.py b/api/core/rag/extractor/notion_extractor.py index bddf41af43..e87ab38349 100644 --- a/api/core/rag/extractor/notion_extractor.py +++ b/api/core/rag/extractor/notion_extractor.py @@ -3,7 +3,7 @@ import logging import operator from typing import Any, cast -import requests +import httpx from configs import dify_config from core.rag.extractor.extractor_base import BaseExtractor @@ -92,7 +92,7 @@ class NotionExtractor(BaseExtractor): if next_cursor: current_query["start_cursor"] = next_cursor - res = requests.post( + res = httpx.post( DATABASE_URL_TMPL.format(database_id=database_id), headers={ "Authorization": "Bearer " + self._notion_access_token, @@ -160,7 +160,7 @@ class NotionExtractor(BaseExtractor): while True: query_dict: dict[str, Any] = {} if not start_cursor else {"start_cursor": start_cursor} try: - res = requests.request( + res = httpx.request( "GET", block_url, headers={ @@ -173,7 +173,7 @@ class NotionExtractor(BaseExtractor): if res.status_code != 200: raise ValueError(f"Error fetching Notion block data: {res.text}") data = res.json() - except requests.RequestException as e: + except httpx.HTTPError as e: raise ValueError("Error fetching Notion block data") from e if "results" not in data or not isinstance(data["results"], list): raise ValueError("Error fetching Notion block data") @@ -222,7 +222,7 @@ class NotionExtractor(BaseExtractor): while True: query_dict: dict[str, Any] = {} if not start_cursor else {"start_cursor": start_cursor} - res = requests.request( + res = httpx.request( "GET", block_url, headers={ @@ -282,7 +282,7 @@ class NotionExtractor(BaseExtractor): while not done: query_dict: dict[str, Any] = {} if not start_cursor else {"start_cursor": start_cursor} - res = requests.request( + res = httpx.request( "GET", block_url, headers={ @@ -354,7 +354,7 @@ class NotionExtractor(BaseExtractor): query_dict: dict[str, Any] = {} - res = requests.request( + res = httpx.request( "GET", retrieve_page_url, headers={ diff --git a/api/core/rag/extractor/watercrawl/client.py b/api/core/rag/extractor/watercrawl/client.py index 6d596e07d8..7cf6c4d289 100644 --- a/api/core/rag/extractor/watercrawl/client.py +++ b/api/core/rag/extractor/watercrawl/client.py @@ -3,8 +3,8 @@ from collections.abc import Generator from typing import Union from urllib.parse import urljoin -import requests -from requests import Response +import httpx +from httpx import Response from core.rag.extractor.watercrawl.exceptions import ( WaterCrawlAuthenticationError, @@ -20,28 +20,45 @@ class BaseAPIClient: self.session = self.init_session() def init_session(self): - session = requests.Session() - session.headers.update({"X-API-Key": self.api_key}) - session.headers.update({"Content-Type": "application/json"}) - session.headers.update({"Accept": "application/json"}) - session.headers.update({"User-Agent": "WaterCrawl-Plugin"}) - session.headers.update({"Accept-Language": "en-US"}) - return session + headers = { + "X-API-Key": self.api_key, + "Content-Type": "application/json", + "Accept": "application/json", + "User-Agent": "WaterCrawl-Plugin", + "Accept-Language": "en-US", + } + return httpx.Client(headers=headers, timeout=None) + + def _request( + self, + method: str, + endpoint: str, + query_params: dict | None = None, + data: dict | None = None, + **kwargs, + ) -> Response: + stream = kwargs.pop("stream", False) + url = urljoin(self.base_url, endpoint) + if stream: + request = self.session.build_request(method, url, params=query_params, json=data) + return self.session.send(request, stream=True, **kwargs) + + return self.session.request(method, url, params=query_params, json=data, **kwargs) def _get(self, endpoint: str, query_params: dict | None = None, **kwargs): - return self.session.get(urljoin(self.base_url, endpoint), params=query_params, **kwargs) + return self._request("GET", endpoint, query_params=query_params, **kwargs) def _post(self, endpoint: str, query_params: dict | None = None, data: dict | None = None, **kwargs): - return self.session.post(urljoin(self.base_url, endpoint), params=query_params, json=data, **kwargs) + return self._request("POST", endpoint, query_params=query_params, data=data, **kwargs) def _put(self, endpoint: str, query_params: dict | None = None, data: dict | None = None, **kwargs): - return self.session.put(urljoin(self.base_url, endpoint), params=query_params, json=data, **kwargs) + return self._request("PUT", endpoint, query_params=query_params, data=data, **kwargs) def _delete(self, endpoint: str, query_params: dict | None = None, **kwargs): - return self.session.delete(urljoin(self.base_url, endpoint), params=query_params, **kwargs) + return self._request("DELETE", endpoint, query_params=query_params, **kwargs) def _patch(self, endpoint: str, query_params: dict | None = None, data: dict | None = None, **kwargs): - return self.session.patch(urljoin(self.base_url, endpoint), params=query_params, json=data, **kwargs) + return self._request("PATCH", endpoint, query_params=query_params, data=data, **kwargs) class WaterCrawlAPIClient(BaseAPIClient): @@ -49,14 +66,17 @@ class WaterCrawlAPIClient(BaseAPIClient): super().__init__(api_key, base_url) def process_eventstream(self, response: Response, download: bool = False) -> Generator: - for line in response.iter_lines(): - line = line.decode("utf-8") - if line.startswith("data:"): - line = line[5:].strip() - data = json.loads(line) - if data["type"] == "result" and download: - data["data"] = self.download_result(data["data"]) - yield data + try: + for raw_line in response.iter_lines(): + line = raw_line.decode("utf-8") if isinstance(raw_line, bytes) else raw_line + if line.startswith("data:"): + line = line[5:].strip() + data = json.loads(line) + if data["type"] == "result" and download: + data["data"] = self.download_result(data["data"]) + yield data + finally: + response.close() def process_response(self, response: Response) -> dict | bytes | list | None | Generator: if response.status_code == 401: @@ -170,7 +190,10 @@ class WaterCrawlAPIClient(BaseAPIClient): return event_data["data"] def download_result(self, result_object: dict): - response = requests.get(result_object["result"]) - response.raise_for_status() - result_object["result"] = response.json() + response = httpx.get(result_object["result"], timeout=None) + try: + response.raise_for_status() + result_object["result"] = response.json() + finally: + response.close() return result_object diff --git a/api/core/rag/extractor/word_extractor.py b/api/core/rag/extractor/word_extractor.py index f25f92cf81..1a9704688a 100644 --- a/api/core/rag/extractor/word_extractor.py +++ b/api/core/rag/extractor/word_extractor.py @@ -9,7 +9,7 @@ import uuid from urllib.parse import urlparse from xml.etree import ElementTree -import requests +import httpx from docx import Document as DocxDocument from configs import dify_config @@ -43,15 +43,19 @@ class WordExtractor(BaseExtractor): # If the file is a web path, download it to a temporary file, and use that if not os.path.isfile(self.file_path) and self._is_valid_url(self.file_path): - r = requests.get(self.file_path) + response = httpx.get(self.file_path, timeout=None) - if r.status_code != 200: - raise ValueError(f"Check the url of your file; returned status code {r.status_code}") + if response.status_code != 200: + response.close() + raise ValueError(f"Check the url of your file; returned status code {response.status_code}") self.web_path = self.file_path # TODO: use a better way to handle the file self.temp_file = tempfile.NamedTemporaryFile() # noqa SIM115 - self.temp_file.write(r.content) + try: + self.temp_file.write(response.content) + finally: + response.close() self.file_path = self.temp_file.name elif not os.path.isfile(self.file_path): raise ValueError(f"File path {self.file_path} is not a valid file or url") diff --git a/api/core/tools/utils/parser.py b/api/core/tools/utils/parser.py index fcb1d325af..35fd7895b9 100644 --- a/api/core/tools/utils/parser.py +++ b/api/core/tools/utils/parser.py @@ -4,8 +4,8 @@ from json import loads as json_loads from json.decoder import JSONDecodeError from typing import Any +import httpx from flask import request -from requests import get from yaml import YAMLError, safe_load from core.tools.entities.common_entities import I18nObject @@ -334,15 +334,20 @@ class ApiBasedToolSchemaParser: raise ToolNotSupportedError("Only openapi is supported now.") # get openapi yaml - response = get(api_url, headers={"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "}, timeout=5) - - if response.status_code != 200: - raise ToolProviderNotFoundError("cannot get openapi yaml from url.") - - return ApiBasedToolSchemaParser.parse_openapi_yaml_to_tool_bundle( - response.text, extra_info=extra_info, warning=warning + response = httpx.get( + api_url, headers={"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "}, timeout=5 ) + try: + if response.status_code != 200: + raise ToolProviderNotFoundError("cannot get openapi yaml from url.") + + return ApiBasedToolSchemaParser.parse_openapi_yaml_to_tool_bundle( + response.text, extra_info=extra_info, warning=warning + ) + finally: + response.close() + @staticmethod def auto_parse_to_tool_bundle( content: str, extra_info: dict | None = None, warning: dict | None = None diff --git a/api/extensions/ext_otel.py b/api/extensions/ext_otel.py index 19c6e68c6b..cb6e4849a9 100644 --- a/api/extensions/ext_otel.py +++ b/api/extensions/ext_otel.py @@ -138,7 +138,6 @@ def init_app(app: DifyApp): from opentelemetry.instrumentation.flask import FlaskInstrumentor from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor from opentelemetry.instrumentation.redis import RedisInstrumentor - from opentelemetry.instrumentation.requests import RequestsInstrumentor from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor from opentelemetry.metrics import get_meter, get_meter_provider, set_meter_provider from opentelemetry.propagate import set_global_textmap @@ -238,7 +237,6 @@ def init_app(app: DifyApp): instrument_exception_logging() init_sqlalchemy_instrumentor(app) RedisInstrumentor().instrument() - RequestsInstrumentor().instrument() HTTPXClientInstrumentor().instrument() atexit.register(shutdown_tracer) diff --git a/api/pyproject.toml b/api/pyproject.toml index e2a50a43f6..22eedf7b8b 100644 --- a/api/pyproject.toml +++ b/api/pyproject.toml @@ -48,7 +48,7 @@ dependencies = [ "opentelemetry-instrumentation-flask==0.48b0", "opentelemetry-instrumentation-httpx==0.48b0", "opentelemetry-instrumentation-redis==0.48b0", - "opentelemetry-instrumentation-requests==0.48b0", + "opentelemetry-instrumentation-httpx==0.48b0", "opentelemetry-instrumentation-sqlalchemy==0.48b0", "opentelemetry-propagator-b3==1.27.0", # opentelemetry-proto1.28.0 depends on protobuf (>=5.0,<6.0), @@ -145,8 +145,6 @@ dev = [ "types-pywin32~=310.0.0", "types-pyyaml~=6.0.12", "types-regex~=2024.11.6", - "types-requests~=2.32.0", - "types-requests-oauthlib~=2.0.0", "types-shapely~=2.0.0", "types-simplejson>=3.20.0", "types-six>=1.17.0", diff --git a/api/pyrightconfig.json b/api/pyrightconfig.json index 67571316a9..bf4ec2314e 100644 --- a/api/pyrightconfig.json +++ b/api/pyrightconfig.json @@ -15,7 +15,8 @@ "opentelemetry.instrumentation.httpx", "opentelemetry.instrumentation.requests", "opentelemetry.instrumentation.sqlalchemy", - "opentelemetry.instrumentation.redis" + "opentelemetry.instrumentation.redis", + "opentelemetry.instrumentation.httpx" ], "reportUnknownMemberType": "hint", "reportUnknownParameterType": "hint", diff --git a/api/services/enterprise/base.py b/api/services/enterprise/base.py index edb76408e8..bdc960aa2d 100644 --- a/api/services/enterprise/base.py +++ b/api/services/enterprise/base.py @@ -1,10 +1,12 @@ import os +from collections.abc import Mapping +from typing import Any -import requests +import httpx class BaseRequest: - proxies = { + proxies: Mapping[str, str] | None = { "http": "", "https": "", } @@ -13,10 +15,31 @@ class BaseRequest: secret_key_header = "" @classmethod - def send_request(cls, method, endpoint, json=None, params=None): + def _build_mounts(cls) -> dict[str, httpx.BaseTransport] | None: + if not cls.proxies: + return None + + mounts: dict[str, httpx.BaseTransport] = {} + for scheme, value in cls.proxies.items(): + if not value: + continue + key = f"{scheme}://" if not scheme.endswith("://") else scheme + mounts[key] = httpx.HTTPTransport(proxy=value) + return mounts or None + + @classmethod + def send_request( + cls, + method: str, + endpoint: str, + json: Any | None = None, + params: Mapping[str, Any] | None = None, + ) -> Any: headers = {"Content-Type": "application/json", cls.secret_key_header: cls.secret_key} url = f"{cls.base_url}{endpoint}" - response = requests.request(method, url, json=json, params=params, headers=headers, proxies=cls.proxies) + mounts = cls._build_mounts() + with httpx.Client(mounts=mounts) as client: + response = client.request(method, url, json=json, params=params, headers=headers) return response.json() diff --git a/api/services/rag_pipeline/pipeline_template/remote/remote_retrieval.py b/api/services/rag_pipeline/pipeline_template/remote/remote_retrieval.py index 8f96842337..571ca6c7a6 100644 --- a/api/services/rag_pipeline/pipeline_template/remote/remote_retrieval.py +++ b/api/services/rag_pipeline/pipeline_template/remote/remote_retrieval.py @@ -1,6 +1,6 @@ import logging -import requests +import httpx from configs import dify_config from services.rag_pipeline.pipeline_template.database.database_retrieval import DatabasePipelineTemplateRetrieval @@ -43,7 +43,7 @@ class RemotePipelineTemplateRetrieval(PipelineTemplateRetrievalBase): """ domain = dify_config.HOSTED_FETCH_PIPELINE_TEMPLATES_REMOTE_DOMAIN url = f"{domain}/pipeline-templates/{template_id}" - response = requests.get(url, timeout=(3, 10)) + response = httpx.get(url, timeout=httpx.Timeout(10.0, connect=3.0)) if response.status_code != 200: return None data: dict = response.json() @@ -58,7 +58,7 @@ class RemotePipelineTemplateRetrieval(PipelineTemplateRetrievalBase): """ domain = dify_config.HOSTED_FETCH_PIPELINE_TEMPLATES_REMOTE_DOMAIN url = f"{domain}/pipeline-templates?language={language}" - response = requests.get(url, timeout=(3, 10)) + response = httpx.get(url, timeout=httpx.Timeout(10.0, connect=3.0)) if response.status_code != 200: raise ValueError(f"fetch pipeline templates failed, status code: {response.status_code}") diff --git a/api/services/recommend_app/remote/remote_retrieval.py b/api/services/recommend_app/remote/remote_retrieval.py index 2d57769f63..b217c9026a 100644 --- a/api/services/recommend_app/remote/remote_retrieval.py +++ b/api/services/recommend_app/remote/remote_retrieval.py @@ -1,6 +1,6 @@ import logging -import requests +import httpx from configs import dify_config from services.recommend_app.buildin.buildin_retrieval import BuildInRecommendAppRetrieval @@ -43,7 +43,7 @@ class RemoteRecommendAppRetrieval(RecommendAppRetrievalBase): """ domain = dify_config.HOSTED_FETCH_APP_TEMPLATES_REMOTE_DOMAIN url = f"{domain}/apps/{app_id}" - response = requests.get(url, timeout=(3, 10)) + response = httpx.get(url, timeout=httpx.Timeout(10.0, connect=3.0)) if response.status_code != 200: return None data: dict = response.json() @@ -58,7 +58,7 @@ class RemoteRecommendAppRetrieval(RecommendAppRetrievalBase): """ domain = dify_config.HOSTED_FETCH_APP_TEMPLATES_REMOTE_DOMAIN url = f"{domain}/apps?language={language}" - response = requests.get(url, timeout=(3, 10)) + response = httpx.get(url, timeout=httpx.Timeout(10.0, connect=3.0)) if response.status_code != 200: raise ValueError(f"fetch recommended apps failed, status code: {response.status_code}") diff --git a/api/tests/integration_tests/vdb/__mock/baiduvectordb.py b/api/tests/integration_tests/vdb/__mock/baiduvectordb.py index 6d2aff5197..8a43d03a43 100644 --- a/api/tests/integration_tests/vdb/__mock/baiduvectordb.py +++ b/api/tests/integration_tests/vdb/__mock/baiduvectordb.py @@ -1,5 +1,6 @@ import os from collections import UserDict +from typing import Any from unittest.mock import MagicMock import pytest @@ -9,7 +10,6 @@ from pymochow.model.database import Database # type: ignore from pymochow.model.enum import IndexState, IndexType, MetricType, ReadConsistency, TableState # type: ignore from pymochow.model.schema import HNSWParams, VectorIndex # type: ignore from pymochow.model.table import Table # type: ignore -from requests.adapters import HTTPAdapter class AttrDict(UserDict): @@ -21,7 +21,7 @@ class MockBaiduVectorDBClass: def mock_vector_db_client( self, config=None, - adapter: HTTPAdapter | None = None, + adapter: Any | None = None, ): self.conn = MagicMock() self._config = MagicMock() diff --git a/api/tests/integration_tests/vdb/__mock/tcvectordb.py b/api/tests/integration_tests/vdb/__mock/tcvectordb.py index e0b908cece..5130fcfe17 100644 --- a/api/tests/integration_tests/vdb/__mock/tcvectordb.py +++ b/api/tests/integration_tests/vdb/__mock/tcvectordb.py @@ -1,9 +1,8 @@ import os -from typing import Union +from typing import Any, Union import pytest from _pytest.monkeypatch import MonkeyPatch -from requests.adapters import HTTPAdapter from tcvectordb import RPCVectorDBClient # type: ignore from tcvectordb.model import enum from tcvectordb.model.collection import FilterIndexConfig @@ -23,7 +22,7 @@ class MockTcvectordbClass: key="", read_consistency: ReadConsistency = ReadConsistency.EVENTUAL_CONSISTENCY, timeout=10, - adapter: HTTPAdapter | None = None, + adapter: Any | None = None, pool_size: int = 2, proxies: dict | None = None, password: str | None = None, diff --git a/api/tests/unit_tests/core/rag/extractor/firecrawl/test_firecrawl.py b/api/tests/unit_tests/core/rag/extractor/firecrawl/test_firecrawl.py index 6689e13b96..e5ead6ff66 100644 --- a/api/tests/unit_tests/core/rag/extractor/firecrawl/test_firecrawl.py +++ b/api/tests/unit_tests/core/rag/extractor/firecrawl/test_firecrawl.py @@ -18,7 +18,7 @@ def test_firecrawl_web_extractor_crawl_mode(mocker): mocked_firecrawl = { "id": "test", } - mocker.patch("requests.post", return_value=_mock_response(mocked_firecrawl)) + mocker.patch("httpx.post", return_value=_mock_response(mocked_firecrawl)) job_id = firecrawl_app.crawl_url(url, params) assert job_id is not None diff --git a/api/tests/unit_tests/core/rag/extractor/test_notion_extractor.py b/api/tests/unit_tests/core/rag/extractor/test_notion_extractor.py index eea584a2f8..f1e1820acc 100644 --- a/api/tests/unit_tests/core/rag/extractor/test_notion_extractor.py +++ b/api/tests/unit_tests/core/rag/extractor/test_notion_extractor.py @@ -69,7 +69,7 @@ def test_notion_page(mocker): ], "next_cursor": None, } - mocker.patch("requests.request", return_value=_mock_response(mocked_notion_page)) + mocker.patch("httpx.request", return_value=_mock_response(mocked_notion_page)) page_docs = extractor._load_data_as_documents(page_id, "page") assert len(page_docs) == 1 @@ -84,7 +84,7 @@ def test_notion_database(mocker): "results": [_generate_page(i) for i in page_title_list], "next_cursor": None, } - mocker.patch("requests.post", return_value=_mock_response(mocked_notion_database)) + mocker.patch("httpx.post", return_value=_mock_response(mocked_notion_database)) database_docs = extractor._load_data_as_documents(database_id, "database") assert len(database_docs) == 1 content = _remove_multiple_new_lines(database_docs[0].page_content) diff --git a/api/uv.lock b/api/uv.lock index 43db17b06f..af368199b7 100644 --- a/api/uv.lock +++ b/api/uv.lock @@ -1325,7 +1325,6 @@ dependencies = [ { name = "opentelemetry-instrumentation-flask" }, { name = "opentelemetry-instrumentation-httpx" }, { name = "opentelemetry-instrumentation-redis" }, - { name = "opentelemetry-instrumentation-requests" }, { name = "opentelemetry-instrumentation-sqlalchemy" }, { name = "opentelemetry-propagator-b3" }, { name = "opentelemetry-proto" }, @@ -1418,8 +1417,6 @@ dev = [ { name = "types-pyyaml" }, { name = "types-redis" }, { name = "types-regex" }, - { name = "types-requests" }, - { name = "types-requests-oauthlib" }, { name = "types-setuptools" }, { name = "types-shapely" }, { name = "types-simplejson" }, @@ -1516,7 +1513,6 @@ requires-dist = [ { name = "opentelemetry-instrumentation-flask", specifier = "==0.48b0" }, { name = "opentelemetry-instrumentation-httpx", specifier = "==0.48b0" }, { name = "opentelemetry-instrumentation-redis", specifier = "==0.48b0" }, - { name = "opentelemetry-instrumentation-requests", specifier = "==0.48b0" }, { name = "opentelemetry-instrumentation-sqlalchemy", specifier = "==0.48b0" }, { name = "opentelemetry-propagator-b3", specifier = "==1.27.0" }, { name = "opentelemetry-proto", specifier = "==1.27.0" }, @@ -1609,8 +1605,6 @@ dev = [ { name = "types-pyyaml", specifier = "~=6.0.12" }, { name = "types-redis", specifier = ">=4.6.0.20241004" }, { name = "types-regex", specifier = "~=2024.11.6" }, - { name = "types-requests", specifier = "~=2.32.0" }, - { name = "types-requests-oauthlib", specifier = "~=2.0.0" }, { name = "types-setuptools", specifier = ">=80.9.0" }, { name = "types-shapely", specifier = "~=2.0.0" }, { name = "types-simplejson", specifier = ">=3.20.0" }, @@ -3910,21 +3904,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/94/40/892f30d400091106309cc047fd3f6d76a828fedd984a953fd5386b78a2fb/opentelemetry_instrumentation_redis-0.48b0-py3-none-any.whl", hash = "sha256:48c7f2e25cbb30bde749dc0d8b9c74c404c851f554af832956b9630b27f5bcb7", size = 11610, upload-time = "2024-08-28T21:27:18.759Z" }, ] -[[package]] -name = "opentelemetry-instrumentation-requests" -version = "0.48b0" -source = { registry = "https://pypi.org/simple" } -dependencies = [ - { name = "opentelemetry-api" }, - { name = "opentelemetry-instrumentation" }, - { name = "opentelemetry-semantic-conventions" }, - { name = "opentelemetry-util-http" }, -] -sdist = { url = "https://files.pythonhosted.org/packages/52/ac/5eb78efde21ff21d0ad5dc8c6cc6a0f8ae482ce8a46293c2f45a628b6166/opentelemetry_instrumentation_requests-0.48b0.tar.gz", hash = "sha256:67ab9bd877a0352ee0db4616c8b4ae59736ddd700c598ed907482d44f4c9a2b3", size = 14120, upload-time = "2024-08-28T21:28:16.933Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/43/df/0df9226d1b14f29d23c07e6194b9fd5ad50e7d987b7fd13df7dcf718aeb1/opentelemetry_instrumentation_requests-0.48b0-py3-none-any.whl", hash = "sha256:d4f01852121d0bd4c22f14f429654a735611d4f7bf3cf93f244bdf1489b2233d", size = 12366, upload-time = "2024-08-28T21:27:20.771Z" }, -] - [[package]] name = "opentelemetry-instrumentation-sqlalchemy" version = "0.48b0" @@ -6440,19 +6419,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/2b/6f/ec0012be842b1d888d46884ac5558fd62aeae1f0ec4f7a581433d890d4b5/types_requests-2.32.4.20250809-py3-none-any.whl", hash = "sha256:f73d1832fb519ece02c85b1f09d5f0dd3108938e7d47e7f94bbfa18a6782b163", size = 20644, upload-time = "2025-08-09T03:17:09.716Z" }, ] -[[package]] -name = "types-requests-oauthlib" -version = "2.0.0.20250809" -source = { registry = "https://pypi.org/simple" } -dependencies = [ - { name = "types-oauthlib" }, - { name = "types-requests" }, -] -sdist = { url = "https://files.pythonhosted.org/packages/ed/40/5eca857a2dbda0fedd69b7fd3f51cb0b6ece8d448327d29f0ae54612ec98/types_requests_oauthlib-2.0.0.20250809.tar.gz", hash = "sha256:f3b9b31e0394fe2c362f0d44bc9ef6d5c150a298d01089513cd54a51daec37a2", size = 11008, upload-time = "2025-08-09T03:17:50.705Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/f3/38/8777f0ab409a7249777f230f6aefe0e9ba98355dc8b05fb31391fa30f312/types_requests_oauthlib-2.0.0.20250809-py3-none-any.whl", hash = "sha256:0d1af4907faf9f4a1b0f0afbc7ec488f1dd5561a2b5b6dad70f78091a1acfb76", size = 14319, upload-time = "2025-08-09T03:17:49.786Z" }, -] - [[package]] name = "types-s3transfer" version = "0.13.1" diff --git a/scripts/stress-test/setup/import_workflow_app.py b/scripts/stress-test/setup/import_workflow_app.py index 86d0239e35..41a76bd29b 100755 --- a/scripts/stress-test/setup/import_workflow_app.py +++ b/scripts/stress-test/setup/import_workflow_app.py @@ -8,7 +8,7 @@ sys.path.append(str(Path(__file__).parent.parent)) import json import httpx -from common import Logger, config_helper +from common import Logger, config_helper # type: ignore[import] def import_workflow_app() -> None: