Files
assistant/mcp_servers/order_mcp/server.py
wangliang 0f13102a02 fix: 改进错误处理和清理测试代码
## 主要修复

### 1. JSON 解析错误处理
- 修复所有 Agent 的 LLM 响应解析失败时返回原始内容的问题
- 当 JSON 解析失败时,返回友好的兜底消息而不是原始文本
- 影响文件: customer_service.py, order.py, product.py, aftersale.py

### 2. FAQ 快速路径修复
- 修复 customer_service.py 中变量定义顺序问题
- has_faq_query 在使用前未定义导致 NameError
- 添加详细的错误日志记录

### 3. Chatwoot 集成改进
- 添加响应内容调试日志
- 改进错误处理和日志记录

### 4. 订单查询优化
- 将订单列表默认返回数量从 10 条改为 5 条
- 统一 MCP 工具层和 Mall Client 层的默认值

### 5. 代码清理
- 删除所有测试代码和示例文件
- 测试文件包括: test_*.py, test_*.html, test_*.sh
- 删除测试目录: tests/, agent/tests/, agent/examples/

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-27 13:15:58 +08:00

727 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Order MCP Server - Order management tools
"""
import sys
import os
from typing import Optional, List
# Add shared module to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from fastmcp import FastMCP
from pydantic_settings import BaseSettings
from pydantic import ConfigDict
class Settings(BaseSettings):
    """Runtime configuration for the order MCP server.

    ``model_config`` points the settings loader at a ``.env`` file in the
    working directory. Fields without a default must be supplied.
    """

    # --- Hyperf backend (no defaults: both values are required) ---
    hyperf_api_url: str
    hyperf_api_token: str

    # --- Mall API configuration ---
    mall_api_url: str = "https://apicn.qa1.gaia888.com"
    mall_api_token: str = ""
    mall_tenant_id: str = "2"
    mall_currency_code: str = "EUR"
    mall_language_id: str = "1"
    mall_source: str = "us.qa1.gaia888.com"

    # --- Logging ---
    log_level: str = "INFO"

    model_config = ConfigDict(env_file=".env")
settings = Settings()

# Create MCP server
mcp = FastMCP("Order Management")

# Tool registry for HTTP access: maps a tool name to the object that was
# decorated with ``register_tool`` (filled in as the tools below are defined).
_tools = {}

# Hyperf client for this server.
# The ``shared`` imports stay below the ``sys.path.insert`` call at the top of
# the file, which is what makes the package importable.
from shared.hyperf_client import HyperfClient
hyperf = HyperfClient(settings.hyperf_api_url, settings.hyperf_api_token)

# Mall API client built from the service-level configuration.
# ``Settings`` declares every ``mall_*`` field with a default, so the values
# are read directly instead of repeating the defaults through ``getattr``.
from shared.mall_client import MallClient
mall = MallClient(
    api_url=settings.mall_api_url,
    api_token=settings.mall_api_token,
    tenant_id=settings.mall_tenant_id,
    currency_code=settings.mall_currency_code,
    language_id=settings.mall_language_id,
    source=settings.mall_source,
)
def register_tool(name: str):
    """Return a decorator that records its target in ``_tools`` under ``name``.

    The decorated object is stored as-is and returned unchanged, so this
    decorator can be stacked on top of other decorators.
    """
    def _remember(target):
        _tools[name] = target
        return target

    return _remember
@register_tool("query_order")
@mcp.tool()
async def query_order(
    user_id: str,
    account_id: str,
    order_id: Optional[str] = None,
    status: Optional[str] = None,
    date_start: Optional[str] = None,
    date_end: Optional[str] = None,
    page: int = 1,
    page_size: int = 10
) -> dict:
    """Query orders for a user

    Args:
        user_id: User identifier
        account_id: B2B account identifier
        order_id: Specific order ID to query (optional)
        status: Order status filter (pending, paid, shipped, delivered, cancelled)
        date_start: Start date filter (YYYY-MM-DD)
        date_end: End date filter (YYYY-MM-DD)
        page: Page number (default: 1)
        page_size: Items per page (default: 10)

    Returns:
        On success: ``success=True`` plus the ``orders`` list and
        ``pagination`` dict taken from the backend response.
        On failure: ``success=False``, the error text, and an empty
        ``orders`` list.
    """
    request_body = {
        "user_id": user_id,
        "account_id": account_id,
        "page": page,
        "page_size": page_size,
    }

    # Optional filters are only sent when they carry a value.
    if order_id:
        request_body["order_id"] = order_id
    if status:
        request_body["status"] = status

    date_range = {
        bound: value
        for bound, value in (("start", date_start), ("end", date_end))
        if value
    }
    if date_range:
        request_body["date_range"] = date_range

    try:
        response = await hyperf.post("/orders/query", json=request_body)
        found_orders = response.get("orders", [])
        paging = response.get("pagination", {})
        return {"success": True, "orders": found_orders, "pagination": paging}
    except Exception as exc:
        # Failures are reported in the result instead of being raised.
        return {"success": False, "error": str(exc), "orders": []}
@register_tool("track_logistics")
@mcp.tool()
async def track_logistics(
    order_id: str,
    tracking_number: Optional[str] = None
) -> dict:
    """Track order logistics/shipping status

    Args:
        order_id: Order ID
        tracking_number: Tracking number (optional, will be fetched from order if not provided)

    Returns:
        Logistics tracking information with timeline; on failure,
        ``success=False`` with the error text and the order ID.
    """
    try:
        # The tracking number is only sent as a query parameter when given.
        query = {"tracking_number": tracking_number} if tracking_number else {}
        shipment = await hyperf.get(f"/orders/{order_id}/logistics", params=query)

        report = {"success": True, "order_id": order_id}
        for field in ("tracking_number", "courier", "status", "estimated_delivery"):
            report[field] = shipment.get(field)
        report["timeline"] = shipment.get("timeline", [])
        return report
    except Exception as exc:
        return {"success": False, "error": str(exc), "order_id": order_id}
@register_tool("modify_order")
@mcp.tool()
async def modify_order(
    order_id: str,
    user_id: str,
    modifications: dict
) -> dict:
    """Modify an existing order

    Args:
        order_id: Order ID to modify
        user_id: User ID for permission verification
        modifications: Changes to apply. Can include:
            - shipping_address: {province, city, district, detail, contact, phone}
            - items: [{product_id, quantity}] to update quantities
            - notes: Order notes/instructions

    Returns:
        Modified order details and any price changes; on failure,
        ``success=False`` with the error text and the order ID.
    """
    endpoint = f"/orders/{order_id}/modify"
    request_body = {"user_id": user_id, "modifications": modifications}
    try:
        response = await hyperf.put(endpoint, json=request_body)
        updated_order = response.get("order", {})
        price_delta = response.get("price_diff", 0)
        note = response.get("message", "Order modified successfully")
        return {
            "success": True,
            "order_id": order_id,
            "order": updated_order,
            "price_diff": price_delta,
            "message": note,
        }
    except Exception as exc:
        return {"success": False, "error": str(exc), "order_id": order_id}
@register_tool("cancel_order")
@mcp.tool()
async def cancel_order(
    order_id: str,
    user_id: str,
    reason: str
) -> dict:
    """Cancel an order

    Args:
        order_id: Order ID to cancel
        user_id: User ID for permission verification
        reason: Cancellation reason

    Returns:
        Cancellation result with refund information; on failure,
        ``success=False`` with the error text and the order ID.
    """
    endpoint = f"/orders/{order_id}/cancel"
    request_body = {"user_id": user_id, "reason": reason}
    try:
        response = await hyperf.post(endpoint, json=request_body)
        refund = response.get("refund_info", {})
        note = response.get("message", "Order cancelled successfully")
        return {
            "success": True,
            "order_id": order_id,
            # The status is reported as a fixed value, not read from the response.
            "status": "cancelled",
            "refund_info": refund,
            "message": note,
        }
    except Exception as exc:
        return {"success": False, "error": str(exc), "order_id": order_id}
@register_tool("get_invoice")
@mcp.tool()
async def get_invoice(
    order_id: str,
    invoice_type: str = "normal"
) -> dict:
    """Get invoice for an order

    Args:
        order_id: Order ID
        invoice_type: Invoice type ('normal' for regular invoice, 'vat' for VAT invoice)

    Returns:
        Invoice information and download URL; on failure,
        ``success=False`` with the error text and the order ID.
    """
    try:
        invoice = await hyperf.get(
            f"/orders/{order_id}/invoice",
            params={"type": invoice_type},
        )
        summary = {
            "success": True,
            "order_id": order_id,
            "invoice_number": invoice.get("invoice_number"),
            # Echoes the requested type rather than a value from the response.
            "invoice_type": invoice_type,
        }
        for field in ("amount", "tax", "invoice_url", "issued_at"):
            summary[field] = invoice.get(field)
        return summary
    except Exception as exc:
        return {"success": False, "error": str(exc), "order_id": order_id}
@register_tool("get_mall_order")
@mcp.tool()
async def get_mall_order(
    order_id: str,
    user_token: Optional[str] = None,
    user_id: Optional[str] = None,
    account_id: Optional[str] = None
) -> dict:
    """Query order from Mall API by order ID

    Args:
        order_id: Order number (e.g., "202071324")
        user_token: User JWT token; the Mall API request is made with it.
            When it is missing the tool returns a login-required error.
        user_id: User ID (auto-injected; not used by this tool)
        account_id: Account ID (auto-injected; not used by this tool)

    Returns:
        Order details including order ID, status, items, amount, logistics info, etc.
        On success: ``{"success": True, "order": <Mall API result>, "order_id": ...}``.
        On failure: ``{"success": False, "error": ..., "order_id": ...}``, with
        ``"require_login": True`` added when no token was supplied.
    """
    import logging

    logger = logging.getLogger(__name__)
    # Only the presence of the token is logged, never any part of its value.
    logger.info(
        f"get_mall_order called: order_id={order_id}, has_user_token={bool(user_token)}"
    )

    # A user token is mandatory.
    if not user_token:
        logger.error("No user_token provided, user must be logged in")
        return {
            "success": False,
            "error": "用户未登录,请先登录账户以查询订单信息",
            "order_id": order_id,
            "require_login": True
        }

    # Sentinel so ``finally`` can tell whether a client was actually created.
    client = None
    try:
        logger.info("Using user token for Mall API request")
        client = MallClient(
            api_url=settings.mall_api_url,
            api_token=user_token,
            tenant_id=settings.mall_tenant_id,
            currency_code=settings.mall_currency_code,
            language_id=settings.mall_language_id,
            source=settings.mall_source
        )
        result = await client.get_order_by_id(order_id)
        logger.info(
            f"Mall API request successful: order_id={order_id}, "
            f"result_keys={list(result.keys()) if isinstance(result, dict) else None}"
        )
        return {
            "success": True,
            "order": result,
            "order_id": order_id
        }
    except Exception as e:
        logger.error(
            f"Mall API request failed: order_id={order_id}, error={str(e)}"
        )
        return {
            "success": False,
            "error": str(e),
            "order_id": order_id
        }
    finally:
        # Close the per-request client.
        if client is not None:
            await client.close()
@register_tool("get_logistics")
@mcp.tool()
async def get_logistics(
    order_id: str,
    user_token: Optional[str] = None,
    user_id: Optional[str] = None,
    account_id: Optional[str] = None
) -> dict:
    """Query logistics tracking information from Mall API

    Args:
        order_id: Order number (e.g., "201941967")
        user_token: User JWT token (required; used for authentication)
        user_id: User ID (auto-injected; not used by this tool)
        account_id: Account ID (auto-injected; not used by this tool)

    Returns:
        Logistics information built from the first parcel in the response:
        ``tracking_number``, ``courier``, ``tracking_url``, ``status`` and a
        ``timeline`` list of ``{id, remark, time, location}`` entries.
        When the response has no parcel data, a successful result with empty
        values is returned. On failure: ``success=False`` with the error text
        (plus ``require_login=True`` when no token was supplied).
    """
    import logging

    logger = logging.getLogger(__name__)
    # Uses the logger rather than print(), matching the other Mall API tools
    # and keeping diagnostics off stdout.
    logger.info(
        f"[get_logistics] Called with order_id={order_id}, has_user_token={bool(user_token)}"
    )

    # A user token is mandatory.
    if not user_token:
        logger.error("[get_logistics] ERROR: No user_token provided")
        return {
            "success": False,
            "error": "用户未登录,请先登录账户以查询物流信息",
            "order_id": order_id,
            "require_login": True
        }

    # Sentinel so ``finally`` can tell whether a client was actually created.
    client = None
    try:
        client = MallClient(
            api_url=settings.mall_api_url,
            api_token=user_token,
            tenant_id=settings.mall_tenant_id,
            currency_code=settings.mall_currency_code,
            language_id=settings.mall_language_id,
            source=settings.mall_source
        )
        logger.info(
            f"[get_logistics] Calling Mall API: /mall/api/order/parcel?orderId={order_id}"
        )
        result = await client.get(
            "/mall/api/order/parcel",
            params={"orderId": order_id}
        )
        logger.info(
            f"[get_logistics] SUCCESS: result_keys="
            f"{list(result.keys()) if isinstance(result, dict) else type(result).__name__}"
        )
        # The raw response sample is logged at debug level only.
        logger.debug(f"[get_logistics] Sample data: {str(result)[:1000]}")

        # Expected Mall API structure:
        # { "total": 1, "data": [{ "trackingCode": "...", "carrier": "...", ... }] }
        logistics_list = result.get("data", [])
        if not logistics_list:
            logger.warning("[get_logistics] WARNING: No logistics data found in response")
            return {
                "success": True,
                "order_id": order_id,
                "tracking_number": "",
                "courier": "暂无物流信息",
                "status": "",
                "timeline": []
            }

        # Only the first parcel is reported.
        first_logistics = logistics_list[0]
        tracking_number = first_logistics.get("trackingCode", "")
        carrier = first_logistics.get("carrier", "未知")

        # Extract the tracking events; anything that is not a dict is skipped.
        tracks = first_logistics.get("tracks", [])
        timeline = [
            {
                "id": track.get("id", ""),
                "remark": track.get("remark", ""),
                # Fall back to "date" when the event has no "time" field.
                "time": track.get("time", track.get("date", "")),
                "location": track.get("location", "")
            }
            for track in tracks
            if isinstance(track, dict)
        ] if isinstance(tracks, list) else []

        logger.info(
            f"[get_logistics] Extracted: tracking_number={tracking_number}, "
            f"carrier={carrier}, tracks_count={len(timeline)}"
        )
        return {
            "success": True,
            "order_id": order_id,
            "tracking_number": tracking_number,
            "courier": carrier,
            "tracking_url": first_logistics.get("trackingUrl", ""),
            "status": first_logistics.get("status", ""),
            "timeline": timeline
        }
    except Exception as e:
        # logger.exception records the traceback along with the message.
        logger.exception(f"[get_logistics] ERROR: {type(e).__name__}: {str(e)}")
        return {
            "success": False,
            "error": str(e),
            "order_id": order_id
        }
    finally:
        if client is not None:
            await client.close()
def _decode_jwt_claims(token: str) -> dict:
    """Decode the payload segment of a JWT without verifying its signature.

    Returns the parsed payload, or an empty dict when the token has fewer
    than two dot-separated segments. Raises ``ValueError`` (including
    ``binascii.Error`` / ``json.JSONDecodeError``) on malformed input.
    """
    import base64
    import json

    segments = token.split('.')
    if len(segments) < 2:
        return {}
    encoded = segments[1]
    # JWT segments are base64url without padding: restore the padding
    # (0-3 characters) and decode with the URL-safe alphabet.
    encoded += '=' * (-len(encoded) % 4)
    return json.loads(base64.urlsafe_b64decode(encoded))


@register_tool("get_mall_order_list")
@mcp.tool()
async def get_mall_order_list(
    user_token: Optional[str] = None,
    user_id: Optional[str] = None,
    account_id: Optional[str] = None,
    page: int = 1,
    limit: int = 5,
    customer_id: int = 0,
    order_types: Optional[List[int]] = None,
    shipping_status: int = 10000,
    date_added: Optional[str] = None,
    date_end: Optional[str] = None,
    no: Optional[str] = None,
    status: int = 10000,
    is_drop_shopping: int = 0
) -> dict:
    """Query order list from Mall API with filters

    Rules:
        - Returns the 5 most recent orders by default
        - Includes orders of every status (no status restriction)

    Args:
        user_token: User JWT token (required; used for authentication)
        user_id: User ID (auto-injected; not used by this tool)
        account_id: Account ID (auto-injected; not used by this tool)
        page: Page number (default: 1)
        limit: Items per page (default: 5, max 50)
        customer_id: Customer ID (default: 0, 0 means all customers). When 0,
            the tool tries to read it from the token's ``userId``/``uid``/``sub`` claim.
        order_types: Array of order types, e.g. [1, 2] (default: None)
        shipping_status: Shipping status (default: 10000, 10000 means all statuses)
        date_added: Start date, format YYYY-MM-DD (default: None)
        date_end: End date, format YYYY-MM-DD (default: None)
        no: Order number filter (default: None)
        status: Order status filter (default: 10000, 10000 means all statuses)
        is_drop_shopping: Whether drop-shipping (default: 0)

    Returns:
        Order list and pagination info
        {
            "success": true,
            "orders": [...],     # order list
            "total": 100,        # total number of orders
            "page": 1,           # current page
            "limit": 5,          # items per page
            "total_pages": 20    # total number of pages
        }
        On failure: ``success=False`` with the error text and an empty
        ``orders`` list (plus ``require_login=True`` when no token was supplied).

    Example:
        User asks: "What orders do I have?"
        Agent calls: get_mall_order_list(page=1, limit=5)
    """
    import logging

    logger = logging.getLogger(__name__)
    logger.info(
        f"get_mall_order_list called: page={page}, limit={limit}, "
        f"has_user_token={bool(user_token)}, customer_id={customer_id}"
    )

    # A user token is mandatory.
    if not user_token:
        logger.error("No user_token provided, user must be logged in")
        return {
            "success": False,
            "error": "用户未登录,请先登录账户以查询订单列表",
            "require_login": True,
            "orders": []
        }

    # Derive customer_id from the token when the caller did not pass one.
    # The token is only decoded here, not verified.
    if customer_id == 0:
        try:
            claims = _decode_jwt_claims(user_token)
            # Try the different claims that may carry the user id.
            customer_id = claims.get('userId') or claims.get('uid') or claims.get('sub') or 0
            logger.info(
                f"Extracted customer_id from token: customer_id={customer_id}, "
                f"token_payload_keys={list(claims.keys())}"
            )
        except Exception as e:
            # Best effort: fall back to 0 and let the Mall API decide.
            logger.warning(f"Failed to extract customer_id from token: {e}")
            customer_id = 0

    # Sentinel so ``finally`` can tell whether a client was actually created.
    client = None
    try:
        client = MallClient(
            api_url=settings.mall_api_url,
            api_token=user_token,
            tenant_id=settings.mall_tenant_id,
            currency_code=settings.mall_currency_code,
            language_id=settings.mall_language_id,
            source=settings.mall_source
        )
        result = await client.get_order_list(
            page=page,
            limit=limit,
            customer_id=customer_id,
            order_types=order_types,
            shipping_status=shipping_status,
            date_added=date_added,
            date_end=date_end,
            no=no,
            status=status,
            is_drop_shopping=is_drop_shopping
        )
        logger.info(
            f"Mall API request successful: page={page}, "
            f"result_keys={list(result.keys()) if isinstance(result, dict) else None}"
        )
        # Expected Mall API structure: {"data": [...], "total": 100}
        orders = result.get("data", [])
        total = result.get("total", 0)
        return {
            "success": True,
            "orders": orders,
            "total": total,
            "page": page,
            "limit": limit,
            # Ceiling division; guarded against a non-positive limit.
            "total_pages": (total + limit - 1) // limit if limit > 0 else 0
        }
    except Exception as e:
        logger.error(
            f"Mall API request failed: page={page}, error={str(e)}"
        )
        return {
            "success": False,
            "error": str(e),
            "orders": []
        }
    finally:
        if client is not None:
            await client.close()
# Health check tool
@register_tool("health_check")
@mcp.tool()
async def health_check() -> dict:
    """Check server health status"""
    # Static report: nothing is probed, the values are fixed.
    report = dict(status="healthy", service="order_mcp", version="1.0.0")
    return report
if __name__ == "__main__":
    import json

    import uvicorn
    from starlette.requests import Request
    from starlette.responses import JSONResponse
    from starlette.routing import Route

    # HTTP health endpoint. Named differently from the ``health_check`` tool
    # defined above so that it does not rebind that module-level name.
    async def http_health_check(request):
        return JSONResponse({"status": "healthy"})

    # Tool execution endpoint
    async def execute_tool(request: Request):
        """Execute an MCP tool via HTTP.

        Expects a JSON object of tool arguments in the request body and
        replies with ``{"success": ..., "result" | "error": ...}``.
        Status codes: 400 for a malformed body or invalid arguments,
        404 for an unknown tool, 500 for any other failure.
        """
        tool_name = request.path_params["tool_name"]
        try:
            # Get arguments from request body
            try:
                arguments = await request.json()
            except ValueError as e:
                # Covers JSON syntax errors and undecodable bodies.
                return JSONResponse({
                    "success": False,
                    "error": f"Invalid JSON body: {str(e)}"
                }, status_code=400)
            if not isinstance(arguments, dict):
                return JSONResponse({
                    "success": False,
                    "error": "Invalid arguments: request body must be a JSON object"
                }, status_code=400)

            # Get tool object from registry
            if tool_name not in _tools:
                return JSONResponse({
                    "success": False,
                    "error": f"Tool '{tool_name}' not found"
                }, status_code=404)
            tool_obj = _tools[tool_name]

            # Keep only the arguments named in the tool's parameter schema,
            # so extra fields in the request are dropped.
            tool_params = tool_obj.parameters.get('properties', {})
            filtered_args = {k: v for k, v in arguments.items() if k in tool_params}

            # Call the tool with the filtered arguments (run() takes a dict).
            tool_result = await tool_obj.run(filtered_args)

            # Use the text of the first content item, parsed as JSON when
            # possible and returned as plain text otherwise.
            if tool_result.content and len(tool_result.content) > 0:
                content = tool_result.content[0].text
                try:
                    result = json.loads(content)
                except (TypeError, ValueError):
                    result = content
            else:
                result = None

            return JSONResponse({
                "success": True,
                "result": result
            })
        except TypeError as e:
            return JSONResponse({
                "success": False,
                "error": f"Invalid arguments: {str(e)}"
            }, status_code=400)
        except Exception as e:
            return JSONResponse({
                "success": False,
                "error": str(e)
            }, status_code=500)

    # Create routes list
    routes = [
        Route('/health', http_health_check, methods=['GET']),
        Route('/tools/{tool_name}', execute_tool, methods=['POST'])
    ]

    # Create app from MCP with custom routes
    app = mcp.http_app()

    # Add our custom routes to the existing app
    for route in routes:
        app.router.routes.append(route)

    uvicorn.run(app, host="0.0.0.0", port=8002)