Files
assistant/mcp_servers/shared/strapi_client.py
wangliang e093995368 feat: 增强 Agent 系统和完善项目结构
主要改进:
- Agent 增强: 订单查询、售后支持、客服路由等功能优化
- 新增语言检测和 Token 管理模块
- 改进 Chatwoot webhook 处理和用户标识
- MCP 服务器增强: 订单 MCP 和 Strapi MCP 功能扩展
- 新增商城客户端、知识库、缓存和同步模块
- 添加多语言提示词系统 (YAML)
- 完善项目结构: 整理文档、脚本和测试文件
- 新增调试和测试工具脚本

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-16 16:28:47 +08:00

131 lines
3.9 KiB
Python

"""
Strapi API Client for MCP Server
"""
from typing import Any, Optional
import httpx
from pydantic_settings import BaseSettings
from pydantic import ConfigDict
class StrapiSettings(BaseSettings):
    """Settings for the Strapi integration.

    Declared as a pydantic-settings model; ``model_config`` points the
    loader at a local ``.env`` file.
    """

    model_config = ConfigDict(env_file=".env")

    # Connection details -- neither field declares a default.
    strapi_api_url: str
    strapi_api_token: str

    # Sync behaviour.
    sync_on_startup: bool = True  # run an initial sync on startup
    sync_interval_minutes: int = 60  # interval between syncs, in minutes
