Files
wangliang e093995368 feat: 增强 Agent 系统和完善项目结构
主要改进:
- Agent 增强: 订单查询、售后支持、客服路由等功能优化
- 新增语言检测和 Token 管理模块
- 改进 Chatwoot webhook 处理和用户标识
- MCP 服务器增强: 订单 MCP 和 Strapi MCP 功能扩展
- 新增商城客户端、知识库、缓存和同步模块
- 添加多语言提示词系统 (YAML)
- 完善项目结构: 整理文档、脚本和测试文件
- 新增调试和测试工具脚本

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-16 16:28:47 +08:00

162 lines
4.5 KiB
Python

"""
Redis Cache for Strapi MCP Server
"""
import hashlib
import json
import logging
from typing import Any, Optional, Callable

from pydantic import ConfigDict
from pydantic_settings import BaseSettings
from redis import asyncio as aioredis
class CacheSettings(BaseSettings):
    """Connection and expiry settings for the Strapi response cache.

    Every field carries a default, and ``model_config`` names ``.env`` as
    the env file to read.
    """

    # NOTE(review): ``ConfigDict`` is pydantic's generic config type; the
    # pydantic-settings docs use ``SettingsConfigDict`` for settings-only keys
    # such as ``env_file`` -- confirm whether this should be switched.
    model_config = ConfigDict(env_file=".env")

    redis_host: str = "localhost"
    redis_port: int = 6379
    redis_password: Optional[str] = None
    # A separate logical DB is used to avoid key collisions.
    redis_db: int = 1
    # Default cache lifetime in seconds (one hour).
    cache_ttl: int = 3600
