Files
assistant/mcp_servers/strapi_mcp/http_routes.py
wangliang e093995368 feat: 增强 Agent 系统和完善项目结构
主要改进:
- Agent 增强: 订单查询、售后支持、客服路由等功能优化
- 新增语言检测和 Token 管理模块
- 改进 Chatwoot webhook 处理和用户标识
- MCP 服务器增强: 订单 MCP 和 Strapi MCP 功能扩展
- 新增商城客户端、知识库、缓存和同步模块
- 添加多语言提示词系统 (YAML)
- 完善项目结构: 整理文档、脚本和测试文件
- 新增调试和测试工具脚本

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-16 16:28:47 +08:00

497 lines
17 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
HTTP Routes for Strapi MCP Server
Provides direct HTTP access to knowledge base functions (with local cache)
"""
from typing import Optional, List
import httpx
from fastapi import Request
from starlette.responses import JSONResponse
from pydantic_settings import BaseSettings
from pydantic import ConfigDict
from config_loader import load_config, get_category_endpoint
from knowledge_base import get_kb
class Settings(BaseSettings):
    """Server configuration for the Strapi MCP HTTP routes.

    Field values are resolved by pydantic-settings; the ``.env`` file named in
    ``model_config`` is registered as an additional source.
    """

    # Also read values from a local ".env" file.
    model_config = ConfigDict(env_file=".env")

    # Base URL of the Strapi instance (required, no default).
    strapi_api_url: str
    # API token; when blank, the request helpers in this module send no
    # Authorization header.
    strapi_api_token: str = ""
    log_level: str = "INFO"
    # Run initial sync on startup.
    sync_on_startup: bool = True
    # Sync interval in minutes.
    sync_interval_minutes: int = 60
settings = Settings()

# Load the Strapi config file. A base URL found there takes precedence over
# the one that came from the environment / .env file.
strapi_config = None
try:
    strapi_config = load_config()
    if strapi_config.base_url:
        settings.strapi_api_url = strapi_config.base_url
except Exception:
    # The config file could not be loaded (or used): keep the environment
    # values and let the request helpers use their built-in defaults.
    strapi_config = None
# ============ Company Info ============
async def get_company_info_http(section: str = "contact", locale: str = "en"):
    """Fetch company information for one section - HTTP wrapper.

    The local knowledge base is asked first. When it has no successful answer
    (or raises), the section is requested from the Strapi API and the
    extracted fields are written back to the local cache.

    Args:
        section: Section identifier (e.g., "contact")
        locale: Language locale (default: en)
            Supported: en, nl, de, es, fr, it, tr

    Returns:
        A dict with "success" and "data" keys, plus "error" on failure. A
        local hit is returned exactly as the knowledge base produced it.
    """
    kb = get_kb()

    # Step 1: local knowledge base. Any failure here only triggers the
    # remote fallback below.
    cache_hit = False
    try:
        cached = kb.get_company_info(section, locale)
        cache_hit = bool(cached["success"])
    except Exception as exc:
        print(f"Local KB error: {exc}")
    if cache_hit:
        return cached

    # Step 2: Strapi API.
    try:
        # Known section names; anything else follows the "info-<section>" pattern.
        known_sections = {
            "contact": "info-contact",
            "about": "info-about",
            "service": "info-service",
        }
        endpoint = known_sections.get(section, f"info-{section}")

        request_headers = {"Content-Type": "application/json"}
        api_token = settings.strapi_api_token
        if api_token and api_token.strip():
            request_headers["Authorization"] = f"Bearer {api_token}"

        # populate=deep asks Strapi to include all related data.
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.get(
                f"{settings.strapi_api_url}/api/{endpoint}",
                params={"populate": "deep", "locale": locale},
                headers=request_headers,
            )
            response.raise_for_status()
            payload = response.json()

        entry = payload.get("data")
        if not entry:
            return {
                "success": False,
                "error": f"Section '{section}' not found",
                "data": None,
            }

        info = {
            "id": entry.get("id"),
            "title": entry.get("title"),
            "description": entry.get("description"),
            "section": section,
            "locale": locale,
        }

        # Optional nested profile block.
        profile = entry.get("yehwang_profile")
        if profile:
            info["profile"] = {
                "title": profile.get("title"),
                "content": profile.get("content"),
            }

        # Best-effort write-back so the next call can be served locally.
        try:
            kb.save_company_info(section, locale, info)
        except Exception as exc:
            print(f"Failed to save to local cache: {exc}")

        return {"success": True, "data": info}
    except Exception as exc:
        return {"success": False, "error": str(exc), "data": None}
# ============ FAQ Query ============
async def query_faq_http(
    category: str = "other",
    locale: str = "en",
    limit: int = 10
):
    """Get FAQ by category - HTTP wrapper (local cache first, then Strapi).

    The local knowledge base is queried first and its answer is returned
    unchanged when it reports at least one entry. Otherwise the category is
    fetched from the Strapi API, normalised, and written back to the local
    cache.

    Args:
        category: FAQ category (register, order, pre-order, payment, shipment,
            return, other)
        locale: Language locale (default: en)
        limit: Maximum results to return

    Returns:
        A dict with "success", "category", "results" and "_source" keys.
        Successful Strapi responses also carry "count" and "locale"; failures
        carry "error" and use "_source": "error".
    """
    # Try local knowledge base first
    kb = get_kb()
    try:
        local_result = kb.query_faq(category, locale, limit)
        if local_result["count"] > 0:
            return local_result
    except Exception as e:
        print(f"Local KB error: {e}")

    # Fallback to Strapi API (local cache empty or unavailable)
    try:
        if strapi_config:
            # Endpoint comes from the config file
            endpoint = get_category_endpoint(category, strapi_config)
        else:
            # Hard-coded defaults. "other" does not follow the "faq-<name>"
            # pattern; it must match the fallback table in search_faq_http.
            default_endpoints = {"other": "faq-other-question"}
            endpoint = default_endpoints.get(category, f"faq-{category}")

        headers = {"Content-Type": "application/json"}
        api_token = settings.strapi_api_token
        if api_token and api_token.strip():
            headers["Authorization"] = f"Bearer {api_token}"

        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.get(
                f"{settings.strapi_api_url}/api/{endpoint}",
                params={"populate": "deep", "locale": locale},
                headers=headers
            )
            response.raise_for_status()
            data = response.json()

        item_data = data.get("data")
        if not item_data:
            return {
                "success": True,
                "count": 0,
                "category": category,
                "locale": locale,
                "results": [],
                "_source": "strapi_api"
            }

        # Handle different response formats
        if isinstance(item_data, list):
            faq_list = item_data
        elif isinstance(item_data, dict):
            # Default: the object itself is the (single) FAQ entry.
            faq_list = [item_data]
            # Nested arrays: "content" (Yehwang format), "faqs", "questions".
            # Only real lists count - a plain-text "content" field belongs to
            # the entry itself and must not be iterated character by character.
            for key in ("content", "faqs", "questions"):
                nested = item_data.get(key)
                if isinstance(nested, list) and nested:
                    faq_list = nested
                    break
        else:
            faq_list = []

        # Entries that are not objects cannot be formatted or cached.
        entries = [entry for entry in faq_list if isinstance(entry, dict)]

        # Keys that are either already mapped or pure Strapi bookkeeping.
        skipped_keys = ("id", "createdAt", "updatedAt", "publishedAt", "locale")

        results = []
        for item in entries[:limit]:
            faq_item = {
                "id": item.get("id"),
                "category": category,
                "locale": locale
            }
            # Yehwang format: title and content
            if item.get("title"):
                faq_item["question"] = item.get("title")
            if item.get("content"):
                faq_item["answer"] = item.get("content")
            # Explicit question/answer fields win over title/content
            if item.get("question"):
                faq_item["question"] = item.get("question")
            if item.get("answer"):
                faq_item["answer"] = item.get("answer")
            if item.get("description"):
                faq_item["description"] = item.get("description")
            # Pass through any other fields without overwriting mapped ones
            for key, value in item.items():
                if key not in faq_item and key not in skipped_keys:
                    faq_item[key] = value
            if "question" in faq_item or "answer" in faq_item:
                results.append(faq_item)

        # Cache the full (un-truncated) list; failure to cache is not fatal
        try:
            kb.save_faq_batch(entries, category, locale)
        except Exception as e:
            print(f"Failed to save to local cache: {e}")

        return {
            "success": True,
            "count": len(results),
            "category": category,
            "locale": locale,
            "results": results,
            "_source": "strapi_api"
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e),
            "category": category,
            "results": [],
            "_source": "error"
        }
async def search_faq_http(
    query: str,
    locale: str = "en",
    limit: int = 5
):
    """Search FAQ across all categories - HTTP wrapper.

    Every category endpoint is requested from the Strapi API concurrently.
    Entries whose title/content/question/answer/description contain ``query``
    (case-insensitive substring match) are collected. A category that fails
    or answers with a non-200 status is skipped; failures are printed.

    Args:
        query: Search keywords
        locale: Language locale (default: en)
        limit: Maximum number of results returned overall (across categories)

    Returns:
        On success: {"success": True, "count", "query", "locale", "results"}.
        "count" is the total number of matches found, which can exceed
        len(results) because "results" is truncated to ``limit``.
        On failure: {"success": False, "error", "results": []}.
    """
    # Function-local import: asyncio is only needed for the gather() below.
    import asyncio

    try:
        # Resolve category name -> endpoint
        if strapi_config:
            endpoints = {
                name: getattr(config, "endpoint", f"faq-{name}")
                for name, config in strapi_config.faq_categories.items()
            }
        else:
            # Fall back to the default categories
            endpoints = {
                "register": "faq-register",
                "order": "faq-order",
                "pre-order": "faq-pre-order",
                "payment": "faq-payment",
                "shipment": "faq-shipment",
                "return": "faq-return",
                "other": "faq-other-question",
            }

        headers = {"Content-Type": "application/json"}
        api_token = settings.strapi_api_token
        if api_token and api_token.strip():
            headers["Authorization"] = f"Bearer {api_token}"

        needle = query.lower()
        searched_fields = ("title", "content", "question", "answer", "description")

        # Issue all requests at once. return_exceptions=True keeps one failing
        # category from cancelling the others.
        async with httpx.AsyncClient(timeout=30.0) as client:
            responses = await asyncio.gather(
                *(
                    client.get(
                        f"{settings.strapi_api_url}/api/{endpoint}",
                        params={"populate": "deep", "locale": locale},
                        headers=headers
                    )
                    for endpoint in endpoints.values()
                ),
                return_exceptions=True
            )

        all_results = []
        # gather() preserves input order, so responses line up with endpoints.
        for category_name, response in zip(endpoints, responses):
            if isinstance(response, BaseException):
                print(f"FAQ search failed for category '{category_name}': {response}")
                continue
            try:
                if response.status_code != 200:
                    continue
                item_data = response.json().get("data")

                # Extract FAQ list
                faq_list = []
                if isinstance(item_data, list):
                    faq_list = item_data
                elif isinstance(item_data, dict):
                    # Nested arrays: "content" (Yehwang format), "faqs", "questions"
                    for key in ("content", "faqs", "questions"):
                        nested = item_data.get(key)
                        if isinstance(nested, list) and nested:
                            faq_list = nested
                            break
                    else:
                        # The object itself may be a single FAQ
                        if item_data.get("question") and item_data.get("answer"):
                            faq_list = [item_data]

                # Filter by query and add to results
                for item in faq_list:
                    if not isinstance(item, dict):
                        continue
                    # Missing/None fields contribute nothing; str(None) would
                    # otherwise make every such entry match the query "none".
                    item_text = "".join(
                        str(item.get(field) or "") for field in searched_fields
                    )
                    if needle not in item_text.lower():
                        continue

                    result_item = {
                        "id": item.get("id"),
                        "category": category_name,
                        "locale": locale
                    }
                    # Use title/content as question/answer (Yehwang format)
                    if item.get("title"):
                        result_item["question"] = item.get("title")
                    if item.get("content"):
                        result_item["answer"] = item.get("content")
                    # Explicit question/answer fields win over title/content
                    if item.get("question"):
                        result_item["question"] = item.get("question")
                    if item.get("answer"):
                        result_item["answer"] = item.get("answer")
                    if item.get("description"):
                        result_item["description"] = item.get("description")
                    all_results.append(result_item)
            except Exception as e:
                # Continue with the next category, but leave a trace
                print(f"FAQ search failed for category '{category_name}': {e}")

        return {
            "success": True,
            "count": len(all_results),
            "query": query,
            "locale": locale,
            "results": all_results[:limit]
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e),
            "results": []
        }
async def search_knowledge_base_http(query: str, locale: str = "en", limit: int = 10):
    """Search the knowledge base - HTTP wrapper.

    The local knowledge base search is tried first; its answer is returned
    as-is when it reports at least one hit. If it reports none, or raises,
    the search is delegated to ``search_faq_http`` (Strapi API).

    Args:
        query: Search keywords
        locale: Language locale
        limit: Maximum results
    """
    kb = get_kb()

    found_locally = False
    try:
        local_hits = kb.search_faq(query, locale, limit)
        found_locally = bool(local_hits["count"] > 0)
    except Exception as exc:
        print(f"Local KB search error: {exc}")

    if found_locally:
        return local_hits

    # Nothing usable locally: search FAQ across all categories via Strapi.
    return await search_faq_http(query, locale, limit)
async def get_policy_http(policy_type: str, locale: str = "en"):
    """Fetch a policy document - HTTP wrapper.

    The local knowledge base is asked first. When it has no successful answer
    (or raises), the policy is requested from the Strapi API and a summary of
    it is written back to the local cache.

    Args:
        policy_type: Type of policy (return_policy, privacy_policy, etc.)
        locale: Language locale

    Returns:
        A dict with "success" and "data" keys, plus "error" on failure. A
        local hit is returned exactly as the knowledge base produced it.
    """
    kb = get_kb()

    # Step 1: local knowledge base. Any failure here only triggers the
    # remote fallback below.
    cache_hit = False
    try:
        cached = kb.get_policy(policy_type, locale)
        cache_hit = bool(cached["success"])
    except Exception as exc:
        print(f"Local KB error: {exc}")
    if cache_hit:
        return cached

    # Step 2: Strapi API.
    try:
        # Known policy types; anything else follows the "policy-<type>" pattern.
        known_policies = {
            "return_policy": "policy-return",
            "privacy_policy": "policy-privacy",
            "terms_of_service": "policy-terms",
            "shipping_policy": "policy-shipping",
            "payment_policy": "policy-payment",
        }
        endpoint = known_policies.get(policy_type, f"policy-{policy_type}")

        request_headers = {"Content-Type": "application/json"}
        api_token = settings.strapi_api_token
        if api_token and api_token.strip():
            request_headers["Authorization"] = f"Bearer {api_token}"

        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.get(
                f"{settings.strapi_api_url}/api/{endpoint}",
                params={"populate": "deep", "locale": locale},
                headers=request_headers,
            )
            response.raise_for_status()
            payload = response.json()

        entry = payload.get("data")
        if not entry:
            return {
                "success": False,
                "error": f"Policy '{policy_type}' not found",
                "data": None,
            }

        # Fields stored in the local cache (differs from the response shape:
        # includes version/effective_date, omits id/type/locale).
        cache_record = {
            "title": entry.get("title"),
            "summary": entry.get("summary"),
            "content": entry.get("content"),
            "version": entry.get("version"),
            "effective_date": entry.get("effective_date"),
        }

        # Best-effort write-back so the next call can be served locally.
        try:
            kb.save_policy(policy_type, locale, cache_record)
        except Exception as exc:
            print(f"Failed to save to local cache: {exc}")

        return {
            "success": True,
            "data": {
                "id": entry.get("id"),
                "type": policy_type,
                "title": entry.get("title"),
                "content": entry.get("content"),
                "summary": entry.get("summary"),
                "locale": locale,
            },
        }
    except Exception as exc:
        return {"success": False, "error": str(exc), "data": None}