Files
assistant/mcp_servers/order_mcp/server.py
wl 3ad6eee0d9 feat: 初始化 B2B AI Shopping Assistant 项目
- 配置 Docker Compose 多服务编排
- 实现 Chatwoot + Agent 集成
- 配置 Strapi MCP 知识库
- 支持 7 种语言的 FAQ 系统
- 实现 LangGraph AI 工作流

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-14 19:25:22 +08:00

285 lines
7.2 KiB
Python

"""
Order MCP Server - Order management tools
"""
import os
import sys
from typing import Optional, List
from urllib.parse import quote
# Add shared module to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from fastmcp import FastMCP
from pydantic_settings import BaseSettings
from pydantic import ConfigDict
class Settings(BaseSettings):
    """Runtime configuration for the order MCP server.

    Attributes:
        hyperf_api_url: Base URL handed to the Hyperf client.
        hyperf_api_token: Token handed to the Hyperf client.
        log_level: Log level name; defaults to "INFO".
    """

    # Also read values from a ".env" file (resolved by pydantic-settings).
    model_config = ConfigDict(env_file=".env")

    # No defaults: these two must be supplied by the environment or ".env".
    hyperf_api_url: str
    hyperf_api_token: str

    log_level: str = "INFO"
# Load configuration once, at import time.
settings = Settings()

# MCP server instance that the tool functions in this module register with.
mcp = FastMCP("Order Management")

# Hyperf API client shared by every tool in this server. This import must run
# after the sys.path entry added near the top of the module.
from shared.hyperf_client import HyperfClient

hyperf = HyperfClient(settings.hyperf_api_url, settings.hyperf_api_token)
@mcp.tool()
async def query_order(
    user_id: str,
    account_id: str,
    order_id: Optional[str] = None,
    status: Optional[str] = None,
    date_start: Optional[str] = None,
    date_end: Optional[str] = None,
    page: int = 1,
    page_size: int = 10
) -> dict:
    """Query orders for a user.

    Args:
        user_id: User identifier
        account_id: B2B account identifier
        order_id: Specific order ID to query (optional)
        status: Order status filter (pending, paid, shipped, delivered, cancelled)
        date_start: Start date filter (YYYY-MM-DD)
        date_end: End date filter (YYYY-MM-DD)
        page: Page number (default: 1)
        page_size: Items per page (default: 10)

    Returns:
        On success: ``{"success": True, "orders": [...], "pagination": {...}}``
        with both values read from the backend response. On any failure:
        ``{"success": False, "error": <message>, "orders": []}``.
    """
    request_body = {
        "user_id": user_id,
        "account_id": account_id,
        "page": page,
        "page_size": page_size,
    }

    # Optional filters are sent only when they carry a non-empty value.
    optional_filters = (("order_id", order_id), ("status", status))
    request_body.update({key: value for key, value in optional_filters if value})

    # The date range may be open-ended on either side; omit it when neither
    # bound was given.
    date_range = {
        bound: value
        for bound, value in (("start", date_start), ("end", date_end))
        if value
    }
    if date_range:
        request_body["date_range"] = date_range

    try:
        response = await hyperf.post("/orders/query", json=request_body)
        orders = response.get("orders", [])
        pagination = response.get("pagination", {})
    except Exception as exc:
        return {"success": False, "error": str(exc), "orders": []}

    return {"success": True, "orders": orders, "pagination": pagination}
@mcp.tool()
async def track_logistics(
    order_id: str,
    tracking_number: Optional[str] = None
) -> dict:
    """Track order logistics/shipping status.

    Args:
        order_id: Order ID
        tracking_number: Tracking number (optional; only sent to the backend
            when provided)

    Returns:
        On success: a dict with ``success=True``, the given ``order_id``, and
        ``tracking_number``, ``courier``, ``status``, ``estimated_delivery``
        and ``timeline`` read from the backend response. On any failure:
        ``{"success": False, "error": <message>, "order_id": order_id}``.
    """
    try:
        # order_id is caller-supplied. Percent-encode it as a single path
        # segment so "/", "?", "#" or ".." cannot retarget the request.
        order_segment = quote(str(order_id), safe="")

        params = {}
        if tracking_number:
            params["tracking_number"] = tracking_number

        result = await hyperf.get(
            f"/orders/{order_segment}/logistics",
            params=params
        )
        return {
            "success": True,
            "order_id": order_id,
            "tracking_number": result.get("tracking_number"),
            "courier": result.get("courier"),
            "status": result.get("status"),
            "estimated_delivery": result.get("estimated_delivery"),
            "timeline": result.get("timeline", [])
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e),
            "order_id": order_id
        }