# Module-level settings object, built once at import time; StrapiCache reads
# its fallback connection parameters and default TTL from here.
cache_settings = CacheSettings()
class StrapiCache:
    """Best-effort Redis cache for Strapi responses.

    Values are stored as JSON strings with an expiry. Every public operation
    swallows errors (after logging them) so that an unavailable Redis
    degrades to "no cache" instead of breaking the caller.
    """

    _log = logging.getLogger(__name__)
    # Number of keys requested per SCAN round-trip and deleted per DEL call.
    _SCAN_BATCH = 500

    def __init__(
        self,
        host: Optional[str] = None,
        port: Optional[int] = None,
        password: Optional[str] = None,
        db: Optional[int] = None,
        ttl: Optional[int] = None
    ):
        """Store connection parameters; no connection is opened here.

        Args:
            host: Redis host; falls back to ``cache_settings.redis_host``.
            port: Redis port; falls back to ``cache_settings.redis_port``.
            password: Redis password; falls back to
                ``cache_settings.redis_password``.
            db: Redis database index; falls back to
                ``cache_settings.redis_db`` only when omitted.
            ttl: Default expiry in seconds; falls back to
                ``cache_settings.cache_ttl``.
        """
        self.host = host or cache_settings.redis_host
        self.port = port or cache_settings.redis_port
        self.password = password or cache_settings.redis_password
        # 0 is a valid Redis database index, so only None means "not given".
        self.db = db if db is not None else cache_settings.redis_db
        self.ttl = ttl or cache_settings.cache_ttl
        self._redis: Optional["aioredis.Redis"] = None

    async def _get_redis(self) -> "aioredis.Redis":
        """Return the shared Redis client, creating it on first use."""
        if self._redis is None:
            # Pass the parameters as keyword arguments instead of formatting a
            # URL: a password containing '@', '/', ':' or '#' would otherwise
            # produce a URL that parses incorrectly.
            self._redis = aioredis.Redis(
                host=self.host,
                port=self.port,
                db=self.db,
                password=self.password or None,
                encoding="utf-8",
                decode_responses=True,
            )
        return self._redis

    def _generate_key(self, category: str, locale: str, **kwargs) -> str:
        """Build a cache key from the query parameters.

        Keyword arguments are sorted by name so that the same parameters
        always give the same key regardless of call order.

        Args:
            category: Content category.
            locale: Locale of the content.
            **kwargs: Any further parameters that distinguish the query.

        Returns:
            ``"strapi:"`` followed by the MD5 hex digest of the joined parts.
        """
        parts = [category, locale]
        parts.extend(f"{name}:{value}" for name, value in sorted(kwargs.items()))
        # MD5 is used only to shorten the key, not for security.
        digest = hashlib.md5(":".join(parts).encode()).hexdigest()
        return f"strapi:{digest}"

    async def get(self, key: str) -> Optional[Any]:
        """Fetch and JSON-decode a cached value.

        Args:
            key: Cache key.

        Returns:
            The decoded value, or ``None`` on a miss or on any failure.
        """
        try:
            redis = await self._get_redis()
            raw = await redis.get(key)
            if raw is not None:
                return json.loads(raw)
        except Exception as exc:
            # Degrade to a cache miss when Redis is unavailable.
            self._log.warning("Strapi cache read failed for %s: %s", key, exc)
        return None

    async def set(self, key: str, value: Any, ttl: Optional[int] = None) -> bool:
        """JSON-encode a value and store it with an expiry.

        Args:
            key: Cache key.
            value: JSON-serialisable value.
            ttl: Expiry in seconds; ``None`` or 0 uses the instance default.

        Returns:
            ``True`` if the value was written, ``False`` on any failure.
        """
        try:
            redis = await self._get_redis()
            expiry = ttl or self.ttl
            await redis.setex(key, expiry, json.dumps(value, ensure_ascii=False))
            return True
        except Exception as exc:
            # Degrade silently for the caller when Redis is unavailable.
            self._log.warning("Strapi cache write failed for %s: %s", key, exc)
            return False

    async def delete(self, key: str) -> bool:
        """Delete a single key.

        Returns:
            ``True`` if the command was sent successfully, ``False`` on any
            failure.
        """
        try:
            redis = await self._get_redis()
            await redis.delete(key)
            return True
        except Exception as exc:
            self._log.warning("Strapi cache delete failed for %s: %s", key, exc)
            return False

    async def clear_pattern(self, pattern: str) -> int:
        """Delete every key that starts with ``pattern``.

        Keys are discovered incrementally with SCAN rather than KEYS, so a
        large keyspace does not block the Redis server, and they are deleted
        in batches.

        Args:
            pattern: Key prefix; a trailing ``*`` wildcard is appended.

        Returns:
            Number of keys deleted (the count reached so far if Redis fails
            part-way through, 0 if it is unavailable).
        """
        deleted = 0
        try:
            redis = await self._get_redis()
            batch = []
            async for key in redis.scan_iter(
                match=f"{pattern}*", count=self._SCAN_BATCH
            ):
                batch.append(key)
                if len(batch) >= self._SCAN_BATCH:
                    deleted += await redis.delete(*batch)
                    batch = []
            if batch:
                deleted += await redis.delete(*batch)
        except Exception as exc:
            self._log.warning(
                "Strapi cache clear failed for pattern %s: %s", pattern, exc
            )
        return deleted

    async def close(self):
        """Close the Redis connection, if one was opened."""
        # Drop the reference first so a failing close cannot leave a
        # half-closed client behind for the next call.
        client, self._redis = self._redis, None
        if client is None:
            return
        # Newer redis-py versions deprecate close() in favour of aclose().
        closer = getattr(client, "aclose", None) or client.close
        await closer()
# Global cache instance used by the module-level helpers below.
cache = StrapiCache()
async def cached_query(
    cache_key: str,
    query_func: Callable,
    ttl: Optional[int] = None
) -> Any:
    """Return the cached value for ``cache_key``, fetching it on a miss.

    Args:
        cache_key: Key to look up in the global cache.
        query_func: Zero-argument async callable that produces fresh data.
        ttl: Expiry in seconds for a newly stored value (overrides the
            cache default).

    Returns:
        The cached value if present, otherwise whatever ``query_func``
        returns. A ``None`` result is returned but never stored.
    """
    hit = await cache.get(cache_key)
    if hit is None:
        # Miss (or cache unavailable): run the real query.
        fresh = await query_func()
        if fresh is not None:
            await cache.set(cache_key, fresh, ttl)
        return fresh
    return hit
async def clear_strapi_cache(pattern: Optional[str] = None) -> int:
    """Remove Strapi entries from the global cache.

    Note that keys built by ``StrapiCache._generate_key`` consist of
    ``strapi:`` plus an MD5 hex digest, so a non-empty ``pattern`` only
    matches keys whose text after ``strapi:`` actually starts with it.

    Args:
        pattern: Optional key prefix (without the ``strapi:`` namespace);
            when empty or omitted, every ``strapi:`` key is cleared.

    Returns:
        Number of keys deleted, as reported by ``cache.clear_pattern``.
    """
    prefix = f"strapi:{pattern}" if pattern else "strapi:"
    return await cache.clear_pattern(prefix)