Files

497 lines
17 KiB
Python
Raw Permalink Normal View History

"""
HTTP Routes for Strapi MCP Server
Provides direct HTTP access to knowledge base functions (with local cache)
"""
from typing import Optional, List
import httpx
from fastapi import Request
from starlette.responses import JSONResponse
from pydantic_settings import BaseSettings
from pydantic import ConfigDict
from config_loader import load_config, get_category_endpoint
from knowledge_base import get_kb
class Settings(BaseSettings):
"""Server configuration"""
strapi_api_url: str
strapi_api_token: str = ""
log_level: str = "INFO"
sync_on_startup: bool = True # Run initial sync on startup
sync_interval_minutes: int = 60 # Sync interval in minutes
model_config = ConfigDict(env_file=".env")
settings = Settings()
# 加载配置文件
try:
strapi_config = load_config()
# 使用配置文件中的 URL如果存在
if strapi_config.base_url:
settings.strapi_api_url = strapi_config.base_url
except Exception:
# 如果配置文件加载失败,使用环境变量
strapi_config = None
# ============ Company Info ============
async def get_company_info_http(section: str = "contact", locale: str = "en"):
"""Get company information - HTTP wrapper
Args:
section: Section identifier (e.g., "contact")
locale: Language locale (default: en)
Supported: en, nl, de, es, fr, it, tr
"""
# Try local knowledge base first
kb = get_kb()
try:
local_result = kb.get_company_info(section, locale)
if local_result["success"]:
return local_result
except Exception as e:
print(f"Local KB error: {e}")
# Fallback to Strapi API
try:
# Map section names to API endpoints
section_map = {
"contact": "info-contact",
"about": "info-about",
"service": "info-service",
}
endpoint = section_map.get(section, f"info-{section}")
# Build query parameters
headers = {"Content-Type": "application/json"}
if settings.strapi_api_token and settings.strapi_api_token.strip():
headers["Authorization"] = f"Bearer {settings.strapi_api_token}"
# Request with populate=deep to get all related data
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.get(
f"{settings.strapi_api_url}/api/{endpoint}",
params={"populate": "deep", "locale": locale},
headers=headers
)
response.raise_for_status()
data = response.json()
if not data.get("data"):
return {
"success": False,
"error": f"Section '{section}' not found",
"data": None
}
item = data["data"]
# Extract relevant information
result_data = {
"id": item.get("id"),
"title": item.get("title"),
"description": item.get("description"),
"section": section,
"locale": locale
}
# Add profile information if available
if item.get("yehwang_profile"):
profile = item["yehwang_profile"]
result_data["profile"] = {
"title": profile.get("title"),
"content": profile.get("content")
}
# Save to local cache for next time
try:
kb.save_company_info(section, locale, result_data)
except Exception as e:
print(f"Failed to save to local cache: {e}")
return {
"success": True,
"data": result_data
}
except Exception as e:
return {
"success": False,
"error": str(e),
"data": None
}
# ============ FAQ Query ============
async def query_faq_http(
category: str = "other",
locale: str = "en",
limit: int = 10
):
"""Get FAQ by category - HTTP wrapper (with local cache fallback)
Args:
category: FAQ category (register, order, pre-order, payment, shipment, return, other)
locale: Language locale (default: en)
limit: Maximum results to return
"""
# Try local knowledge base first
kb = get_kb()
try:
local_result = kb.query_faq(category, locale, limit)
if local_result["count"] > 0:
return local_result
except Exception as e:
print(f"Local KB error: {e}")
# Fallback to Strapi API (if local cache is empty)
try:
# 从配置文件获取端点
if strapi_config:
endpoint = get_category_endpoint(category, strapi_config)
else:
# 回退到硬编码的默认值
endpoint = f"faq-{category}"
headers = {"Content-Type": "application/json"}
if settings.strapi_api_token and settings.strapi_api_token.strip():
headers["Authorization"] = f"Bearer {settings.strapi_api_token}"
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.get(
f"{settings.strapi_api_url}/api/{endpoint}",
params={"populate": "deep", "locale": locale},
headers=headers
)
response.raise_for_status()
data = response.json()
# Check if data exists
if not data.get("data"):
return {
"success": True,
"count": 0,
"category": category,
"locale": locale,
"results": [],
"_source": "strapi_api"
}
# Handle different response formats
item_data = data["data"]
# If it's a single object with nested items
faq_list = []
if isinstance(item_data, dict):
# Check for content array (Yehwang format)
if item_data.get("content"):
faq_list = item_data["content"]
# Check for nested FAQ array
elif item_data.get("faqs"):
faq_list = item_data["faqs"]
# Check for questions array
elif item_data.get("questions"):
faq_list = item_data["questions"]
# The item itself might be the FAQ
elif item_data.get("question") and item_data.get("answer"):
faq_list = [item_data]
else:
# Return the whole data as one FAQ
faq_list = [item_data]
elif isinstance(item_data, list):
faq_list = item_data
# Format results and save to local cache
results = []
for item in faq_list[:limit]:
faq_item = {
"id": item.get("id"),
"category": category,
"locale": locale
}
# Yehwang format: title and content
if item.get("title"):
faq_item["question"] = item.get("title")
if item.get("content"):
faq_item["answer"] = item.get("content")
# Also support other field names
if item.get("question"):
faq_item["question"] = item.get("question")
if item.get("answer"):
faq_item["answer"] = item.get("answer")
if item.get("description"):
faq_item["description"] = item.get("description")
# Add any other fields
for key, value in item.items():
if key not in faq_item and key not in ["id", "createdAt", "updatedAt", "publishedAt", "locale"]:
faq_item[key] = value
if "question" in faq_item or "answer" in faq_item:
results.append(faq_item)
# Save to local cache for next time
try:
kb.save_faq_batch(faq_list, category, locale)
except Exception as e:
print(f"Failed to save to local cache: {e}")
return {
"success": True,
"count": len(results),
"category": category,
"locale": locale,
"results": results,
"_source": "strapi_api"
}
except Exception as e:
return {
"success": False,
"error": str(e),
"category": category,
"results": [],
"_source": "error"
}
async def search_faq_http(
query: str,
locale: str = "en",
limit: int = 5
):
"""Search FAQ across all categories - HTTP wrapper
Args:
query: Search keywords
locale: Language locale (default: en)
limit: Maximum results per category
"""
try:
all_results = []
# 获取所有分类
if strapi_config:
categories = strapi_config.faq_categories
else:
# 回退到默认分类
categories = {
"register": type("obj", (object,), {"endpoint": "faq-register"}),
"order": type("obj", (object,), {"endpoint": "faq-order"}),
"pre-order": type("obj", (object,), {"endpoint": "faq-pre-order"}),
"payment": type("obj", (object,), {"endpoint": "faq-payment"}),
"shipment": type("obj", (object,), {"endpoint": "faq-shipment"}),
"return": type("obj", (object,), {"endpoint": "faq-return"}),
"other": type("obj", (object,), {"endpoint": "faq-other-question"}),
}
# Search all categories in parallel
async with httpx.AsyncClient(timeout=30.0) as client:
tasks = []
for category_name, category_config in categories.items():
endpoint = category_config.endpoint if hasattr(category_config, "endpoint") else f"faq-{category_name}"
headers = {"Content-Type": "application/json"}
if settings.strapi_api_token and settings.strapi_api_token.strip():
headers["Authorization"] = f"Bearer {settings.strapi_api_token}"
task = client.get(
f"{settings.strapi_api_url}/api/{endpoint}",
params={"populate": "deep", "locale": locale},
headers=headers
)
tasks.append((category_name, task))
# Execute all requests
for category_name, task in tasks:
try:
response = await task
if response.status_code == 200:
data = response.json()
if data.get("data"):
item_data = data["data"]
# Extract FAQ list
faq_list = []
if isinstance(item_data, dict):
# Yehwang format: content array
if item_data.get("content"):
faq_list = item_data["content"]
elif item_data.get("faqs"):
faq_list = item_data["faqs"]
elif item_data.get("questions"):
faq_list = item_data["questions"]
elif item_data.get("question") and item_data.get("answer"):
faq_list = [item_data]
elif isinstance(item_data, list):
faq_list = item_data
# Filter by query and add to results
for item in faq_list:
# Search in title/content (Yehwang format) or question/answer
item_text = (
str(item.get("title", "")) +
str(item.get("content", "")) +
str(item.get("question", "")) +
str(item.get("answer", "")) +
str(item.get("description", ""))
)
if query.lower() in item_text.lower():
result_item = {
"id": item.get("id"),
"category": category_name,
"locale": locale
}
# Use title as question (Yehwang format)
if item.get("title"):
result_item["question"] = item.get("title")
if item.get("content"):
result_item["answer"] = item.get("content")
# Also support other field names
if item.get("question"):
result_item["question"] = item.get("question")
if item.get("answer"):
result_item["answer"] = item.get("answer")
if item.get("description"):
result_item["description"] = item.get("description")
all_results.append(result_item)
except Exception as e:
# Continue with next category on error
pass
return {
"success": True,
"count": len(all_results),
"query": query,
"locale": locale,
"results": all_results[:limit]
}
except Exception as e:
return {
"success": False,
"error": str(e),
"results": []
}
async def search_knowledge_base_http(query: str, locale: str = "en", limit: int = 10):
"""Search knowledge base - HTTP wrapper
Args:
query: Search keywords
locale: Language locale
limit: Maximum results
"""
# Try local knowledge base first using FTS
kb = get_kb()
try:
local_result = kb.search_faq(query, locale, limit)
if local_result["count"] > 0:
return local_result
except Exception as e:
print(f"Local KB search error: {e}")
# Fallback to searching FAQ across all categories via Strapi API
return await search_faq_http(query, locale, limit)
async def get_policy_http(policy_type: str, locale: str = "en"):
"""Get policy document - HTTP wrapper
Args:
policy_type: Type of policy (return_policy, privacy_policy, etc.)
locale: Language locale
"""
# Try local knowledge base first
kb = get_kb()
try:
local_result = kb.get_policy(policy_type, locale)
if local_result["success"]:
return local_result
except Exception as e:
print(f"Local KB error: {e}")
# Fallback to Strapi API
try:
# Map policy types to endpoints
policy_map = {
"return_policy": "policy-return",
"privacy_policy": "policy-privacy",
"terms_of_service": "policy-terms",
"shipping_policy": "policy-shipping",
"payment_policy": "policy-payment",
}
endpoint = policy_map.get(policy_type, f"policy-{policy_type}")
headers = {"Content-Type": "application/json"}
if settings.strapi_api_token and settings.strapi_api_token.strip():
headers["Authorization"] = f"Bearer {settings.strapi_api_token}"
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.get(
f"{settings.strapi_api_url}/api/{endpoint}",
params={"populate": "deep", "locale": locale},
headers=headers
)
response.raise_for_status()
data = response.json()
if not data.get("data"):
return {
"success": False,
"error": f"Policy '{policy_type}' not found",
"data": None
}
item = data["data"]
policy_data = {
"title": item.get("title"),
"summary": item.get("summary"),
"content": item.get("content"),
"version": item.get("version"),
"effective_date": item.get("effective_date")
}
# Save to local cache for next time
try:
kb.save_policy(policy_type, locale, policy_data)
except Exception as e:
print(f"Failed to save to local cache: {e}")
return {
"success": True,
"data": {
"id": item.get("id"),
"type": policy_type,
"title": item.get("title"),
"content": item.get("content"),
"summary": item.get("summary"),
"locale": locale
}
}
except Exception as e:
return {
"success": False,
"error": str(e),
"data": None
}