# Module-level settings instance, created when this module is imported.
# StrapiClient falls back to its strapi_api_url / strapi_api_token when the
# constructor is not given explicit values.
settings = StrapiSettings()
class StrapiClient:
    """Async client for the Strapi CMS REST API.

    The underlying ``httpx.AsyncClient`` is created lazily on first use and
    reused until :meth:`close` is called. The client may also be used as an
    async context manager so the HTTP client is always closed::

        async with StrapiClient() as client:
            faqs = await client.query_collection("faqs")
    """

    def __init__(
        self,
        api_url: Optional[str] = None,
        api_token: Optional[str] = None,
    ):
        """Store connection details, falling back to the module settings.

        Args:
            api_url: Base URL of the Strapi server (without the ``/api``
                suffix). A trailing slash is stripped. When omitted or
                empty, ``settings.strapi_api_url`` is used.
            api_token: Token sent as a bearer token. When omitted or empty,
                ``settings.strapi_api_token`` is used.
        """
        self.api_url = (api_url or settings.strapi_api_url).rstrip("/")
        self.api_token = api_token or settings.strapi_api_token
        self._client: Optional["httpx.AsyncClient"] = None

    async def __aenter__(self) -> "StrapiClient":
        """Enter an ``async with`` block; returns the client itself."""
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        """Close the HTTP client when leaving an ``async with`` block."""
        await self.close()

    async def _get_client(self) -> "httpx.AsyncClient":
        """Return the shared HTTP client, creating it on first call."""
        if self._client is None:
            headers = {"Content-Type": "application/json"}
            # Only add the Authorization header if a non-blank token is set.
            if self.api_token and self.api_token.strip():
                headers["Authorization"] = f"Bearer {self.api_token}"
            self._client = httpx.AsyncClient(
                base_url=f"{self.api_url}/api",
                headers=headers,
                timeout=30.0,
            )
        return self._client

    async def close(self):
        """Close the HTTP client, if one was created, and forget it.

        A later request creates a fresh client, so calling this more than
        once is harmless.
        """
        if self._client:
            await self._client.aclose()
            self._client = None

    async def get(
        self,
        endpoint: str,
        params: Optional[dict[str, Any]] = None,
    ) -> dict[str, Any]:
        """Send a GET request to the Strapi API and return the decoded JSON.

        Args:
            endpoint: Path relative to ``<api_url>/api`` (e.g. ``"/faqs"``).
            params: Optional query-string parameters.

        Raises:
            httpx.HTTPStatusError: via ``raise_for_status()`` when the
                response has an error status.
        """
        client = await self._get_client()
        response = await client.get(endpoint, params=params)
        response.raise_for_status()
        return response.json()

    @staticmethod
    def _flatten_filter(name: str, value: Any, out: dict[str, Any]) -> None:
        """Write ``value`` into ``out`` using bracket-notation keys.

        Nested dicts become ``name[key]...`` entries. Keys that already start
        with ``[`` are appended verbatim so pre-bracketed keys such as
        ``"[title][$eq]"`` keep working. Lists that contain dicts or lists
        are indexed (``name[0]``, ``name[1]``, ...); lists of plain values
        are stored unchanged under ``name``.
        """
        if isinstance(value, dict):
            for key, child in value.items():
                key = str(key)
                suffix = key if key.startswith("[") else f"[{key}]"
                StrapiClient._flatten_filter(f"{name}{suffix}", child, out)
        elif isinstance(value, (list, tuple)) and any(
            isinstance(entry, (dict, list, tuple)) for entry in value
        ):
            for index, entry in enumerate(value):
                StrapiClient._flatten_filter(f"{name}[{index}]", entry, out)
        else:
            out[name] = value

    @staticmethod
    def _build_params(
        filters: Optional[dict[str, Any]],
        sort: Optional[list[str]],
        pagination: Optional[dict[str, int]],
        locale: str,
    ) -> dict[str, Any]:
        """Build the query-string parameters for a collection query."""
        params: dict[str, Any] = {"locale": locale}
        if filters:
            StrapiClient._flatten_filter("filters", filters, params)
        if sort:
            # A bare string would otherwise be enumerated char by char.
            fields = [sort] if isinstance(sort, str) else sort
            for index, field in enumerate(fields):
                params[f"sort[{index}]"] = field
        if pagination:
            for key, value in pagination.items():
                params[f"pagination[{key}]"] = value
        return params

    async def query_collection(
        self,
        collection: str,
        filters: Optional[dict[str, Any]] = None,
        sort: Optional[list[str]] = None,
        pagination: Optional[dict[str, int]] = None,
        locale: str = "zh-CN",
    ) -> dict[str, Any]:
        """Query a Strapi collection with filters.

        Args:
            collection: Collection name (e.g., 'faqs', 'company-infos').
            filters: Strapi filter object. Either nested
                (``{"title": {"$eq": "x"}}``) or with pre-bracketed keys
                (``{"[title][$eq]": "x"}``).
            sort: Sort fields (e.g., ['priority:desc']). A single string is
                treated as a one-element list.
            pagination: Pagination params {page, pageSize} or {limit}.
            locale: Locale for i18n content.

        Returns:
            The decoded JSON response, as returned by :meth:`get`.
        """
        params = self._build_params(filters, sort, pagination, locale)
        return await self.get(f"/{collection}", params=params)

    @staticmethod
    def _flatten_item(item: dict[str, Any]) -> dict[str, Any]:
        """Flatten one Strapi record into ``{id, ...fields}``.

        Records carrying an ``attributes`` wrapper are unwrapped (a null
        wrapper counts as empty). Records without the wrapper are assumed
        to be flat already (the Strapi 5 response shape) and are copied
        as-is.
        """
        if "attributes" in item:
            return {"id": item.get("id"), **(item["attributes"] or {})}
        return {"id": item.get("id"), **item}

    @staticmethod
    def flatten_response(data: dict[str, Any]) -> list[dict[str, Any]]:
        """Flatten a Strapi collection response.

        Converts Strapi's {data: [{id, attributes: {...}}]} format
        to simple [{id, ...attributes}] format. A missing or null ``data``
        yields an empty list; a single object under ``data`` yields a
        one-element list.
        """
        items = data.get("data")
        if items is None:
            return []
        if isinstance(items, dict):
            items = [items]
        return [StrapiClient._flatten_item(item) for item in items]

    @staticmethod
    def flatten_single(data: dict[str, Any]) -> Optional[dict[str, Any]]:
        """Flatten a single Strapi item response.

        Returns:
            ``{id, ...fields}`` for the item under ``data``, or ``None``
            when ``data`` is missing, null or empty.
        """
        item = data.get("data")
        if not item:
            return None
        return StrapiClient._flatten_item(item)