@mcp.tool()
async def modify_order(
    order_id: str,
    user_id: str,
    modifications: dict
) -> dict:
    """Modify an existing order.

    Args:
        order_id: Order ID to modify
        user_id: User ID, forwarded to the backend for permission verification
        modifications: Changes to apply, forwarded to the backend as-is.
            Can include:
            - shipping_address: {province, city, district, detail, contact, phone}
            - items: [{product_id, quantity}] to update quantities
            - notes: Order notes/instructions

    Returns:
        On success: a dict with ``success=True``, the given ``order_id``, and
        ``order``, ``price_diff`` (default 0) and ``message`` read from the
        backend response. On any failure:
        ``{"success": False, "error": <message>, "order_id": order_id}``.
    """
    try:
        # order_id is caller-supplied. Percent-encode it as a single path
        # segment so "/", "?", "#" or ".." cannot retarget this mutating call.
        order_segment = quote(str(order_id), safe="")

        result = await hyperf.put(
            f"/orders/{order_segment}/modify",
            json={
                "user_id": user_id,
                "modifications": modifications
            }
        )
        return {
            "success": True,
            "order_id": order_id,
            "order": result.get("order", {}),
            "price_diff": result.get("price_diff", 0),
            "message": result.get("message", "Order modified successfully")
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e),
            "order_id": order_id
        }
@mcp.tool()
async def cancel_order(
    order_id: str,
    user_id: str,
    reason: str
) -> dict:
    """Cancel an order.

    Args:
        order_id: Order ID to cancel
        user_id: User ID, forwarded to the backend for permission verification
        reason: Cancellation reason

    Returns:
        On success: a dict with ``success=True``, the given ``order_id``,
        ``status`` fixed to "cancelled", and ``refund_info`` and ``message``
        read from the backend response. On any failure:
        ``{"success": False, "error": <message>, "order_id": order_id}``.
    """
    try:
        # order_id is caller-supplied. Percent-encode it as a single path
        # segment so "/", "?", "#" or ".." cannot retarget this mutating call.
        order_segment = quote(str(order_id), safe="")

        result = await hyperf.post(
            f"/orders/{order_segment}/cancel",
            json={
                "user_id": user_id,
                "reason": reason
            }
        )
        return {
            "success": True,
            "order_id": order_id,
            "status": "cancelled",
            "refund_info": result.get("refund_info", {}),
            "message": result.get("message", "Order cancelled successfully")
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e),
            "order_id": order_id
        }
@mcp.tool()
async def get_invoice(
    order_id: str,
    invoice_type: str = "normal"
) -> dict:
    """Get invoice for an order.

    Args:
        order_id: Order ID
        invoice_type: Invoice type ('normal' for regular invoice, 'vat' for
            VAT invoice); sent to the backend as the ``type`` query parameter

    Returns:
        On success: a dict with ``success=True``, the given ``order_id`` and
        ``invoice_type``, and ``invoice_number``, ``amount``, ``tax``,
        ``invoice_url`` and ``issued_at`` read from the backend response.
        On any failure:
        ``{"success": False, "error": <message>, "order_id": order_id}``.
    """
    try:
        # order_id is caller-supplied. Percent-encode it as a single path
        # segment so "/", "?", "#" or ".." cannot retarget the request.
        order_segment = quote(str(order_id), safe="")

        result = await hyperf.get(
            f"/orders/{order_segment}/invoice",
            params={"type": invoice_type}
        )
        return {
            "success": True,
            "order_id": order_id,
            "invoice_number": result.get("invoice_number"),
            "invoice_type": invoice_type,
            "amount": result.get("amount"),
            "tax": result.get("tax"),
            "invoice_url": result.get("invoice_url"),
            "issued_at": result.get("issued_at")
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e),
            "order_id": order_id
        }
# Health check endpoint
@mcp.tool()
async def health_check() -> dict:
    """Report the server's health status.

    Returns:
        A fixed dict with ``status``, ``service`` and ``version`` entries.
    """
    return dict(status="healthy", service="order_mcp", version="1.0.0")
if __name__ == "__main__":
    import uvicorn
    from starlette.responses import JSONResponse
    from starlette.routing import Route

    # Build the ASGI application that serves the MCP tools over HTTP.
    app = mcp.http_app()

    # A def inside this `if` still runs at module scope, so calling it
    # `health_check` would rebind the global name of the MCP tool of the same
    # name. It therefore gets a distinct, private name.
    async def _http_health(request):
        """Answer plain HTTP GET /health probes with a static payload."""
        return JSONResponse({"status": "healthy"})

    # Expose the probe alongside the MCP routes.
    app.router.routes.append(Route('/health', _http_health, methods=['GET']))

    uvicorn.run(app, host="0.0.0.0", port=8002)