import logging
import os
import threading
import time
from collections.abc import Sequence
from typing import Any

import sqlalchemy as sa
from aliyun.log import (  # type: ignore[import-untyped]
    GetLogsRequest,
    IndexConfig,
    IndexKeyConfig,
    IndexLineConfig,
    LogClient,
    LogItem,
    PutLogsRequest,
)
from aliyun.log.auth import AUTH_VERSION_4  # type: ignore[import-untyped]
from aliyun.log.logexception import LogException  # type: ignore[import-untyped]
from dotenv import load_dotenv
from sqlalchemy.orm import DeclarativeBase

from configs import dify_config
from extensions.logstore.aliyun_logstore_pg import AliyunLogStorePG

logger = logging.getLogger(__name__)


class AliyunLogStore:
    """
    Singleton class for Aliyun SLS LogStore operations.

    Ensures only one instance exists to prevent multiple PG connection pools.
    """

    _instance: "AliyunLogStore | None" = None
    _initialized: bool = False

    # Track delayed PG connection for newly created projects
    _pg_connection_timer: threading.Timer | None = None
    _pg_connection_delay: int = 90  # delay in seconds

    # Default tokenizer for text/json fields and full-text index
    # Common delimiters: comma, space, quotes, punctuation, operators, brackets, special chars
    DEFAULT_TOKEN_LIST = [
        ",",
        " ",
        '"',
        "'",
        ";",
        "=",
        "(",
        ")",
        "[",
        "]",
        "{",
        "}",
        "?",
        "@",
        "&",
        "<",
        ">",
        "/",
        ":",
        "\n",
        "\t",
    ]

    def __new__(cls) -> "AliyunLogStore":
        """Implement singleton pattern."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    project_des = "dify"

    workflow_execution_logstore = "workflow_execution"

    workflow_node_execution_logstore = "workflow_node_execution"

    @staticmethod
    def _sqlalchemy_type_to_logstore_type(column: Any) -> str:
        """
        Map SQLAlchemy column type to Aliyun LogStore index type.

        Args:
            column: SQLAlchemy column object

        Returns:
            LogStore index type: 'text', 'long', 'double', or 'json'
        """
        column_type = column.type

        # Integer types -> long
        if isinstance(column_type, (sa.Integer, sa.BigInteger, sa.SmallInteger)):
            return "long"

        # Float types -> double
        if isinstance(column_type, (sa.Float, sa.Numeric)):
            return "double"

        # String and Text types -> text
        if isinstance(column_type, (sa.String, sa.Text)):
            return "text"

        # DateTime -> text (stored as ISO format string in logstore)
        if isinstance(column_type, sa.DateTime):
            return "text"

        # Boolean -> long (stored as 0/1)
        if isinstance(column_type, sa.Boolean):
            return "long"

        # JSON -> json
        if isinstance(column_type, sa.JSON):
            return "json"

        # Default to text for unknown types
        return "text"

    @staticmethod
    def _generate_index_keys_from_model(model_class: type[DeclarativeBase]) -> dict[str, IndexKeyConfig]:
        """
        Automatically generate LogStore field index configuration from SQLAlchemy model.

        This method introspects the SQLAlchemy model's column definitions and creates
        corresponding LogStore index configurations. When the PG schema is updated via
        Flask-Migrate, this method will automatically pick up the new fields on next startup.

        Args:
            model_class: SQLAlchemy model class (e.g., WorkflowRun, WorkflowNodeExecutionModel)

        Returns:
            Dictionary mapping field names to IndexKeyConfig objects
        """
        index_keys = {}

        # Iterate over all mapped columns in the model
        if hasattr(model_class, "__mapper__"):
            for column_name, column_property in model_class.__mapper__.columns.items():
                # Skip relationship properties and other non-column attributes
                if not hasattr(column_property, "type"):
                    continue

                # Map SQLAlchemy type to LogStore type
                logstore_type = AliyunLogStore._sqlalchemy_type_to_logstore_type(column_property)

                # Create index configuration
                # - text fields: case_insensitive for better search, with tokenizer and Chinese support
                # - all fields: doc_value=True for analytics
                if logstore_type == "text":
                    index_keys[column_name] = IndexKeyConfig(
                        index_type="text",
                        case_sensitive=False,
                        doc_value=True,
                        token_list=AliyunLogStore.DEFAULT_TOKEN_LIST,
                        chinese=True,
                    )
                else:
                    index_keys[column_name] = IndexKeyConfig(index_type=logstore_type, doc_value=True)

        # Add log_version field (not in PG model, but used in logstore for versioning)
        index_keys["log_version"] = IndexKeyConfig(index_type="long", doc_value=True)

        return index_keys
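
    # Hedged sketch of the result shape (actual field names depend on the model that
    # is passed in; the ones below are only examples):
    #     keys = AliyunLogStore._generate_index_keys_from_model(WorkflowRun)
    #     keys["status"]       # IndexKeyConfig(index_type="text", doc_value=True, token_list=..., chinese=True)
    #     keys["log_version"]  # IndexKeyConfig(index_type="long", doc_value=True), always added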

    def __init__(self) -> None:
        # Skip initialization if already initialized (singleton pattern)
        if self.__class__._initialized:
            return

        load_dotenv()

        self.access_key_id: str = os.environ.get("ALIYUN_SLS_ACCESS_KEY_ID", "")
        self.access_key_secret: str = os.environ.get("ALIYUN_SLS_ACCESS_KEY_SECRET", "")
        self.endpoint: str = os.environ.get("ALIYUN_SLS_ENDPOINT", "")
        self.region: str = os.environ.get("ALIYUN_SLS_REGION", "")
        self.project_name: str = os.environ.get("ALIYUN_SLS_PROJECT_NAME", "")
        self.logstore_ttl: int = int(os.environ.get("ALIYUN_SLS_LOGSTORE_TTL", 365))
        self.log_enabled: bool = os.environ.get("SQLALCHEMY_ECHO", "false").lower() == "true"
        self.pg_mode_enabled: bool = os.environ.get("LOGSTORE_PG_MODE_ENABLED", "true").lower() == "true"

        # Initialize SDK client
        self.client = LogClient(
            self.endpoint, self.access_key_id, self.access_key_secret, auth_version=AUTH_VERSION_4, region=self.region
        )

        # Prepend Dify identification to the existing user agent
        original_user_agent = self.client._user_agent  # pyright: ignore[reportPrivateUsage]
        dify_version = dify_config.project.version
        enhanced_user_agent = f"Dify,Dify-{dify_version},{original_user_agent}"
        self.client.set_user_agent(enhanced_user_agent)

        # PG client will be initialized in init_project_logstore
        self._pg_client: AliyunLogStorePG | None = None
        self._use_pg_protocol: bool = False

        self.__class__._initialized = True
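
    # Example .env entries consumed above (a hedged sketch; every value is a placeholder):
    #     ALIYUN_SLS_ACCESS_KEY_ID=<access key id>
    #     ALIYUN_SLS_ACCESS_KEY_SECRET=<access key secret>
    #     ALIYUN_SLS_ENDPOINT=cn-hangzhou.log.aliyuncs.com
    #     ALIYUN_SLS_REGION=cn-hangzhou
    #     ALIYUN_SLS_PROJECT_NAME=my-dify-project
    #     ALIYUN_SLS_LOGSTORE_TTL=365
    #     LOGSTORE_PG_MODE_ENABLED=true
    #     SQLALCHEMY_ECHO=false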

    @property
    def supports_pg_protocol(self) -> bool:
        """Check if PG protocol is supported and enabled."""
        return self._use_pg_protocol

    def _attempt_pg_connection_init(self) -> bool:
        """
        Attempt to initialize PG connection.

        This method tries to establish PG connection and performs necessary checks.
        It's used both for immediate connection (existing projects) and delayed connection (new projects).

        Returns:
            True if PG connection was successfully established, False otherwise.
        """
        if not self.pg_mode_enabled or not self._pg_client:
            return False

        try:
            self._use_pg_protocol = self._pg_client.init_connection()
            if self._use_pg_protocol:
                logger.info("Successfully connected to project %s using PG protocol", self.project_name)
                # Check if scan_index is enabled for all logstores
                self._check_and_disable_pg_if_scan_index_disabled()
                return True
            else:
                logger.info("PG connection failed for project %s. Will use SDK mode.", self.project_name)
                return False
        except Exception as e:
            logger.warning(
                "Failed to establish PG connection for project %s: %s. Will use SDK mode.",
                self.project_name,
                str(e),
            )
            self._use_pg_protocol = False
            return False

    def _delayed_pg_connection_init(self) -> None:
        """
        Delayed initialization of PG connection for newly created projects.

        This method is called by a background timer `_pg_connection_delay` seconds after project creation.
        """
        # Double check conditions in case state changed
        if self._use_pg_protocol:
            return

        logger.info(
            "Attempting delayed PG connection for newly created project %s ...",
            self.project_name,
        )
        self._attempt_pg_connection_init()
        self.__class__._pg_connection_timer = None

    def init_project_logstore(self):
        """
        Initialize project, logstore, index, and PG connection.

        This method should be called once during application startup to ensure
        all required resources exist and connections are established.
        """
        # Step 1: Ensure project and logstore exist
        project_is_new = False
        if not self.is_project_exist():
            self.create_project()
            project_is_new = True

        self.create_logstore_if_not_exist()

        # Step 2: Initialize PG client and connection (if enabled)
        if not self.pg_mode_enabled:
            logger.info("PG mode is disabled. Will use SDK mode.")
            return

        # Create PG client if not already created
        if self._pg_client is None:
            logger.info("Initializing PG client for project %s...", self.project_name)
            self._pg_client = AliyunLogStorePG(
                self.access_key_id, self.access_key_secret, self.endpoint, self.project_name
            )

        # Step 3: Establish PG connection based on project status
        if project_is_new:
            # For newly created projects, schedule delayed PG connection
            self._use_pg_protocol = False
            logger.info(
                "Project %s is newly created. Will use SDK mode and schedule PG connection attempt in %d seconds.",
                self.project_name,
                self.__class__._pg_connection_delay,
            )
            if self.__class__._pg_connection_timer is not None:
                self.__class__._pg_connection_timer.cancel()
            self.__class__._pg_connection_timer = threading.Timer(
                self.__class__._pg_connection_delay,
                self._delayed_pg_connection_init,
            )
            self.__class__._pg_connection_timer.daemon = True  # Don't block app shutdown
            self.__class__._pg_connection_timer.start()
        else:
            # For existing projects, attempt PG connection immediately
            logger.info("Project %s already exists. Attempting PG connection...", self.project_name)
            self._attempt_pg_connection_init()
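
    # Typical startup wiring (illustrative sketch; the actual call site lives outside this module):
    #     store = AliyunLogStore()          # singleton instance
    #     store.init_project_logstore()     # ensure project/logstores/indexes, then try PG
    #     if store.supports_pg_protocol:
    #         ...                           # reads and writes are routed through the PG client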

    def _check_and_disable_pg_if_scan_index_disabled(self) -> None:
        """
        Check if scan_index is enabled for all logstores.
        If any logstore has scan_index=false, disable PG protocol.

        This is necessary because PG protocol requires scan_index to be enabled.
        """
        logstore_name_list = [
            AliyunLogStore.workflow_execution_logstore,
            AliyunLogStore.workflow_node_execution_logstore,
        ]

        for logstore_name in logstore_name_list:
            existing_config = self.get_existing_index_config(logstore_name)
            if existing_config and not existing_config.scan_index:
                logger.info(
                    "Logstore %s has scan_index=false; falling back to SDK mode for read/write operations. "
                    "PG protocol requires scan_index to be enabled.",
                    logstore_name,
                )
                self._use_pg_protocol = False
                # Close PG connection if it was initialized
                if self._pg_client:
                    self._pg_client.close()
                    self._pg_client = None
                return

    def is_project_exist(self) -> bool:
        try:
            self.client.get_project(self.project_name)
            return True
        except Exception as e:
            if e.args[0] == "ProjectNotExist":
                return False
            else:
                raise e

    def create_project(self):
        try:
            self.client.create_project(self.project_name, AliyunLogStore.project_des)
            logger.info("Project %s created successfully", self.project_name)
        except LogException as e:
            logger.exception(
                "Failed to create project %s: errorCode=%s, errorMessage=%s, requestId=%s",
                self.project_name,
                e.get_error_code(),
                e.get_error_message(),
                e.get_request_id(),
            )
            raise

    def is_logstore_exist(self, logstore_name: str) -> bool:
        try:
            _ = self.client.get_logstore(self.project_name, logstore_name)
            return True
        except Exception as e:
            if e.args[0] == "LogStoreNotExist":
                return False
            else:
                raise e

    def create_logstore_if_not_exist(self) -> None:
        logstore_name_list = [
            AliyunLogStore.workflow_execution_logstore,
            AliyunLogStore.workflow_node_execution_logstore,
        ]

        for logstore_name in logstore_name_list:
            if not self.is_logstore_exist(logstore_name):
                try:
                    self.client.create_logstore(
                        project_name=self.project_name, logstore_name=logstore_name, ttl=self.logstore_ttl
                    )
                    logger.info("logstore %s created successfully", logstore_name)
                except LogException as e:
                    logger.exception(
                        "Failed to create logstore %s: errorCode=%s, errorMessage=%s, requestId=%s",
                        logstore_name,
                        e.get_error_code(),
                        e.get_error_message(),
                        e.get_request_id(),
                    )
                    raise

            # Ensure index contains all Dify-required fields
            # This intelligently merges with existing config, preserving custom indexes
            self.ensure_index_config(logstore_name)

    def is_index_exist(self, logstore_name: str) -> bool:
        try:
            _ = self.client.get_index_config(self.project_name, logstore_name)
            return True
        except Exception as e:
            if e.args[0] == "IndexConfigNotExist":
                return False
            else:
                raise e

    def get_existing_index_config(self, logstore_name: str) -> IndexConfig | None:
        """
        Get existing index configuration from logstore.

        Args:
            logstore_name: Name of the logstore

        Returns:
            IndexConfig object if index exists, None otherwise
        """
        try:
            response = self.client.get_index_config(self.project_name, logstore_name)
            return response.get_index_config()
        except Exception as e:
            if e.args[0] == "IndexConfigNotExist":
                return None
            else:
                logger.exception("Failed to get index config for logstore %s", logstore_name)
                raise e

    def _get_workflow_execution_index_keys(self) -> dict[str, IndexKeyConfig]:
        """
        Get field index configuration for workflow_execution logstore.

        This method automatically generates index configuration from the WorkflowRun SQLAlchemy model.
        When the PG schema is updated via Flask-Migrate, the index configuration will be automatically
        updated on next application startup.
        """
        from models.workflow import WorkflowRun

        index_keys = self._generate_index_keys_from_model(WorkflowRun)

        # Add custom fields that are in logstore but not in PG model
        # These fields are added by the repository layer
        index_keys["error_message"] = IndexKeyConfig(
            index_type="text",
            case_sensitive=False,
            doc_value=True,
            token_list=self.DEFAULT_TOKEN_LIST,
            chinese=True,
        )  # Maps to 'error' in PG
        index_keys["started_at"] = IndexKeyConfig(
            index_type="text",
            case_sensitive=False,
            doc_value=True,
            token_list=self.DEFAULT_TOKEN_LIST,
            chinese=True,
        )  # Maps to 'created_at' in PG

        logger.info("Generated %d index keys for workflow_execution from WorkflowRun model", len(index_keys))
        return index_keys

    def _get_workflow_node_execution_index_keys(self) -> dict[str, IndexKeyConfig]:
        """
        Get field index configuration for workflow_node_execution logstore.

        This method automatically generates index configuration from the WorkflowNodeExecutionModel.
        When the PG schema is updated via Flask-Migrate, the index configuration will be automatically
        updated on next application startup.
        """
        from models.workflow import WorkflowNodeExecutionModel

        index_keys = self._generate_index_keys_from_model(WorkflowNodeExecutionModel)

        logger.debug(
            "Generated %d index keys for workflow_node_execution from WorkflowNodeExecutionModel", len(index_keys)
        )
        return index_keys

    def _get_index_config(self, logstore_name: str) -> IndexConfig:
        """
        Get index configuration for the specified logstore.

        Args:
            logstore_name: Name of the logstore

        Returns:
            IndexConfig object with line and field indexes
        """
        # Create full-text index (line config) with tokenizer
        line_config = IndexLineConfig(token_list=self.DEFAULT_TOKEN_LIST, case_sensitive=False, chinese=True)

        # Get field index configuration based on logstore name
        field_keys = {}
        if logstore_name == AliyunLogStore.workflow_execution_logstore:
            field_keys = self._get_workflow_execution_index_keys()
        elif logstore_name == AliyunLogStore.workflow_node_execution_logstore:
            field_keys = self._get_workflow_node_execution_index_keys()

        # key_config_list should be a dict, not a list
        # Create index config with both line and field indexes
        return IndexConfig(line_config=line_config, key_config_list=field_keys, scan_index=True)

    def create_index(self, logstore_name: str) -> None:
        """
        Create index for the specified logstore with both full-text and field indexes.
        Field indexes are automatically generated from the corresponding SQLAlchemy model.
        """
        index_config = self._get_index_config(logstore_name)

        try:
            self.client.create_index(self.project_name, logstore_name, index_config)
            logger.info(
                "index for %s created successfully with %d field indexes",
                logstore_name,
                len(index_config.key_config_list or {}),
            )
        except LogException as e:
            logger.exception(
                "Failed to create index for logstore %s: errorCode=%s, errorMessage=%s, requestId=%s",
                logstore_name,
                e.get_error_code(),
                e.get_error_message(),
                e.get_request_id(),
            )
            raise

    def _merge_index_configs(
        self, existing_config: IndexConfig, required_keys: dict[str, IndexKeyConfig], logstore_name: str
    ) -> tuple[IndexConfig, bool]:
        """
        Intelligently merge existing index config with Dify's required field indexes.

        This method:
        1. Preserves all existing field indexes in logstore (including custom fields)
        2. Adds missing Dify-required fields
        3. Updates fields where type doesn't match (with json/text compatibility)
        4. Corrects case mismatches (e.g., if Dify needs 'status' but logstore has 'Status')

        Type compatibility rules:
        - json and text types are considered compatible (users can manually choose either)
        - All other type mismatches will be corrected to match Dify requirements

        Note: Logstore is case-sensitive and doesn't allow duplicate fields with different cases.
        Case mismatch means: existing field name differs from required name only in case.

        Args:
            existing_config: Current index configuration from logstore
            required_keys: Dify's required field index configurations
            logstore_name: Name of the logstore (for logging)

        Returns:
            Tuple of (merged_config, needs_update)
        """
        # key_config_list is already a dict in the SDK
        # Make a copy to avoid modifying the original
        existing_keys = dict(existing_config.key_config_list) if existing_config.key_config_list else {}

        # Track changes
        needs_update = False
        case_corrections = []  # Fields that need case correction (e.g., 'Status' -> 'status')
        missing_fields = []
        type_mismatches = []

        # First pass: Check for and resolve case mismatches with required fields
        # Note: Logstore itself doesn't allow duplicate fields with different cases,
        # so we only need to check if the existing case matches the required case
        for required_name in required_keys:
            lower_name = required_name.lower()
            # Find key that matches case-insensitively but not exactly
            wrong_case_key = None
            for existing_key in existing_keys:
                if existing_key.lower() == lower_name and existing_key != required_name:
                    wrong_case_key = existing_key
                    break

            if wrong_case_key:
                # Field exists but with wrong case (e.g., 'Status' when we need 'status')
                # Remove the wrong-case key, will be added back with correct case later
                case_corrections.append((wrong_case_key, required_name))
                del existing_keys[wrong_case_key]
                needs_update = True

        # Second pass: Check each required field
        for required_name, required_config in required_keys.items():
            # Check for exact match (case-sensitive)
            if required_name in existing_keys:
                existing_type = existing_keys[required_name].index_type
                required_type = required_config.index_type

                # Check if type matches
                # Special case: json and text are interchangeable for JSON content fields
                # Allow users to manually configure text instead of json (or vice versa) without forcing updates
                is_compatible = existing_type == required_type or ({existing_type, required_type} == {"json", "text"})

                if not is_compatible:
                    type_mismatches.append((required_name, existing_type, required_type))
                    # Update with correct type
                    existing_keys[required_name] = required_config
                    needs_update = True
                # else: field exists with compatible type, no action needed
            else:
                # Field doesn't exist (may have been removed in first pass due to case conflict)
                missing_fields.append(required_name)
                existing_keys[required_name] = required_config
                needs_update = True

        # Log changes
        if missing_fields:
            logger.info(
                "Logstore %s: Adding %d missing Dify-required fields: %s",
                logstore_name,
                len(missing_fields),
                ", ".join(missing_fields[:10]) + ("..." if len(missing_fields) > 10 else ""),
            )

        if type_mismatches:
            logger.info(
                "Logstore %s: Fixing %d type mismatches: %s",
                logstore_name,
                len(type_mismatches),
                ", ".join([f"{name}({old}->{new})" for name, old, new in type_mismatches[:5]])
                + ("..." if len(type_mismatches) > 5 else ""),
            )

        if case_corrections:
            logger.info(
                "Logstore %s: Correcting %d field name cases: %s",
                logstore_name,
                len(case_corrections),
                ", ".join([f"'{old}' -> '{new}'" for old, new in case_corrections[:5]])
                + ("..." if len(case_corrections) > 5 else ""),
            )

        # Create merged config
        # key_config_list should be a dict, not a list
        # Preserve the original scan_index value - don't force it to True
        merged_config = IndexConfig(
            line_config=existing_config.line_config
            or IndexLineConfig(token_list=self.DEFAULT_TOKEN_LIST, case_sensitive=False, chinese=True),
            key_config_list=existing_keys,
            scan_index=existing_config.scan_index,
        )

        return merged_config, needs_update
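
    # Illustrative merge scenario (hedged sketch, field names invented for the example):
    # if the logstore already holds a custom field "tenant_tag" plus "Status" (wrong case)
    # and Dify requires "status" as text, the merge keeps "tenant_tag", replaces "Status"
    # with "status", and returns needs_update=True; an existing text field where Dify
    # expects json is treated as compatible and left untouched.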

    def ensure_index_config(self, logstore_name: str) -> None:
        """
        Ensure index configuration includes all Dify-required fields.

        This method intelligently manages index configuration:
        1. If index doesn't exist, create it with Dify's required fields
        2. If index exists:
           - Check if all Dify-required fields are present
           - Check if field types match requirements
           - Only update if fields are missing or types are incorrect
           - Preserve any additional custom index configurations

        This approach allows users to add their own custom indexes without being overwritten.
        """
        # Get Dify's required field indexes
        required_keys = {}
        if logstore_name == AliyunLogStore.workflow_execution_logstore:
            required_keys = self._get_workflow_execution_index_keys()
        elif logstore_name == AliyunLogStore.workflow_node_execution_logstore:
            required_keys = self._get_workflow_node_execution_index_keys()

        # Check if index exists
        existing_config = self.get_existing_index_config(logstore_name)

        if existing_config is None:
            # Index doesn't exist, create it
            logger.info(
                "Logstore %s: Index doesn't exist, creating with %d required fields",
                logstore_name,
                len(required_keys),
            )
            self.create_index(logstore_name)
        else:
            merged_config, needs_update = self._merge_index_configs(existing_config, required_keys, logstore_name)

            if needs_update:
                logger.info("Logstore %s: Updating index to include Dify-required fields", logstore_name)
                try:
                    self.client.update_index(self.project_name, logstore_name, merged_config)
                    logger.info(
                        "Logstore %s: Index updated successfully, now has %d total field indexes",
                        logstore_name,
                        len(merged_config.key_config_list or {}),
                    )
                except LogException as e:
                    logger.exception(
                        "Failed to update index for logstore %s: errorCode=%s, errorMessage=%s, requestId=%s",
                        logstore_name,
                        e.get_error_code(),
                        e.get_error_message(),
                        e.get_request_id(),
                    )
                    raise
            else:
                logger.info(
                    "Logstore %s: Index already contains all %d Dify-required fields with correct types, "
                    "no update needed",
                    logstore_name,
                    len(required_keys),
                )

    def put_log(self, logstore: str, contents: Sequence[tuple[str, str]]) -> None:
        # Route to PG or SDK based on protocol availability
        if self._use_pg_protocol and self._pg_client:
            self._pg_client.put_log(logstore, contents, self.log_enabled)
        else:
            log_item = LogItem(contents=contents)
            request = PutLogsRequest(project=self.project_name, logstore=logstore, logitems=[log_item])

            if self.log_enabled:
                logger.info(
                    "[LogStore-SDK] PUT_LOG | logstore=%s | project=%s | items_count=%d",
                    logstore,
                    self.project_name,
                    len(contents),
                )

            try:
                self.client.put_logs(request)
            except LogException as e:
                logger.exception(
                    "Failed to put logs to logstore %s: errorCode=%s, errorMessage=%s, requestId=%s",
                    logstore,
                    e.get_error_code(),
                    e.get_error_message(),
                    e.get_request_id(),
                )
                raise
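
    # Hedged usage sketch: contents is a sequence of (key, value) string pairs; the
    # field names and values below are placeholders, not the repository layer's schema:
    #     store.put_log(
    #         AliyunLogStore.workflow_execution_logstore,
    #         [("id", "run-123"), ("status", "succeeded"), ("log_version", "1")],
    #     )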

    def get_logs(
        self,
        logstore: str,
        from_time: int,
        to_time: int,
        topic: str = "",
        query: str = "",
        line: int = 100,
        offset: int = 0,
        reverse: bool = True,
    ) -> list[dict]:
        request = GetLogsRequest(
            project=self.project_name,
            logstore=logstore,
            fromTime=from_time,
            toTime=to_time,
            topic=topic,
            query=query,
            line=line,
            offset=offset,
            reverse=reverse,
        )

        # Log query info if SQLALCHEMY_ECHO is enabled
        if self.log_enabled:
            logger.info(
                "[LogStore] GET_LOGS | logstore=%s | project=%s | query=%s | "
                "from_time=%d | to_time=%d | line=%d | offset=%d | reverse=%s",
                logstore,
                self.project_name,
                query,
                from_time,
                to_time,
                line,
                offset,
                reverse,
            )

        try:
            response = self.client.get_logs(request)
            result = []
            logs = response.get_logs() if response else []
            for log in logs:
                result.append(log.get_contents())

            # Log result count if SQLALCHEMY_ECHO is enabled
            if self.log_enabled:
                logger.info(
                    "[LogStore] GET_LOGS RESULT | logstore=%s | returned_count=%d",
                    logstore,
                    len(result),
                )

            return result
        except LogException as e:
            logger.exception(
                "Failed to get logs from logstore %s with query '%s': errorCode=%s, errorMessage=%s, requestId=%s",
                logstore,
                query,
                e.get_error_code(),
                e.get_error_message(),
                e.get_request_id(),
            )
            raise
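
    # Hedged usage sketch (the field name in the query is an assumption for illustration):
    #     now = int(time.time())
    #     rows = store.get_logs(
    #         AliyunLogStore.workflow_execution_logstore,
    #         from_time=now - 3600,
    #         to_time=now,
    #         query='status: "failed"',
    #         line=50,
    #     )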

    def execute_sql(
        self,
        sql: str,
        logstore: str | None = None,
        query: str = "*",
        from_time: int | None = None,
        to_time: int | None = None,
        power_sql: bool = False,
    ) -> list[dict]:
        """
        Execute SQL query for aggregation and analysis.

        Args:
            sql: SQL query string (SELECT statement)
            logstore: Name of the logstore (required)
            query: Search/filter query for SDK mode (default: "*" for all logs).
                Only used in SDK mode. PG mode ignores this parameter.
            from_time: Start time (Unix timestamp) - only used in SDK mode
            to_time: End time (Unix timestamp) - only used in SDK mode
            power_sql: Whether to use enhanced SQL mode (default: False)

        Returns:
            List of result rows as dictionaries

        Note:
            - PG mode: Only executes the SQL directly
            - SDK mode: Combines query and sql as "query | sql"
        """
        # Logstore is required
        if not logstore:
            raise ValueError("logstore parameter is required for execute_sql")

        # Route to PG or SDK based on protocol availability
        if self._use_pg_protocol and self._pg_client:
            # PG mode: execute SQL directly (ignore query parameter)
            return self._pg_client.execute_sql(sql, logstore, self.log_enabled)
        else:
            # SDK mode: combine query and sql as "query | sql"
            full_query = f"{query} | {sql}"

            # Provide default time range if not specified
            if from_time is None:
                from_time = 0

            if to_time is None:
                to_time = int(time.time())  # now

            request = GetLogsRequest(
                project=self.project_name,
                logstore=logstore,
                fromTime=from_time,
                toTime=to_time,
                query=full_query,
            )

            # Log query info if SQLALCHEMY_ECHO is enabled
            if self.log_enabled:
                logger.info(
                    "[LogStore-SDK] EXECUTE_SQL | logstore=%s | project=%s | from_time=%d | to_time=%d | full_query=%s",
                    logstore,
                    self.project_name,
                    from_time,
                    to_time,
                    full_query,
                )

            try:
                response = self.client.get_logs(request)

                result = []
                logs = response.get_logs() if response else []
                for log in logs:
                    result.append(log.get_contents())

                # Log result count if SQLALCHEMY_ECHO is enabled
                if self.log_enabled:
                    logger.info(
                        "[LogStore-SDK] EXECUTE_SQL RESULT | logstore=%s | returned_count=%d",
                        logstore,
                        len(result),
                    )

                return result
            except LogException as e:
                logger.exception(
                    "Failed to execute SQL, logstore %s: errorCode=%s, errorMessage=%s, requestId=%s, full_query=%s",
                    logstore,
                    e.get_error_code(),
                    e.get_error_message(),
                    e.get_request_id(),
                    full_query,
                )
                raise
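
    # Hedged usage sketch: in SDK mode the call below is sent as the combined query
    # '* | SELECT status, count(*) AS cnt GROUP BY status' (the column name is an
    # assumption for illustration):
    #     rows = store.execute_sql(
    #         "SELECT status, count(*) AS cnt GROUP BY status",
    #         logstore=AliyunLogStore.workflow_execution_logstore,
    #     )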


if __name__ == "__main__":
    aliyun_logstore = AliyunLogStore()
    # aliyun_logstore.init_project_logstore()
    aliyun_logstore.put_log(AliyunLogStore.workflow_execution_logstore, [("key1", "value1")])
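
    # Hedged examples of the read paths (kept commented out, mirroring the existing
    # style above; time ranges and the SQL are illustrative only):
    #     now = int(time.time())
    #     print(aliyun_logstore.get_logs(
    #         AliyunLogStore.workflow_execution_logstore,
    #         from_time=now - 3600,
    #         to_time=now,
    #         query="*",
    #         line=10,
    #     ))
    #     print(aliyun_logstore.execute_sql(
    #         "SELECT count(*) AS cnt",
    #         logstore=AliyunLogStore.workflow_execution_logstore,
    #     ))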