Files
assistant/mcp_servers/strapi_mcp/server.py

338 lines
9.6 KiB
Python
Raw Normal View History

"""
Strapi MCP Server - FAQ and Knowledge Base
"""
import sys
import os
import asyncio
from typing import Optional
from datetime import datetime
# Add shared module to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from fastmcp import FastMCP
from pydantic_settings import BaseSettings
from fastapi import Request
from starlette.responses import JSONResponse
import uvicorn
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from pydantic import ConfigDict
class Settings(BaseSettings):
    """Runtime configuration for the Strapi MCP server.

    ``strapi_api_url`` and ``strapi_api_token`` have no defaults and must be
    supplied; the remaining fields fall back to the defaults declared here.
    """

    # Settings-source configuration: ".env" is registered as the env file.
    model_config = ConfigDict(env_file=".env")

    # Connection details handed to StrapiClient.
    strapi_api_url: str
    strapi_api_token: str

    log_level: str = "INFO"

    # Minutes between background syncs; start_scheduler() only schedules the
    # job when this value is greater than 0.
    sync_interval_minutes: int = 60
    # Whether a sync is run once during startup.
    sync_on_startup: bool = True
# Configuration is loaded once, at import time.
settings = Settings()

# MCP server instance; the @mcp.tool() functions in this module register on it.
mcp = FastMCP("Strapi Knowledge Base")

# Strapi client shared by the tools in this module. The import sits here,
# after the sys.path insertion near the top of the file, rather than in the
# main import group.
from shared.strapi_client import StrapiClient

strapi = StrapiClient(settings.strapi_api_url, settings.strapi_api_token)
@mcp.tool()
async def query_faq(
    category: str = "other",
    locale: str = "en",
    limit: int = 10
) -> dict:
    """Get FAQ by category

    Args:
        category: FAQ category
            Available: register, order, pre-order, payment, shipment, return, other
        locale: Language locale (default: en)
            Available: en, nl, de, es, fr, it, tr
        limit: Maximum results to return (default: 10)

    Returns:
        List of FAQ items with questions and answers for the specified category
    """
    # Imported lazily, at call time; the lookup itself lives in http_routes.
    from http_routes import query_faq_http

    faq_result = await query_faq_http(category, locale, limit)
    return faq_result
@mcp.tool()
async def get_company_info(
    section: str,
    locale: str = "en"
) -> dict:
    """Get company information by section

    Args:
        section: Information section (about_us, contact, service_hours, locations, etc.)
        locale: Language locale (default: en)

    Returns:
        Dict with a "success" flag and a "data" entry. On success, "data"
        holds the section, title, content and metadata of the first matching
        entry. When nothing matches or the lookup raises, "success" is False,
        "data" is None and "error" carries the message.
    """
    try:
        response = await strapi.query_collection(
            "company-infos",
            filters={"[section][$eq]": section},
            locale=locale
        )
        results = strapi.flatten_response(response)

        if not results:
            return {
                "success": False,
                "error": f"Section '{section}' not found",
                "data": None
            }

        # Only the first match is reported.
        item = results[0]
        return {
            "success": True,
            "data": {
                "section": item.get("section"),
                "title": item.get("title"),
                "content": item.get("content"),
                # `.get("metadata", {})` would still yield None when the key
                # is present with a null value (presumably what Strapi sends
                # for an unset field -- TODO confirm); normalise to a dict.
                "metadata": item.get("metadata") or {}
            }
        }
    except Exception as e:
        # Tool boundary: report the failure in the result instead of raising.
        return {
            "success": False,
            "error": str(e),
            "data": None
        }
@mcp.tool()
async def get_policy(
    policy_type: str,
    locale: str = "en"
) -> dict:
    """Get policy document

    Args:
        policy_type: Type of policy (return_policy, privacy_policy, terms_of_service,
            shipping_policy, payment_policy, etc.)
        locale: Language locale (default: en)

    Returns:
        Dict with a "success" flag and a "data" entry. On success, "data"
        holds the type, title, content, summary, version, effective_date and
        last_updated of the first matching policy. When nothing matches or
        the lookup raises, "success" is False, "data" is None and "error"
        carries the message.
    """
    try:
        response = await strapi.query_collection(
            "policies",
            filters={"[type][$eq]": policy_type},
            locale=locale
        )
        results = strapi.flatten_response(response)

        if not results:
            return {
                "success": False,
                "error": f"Policy '{policy_type}' not found",
                "data": None
            }

        # Only the first match is reported.
        item = results[0]
        return {
            "success": True,
            "data": {
                "type": item.get("type"),
                "title": item.get("title"),
                "content": item.get("content"),
                "summary": item.get("summary"),
                "version": item.get("version"),
                "effective_date": item.get("effective_date"),
                # Exposed as "last_updated", read from the "updatedAt" field.
                "last_updated": item.get("updatedAt")
            }
        }
    except Exception as e:
        # Tool boundary: report the failure in the result instead of raising.
        return {
            "success": False,
            "error": str(e),
            "data": None
        }
@mcp.tool()
async def search_knowledge_base(
    query: str,
    locale: str = "en",
    limit: int = 10
) -> dict:
    """Search knowledge base documents across all FAQ categories

    Args:
        query: Search keywords
        locale: Language locale (default: en)
            Available: en, nl, de, es, fr, it, tr
        limit: Maximum results to return (default: 10)

    Returns:
        List of matching FAQ documents from all categories
    """
    # Imported lazily, at call time; the search itself lives in http_routes.
    from http_routes import search_knowledge_base_http

    search_result = await search_knowledge_base_http(query, locale, limit)
    return search_result
# Health check endpoint
@mcp.tool()
async def health_check() -> dict:
    """Check server health status"""
    # Static report: fixed status, service name and version string.
    status_report = dict(
        status="healthy",
        service="strapi_mcp",
        version="1.0.0",
    )
    return status_report
# ============ Sync Scheduler ============
# Module-level scheduler instance. start_scheduler() registers the periodic
# sync job on it and starts it; creating it here does not start anything.
scheduler = AsyncIOScheduler()
async def run_scheduled_sync():
    """Sync Strapi content into the local knowledge base, reporting via print.

    Best-effort: any exception (including a failed import of the sync
    modules) is caught and printed with a timestamp instead of propagating.
    """
    try:
        # Imported at call time, inside the try, so import failures are
        # reported like any other sync error.
        from sync import StrapiSyncer
        from knowledge_base import get_kb

        syncer = StrapiSyncer(get_kb())

        print(f"[{datetime.now()}] Starting scheduled sync...")
        outcome = await syncer.sync_all()

        if not outcome["success"]:
            print(f"[{datetime.now()}] Sync failed: {outcome.get('error', 'Unknown error')}")
            return

        print(f"[{datetime.now()}] Sync completed successfully")
    except Exception as exc:
        print(f"[{datetime.now()}] Sync error: {exc}")
async def run_initial_sync():
    """Run one sync immediately when ``settings.sync_on_startup`` is set."""
    if not settings.sync_on_startup:
        return

    print("Running initial sync on startup...")
    await run_scheduled_sync()
    print("Initial sync completed")
def start_scheduler():
    """Register the periodic sync job and start the scheduler.

    Nothing is scheduled or started unless ``settings.sync_interval_minutes``
    is greater than 0.
    """
    interval = settings.sync_interval_minutes

    if not interval > 0:
        print("Sync scheduler disabled (interval set to 0)")
        return

    # A fixed job id plus replace_existing=True replaces any job already
    # registered under that id instead of adding a second one.
    scheduler.add_job(
        run_scheduled_sync,
        "interval",
        minutes=interval,
        id="strapi_sync",
        replace_existing=True,
    )
    scheduler.start()
    print(f"Sync scheduler started (interval: {interval} minutes)")
if __name__ == "__main__":
    from contextlib import asynccontextmanager

    from fastapi import FastAPI

    # Import HTTP routes
    from http_routes import (
        get_company_info_http,
        query_faq_http,
        get_policy_http,
        search_knowledge_base_http
    )

    # ASGI app exposing the MCP server over HTTP
    mcp_app = mcp.http_app()

    # Named differently from the `health_check` MCP tool so that the
    # module-level tool name is not rebound when running as a script.
    async def health_endpoint(request):
        """HTTP endpoint for GET /health"""
        return JSONResponse({"status": "healthy"})

    def make_tool_endpoint(tool_name, tool_func):
        """Build the POST handler that forwards a JSON body to ``tool_func``.

        The request body is parsed as JSON and expanded into keyword
        arguments. Any failure (unparsable body, unexpected argument, error
        inside the tool) is answered with a 500 and a
        {"success": False, "error": ...} payload.
        """
        async def endpoint(request):
            try:
                data = await request.json()
                result = await tool_func(**data)
                return JSONResponse(result)
            except Exception as e:
                return JSONResponse({"success": False, "error": str(e)}, status_code=500)

        # Keep the handler names and docstrings of the hand-written versions.
        endpoint.__name__ = f"call_{tool_name}"
        endpoint.__doc__ = f"HTTP endpoint for {tool_name}"
        return endpoint

    # Lifespan context manager for startup/shutdown events
    @asynccontextmanager
    async def lifespan(app: FastAPI):
        # A mounted sub-application's lifespan is not run by the parent app,
        # so enter the MCP app's lifespan here; its HTTP transport depends on
        # that startup having happened before requests arrive.
        async with mcp_app.router.lifespan_context(mcp_app):
            # Startup: start scheduler and run initial sync
            start_scheduler()
            await run_initial_sync()
            try:
                yield
            finally:
                # Shutdown: the scheduler is never started when the sync
                # interval is <= 0, and shutting down a scheduler that is
                # not running raises, so check first.
                if scheduler.running:
                    scheduler.shutdown()

    # Create a wrapper FastAPI app with custom routes first
    app = FastAPI(lifespan=lifespan)

    # Add custom routes BEFORE mounting mcp_app
    app.add_route("/health", health_endpoint, methods=["GET"])
    for tool_name, tool_func in (
        ("query_faq", query_faq_http),
        ("get_company_info", get_company_info_http),
        ("get_policy", get_policy_http),
        ("search_knowledge_base", search_knowledge_base_http),
    ):
        app.add_route(
            f"/tools/{tool_name}",
            make_tool_endpoint(tool_name, tool_func),
            methods=["POST"],
        )

    # Mount MCP app at root (will catch all other routes)
    app.mount("/", mcp_app)

    uvicorn.run(app, host="0.0.0.0", port=8001)