Files
assistant/mcp_servers/order_mcp/server.py
wangliang 0b5d0a8086 feat: 重构订单和物流信息展示格式
主要改动:
- 订单列表:使用 order_list 格式,展示 5 个订单(全部状态)
- 订单详情:使用 order_detail 格式,优化价格和时间显示
- 物流信息:使用 logistics 格式,根据 track id 动态生成步骤
- 商品图片:从 orderProduct.imageUrl 字段获取
- 时间格式:统一为 YYYY-MM-DD HH:MM:SS
- 多语言支持:amountLabel、orderTime 支持中英文
- 配置管理:新增 FRONTEND_URL 环境变量
- API 集成:改进 Mall API tracks 数据解析
- 认证优化:account_id 从 webhook 动态获取

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-23 18:49:40 +08:00

722 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Order MCP Server - Order management tools
"""
import sys
import os
from typing import Optional, List
# Add shared module to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from fastmcp import FastMCP
from pydantic_settings import BaseSettings
from pydantic import ConfigDict
class Settings(BaseSettings):
"""Server configuration"""
hyperf_api_url: str
hyperf_api_token: str
# Mall API 配置
mall_api_url: str = "https://apicn.qa1.gaia888.com"
mall_api_token: str = ""
mall_tenant_id: str = "2"
mall_currency_code: str = "EUR"
mall_language_id: str = "1"
mall_source: str = "us.qa1.gaia888.com"
log_level: str = "INFO"
model_config = ConfigDict(env_file=".env")
settings = Settings()
# Create MCP server
mcp = FastMCP(
"Order Management"
)
# Tool registry for HTTP access
_tools = {}
# Hyperf client for this server
from shared.hyperf_client import HyperfClient
hyperf = HyperfClient(settings.hyperf_api_url, settings.hyperf_api_token)
# Mall API client
from shared.mall_client import MallClient
mall = MallClient(
api_url=getattr(settings, 'mall_api_url', 'https://apicn.qa1.gaia888.com'),
api_token=getattr(settings, 'mall_api_token', ''),
tenant_id=getattr(settings, 'mall_tenant_id', '2'),
currency_code=getattr(settings, 'mall_currency_code', 'EUR'),
language_id=getattr(settings, 'mall_language_id', '1'),
source=getattr(settings, 'mall_source', 'us.qa1.gaia888.com')
)
def register_tool(name: str):
"""Register a tool for HTTP access"""
def decorator(func):
_tools[name] = func
return func
return decorator
@register_tool("query_order")
@mcp.tool()
async def query_order(
user_id: str,
account_id: str,
order_id: Optional[str] = None,
status: Optional[str] = None,
date_start: Optional[str] = None,
date_end: Optional[str] = None,
page: int = 1,
page_size: int = 10
) -> dict:
"""Query orders for a user
Args:
user_id: User identifier
account_id: B2B account identifier
order_id: Specific order ID to query (optional)
status: Order status filter (pending, paid, shipped, delivered, cancelled)
date_start: Start date filter (YYYY-MM-DD)
date_end: End date filter (YYYY-MM-DD)
page: Page number (default: 1)
page_size: Items per page (default: 10)
Returns:
List of orders with details
"""
payload = {
"user_id": user_id,
"account_id": account_id,
"page": page,
"page_size": page_size
}
if order_id:
payload["order_id"] = order_id
if status:
payload["status"] = status
if date_start or date_end:
payload["date_range"] = {}
if date_start:
payload["date_range"]["start"] = date_start
if date_end:
payload["date_range"]["end"] = date_end
try:
result = await hyperf.post("/orders/query", json=payload)
return {
"success": True,
"orders": result.get("orders", []),
"pagination": result.get("pagination", {})
}
except Exception as e:
return {
"success": False,
"error": str(e),
"orders": []
}
@register_tool("track_logistics")
@mcp.tool()
async def track_logistics(
order_id: str,
tracking_number: Optional[str] = None
) -> dict:
"""Track order logistics/shipping status
Args:
order_id: Order ID
tracking_number: Tracking number (optional, will be fetched from order if not provided)
Returns:
Logistics tracking information with timeline
"""
try:
params = {}
if tracking_number:
params["tracking_number"] = tracking_number
result = await hyperf.get(f"/orders/{order_id}/logistics", params=params)
return {
"success": True,
"order_id": order_id,
"tracking_number": result.get("tracking_number"),
"courier": result.get("courier"),
"status": result.get("status"),
"estimated_delivery": result.get("estimated_delivery"),
"timeline": result.get("timeline", [])
}
except Exception as e:
return {
"success": False,
"error": str(e),
"order_id": order_id
}
@register_tool("modify_order")
@mcp.tool()
async def modify_order(
order_id: str,
user_id: str,
modifications: dict
) -> dict:
"""Modify an existing order
Args:
order_id: Order ID to modify
user_id: User ID for permission verification
modifications: Changes to apply. Can include:
- shipping_address: {province, city, district, detail, contact, phone}
- items: [{product_id, quantity}] to update quantities
- notes: Order notes/instructions
Returns:
Modified order details and any price changes
"""
try:
result = await hyperf.put(
f"/orders/{order_id}/modify",
json={
"user_id": user_id,
"modifications": modifications
}
)
return {
"success": True,
"order_id": order_id,
"order": result.get("order", {}),
"price_diff": result.get("price_diff", 0),
"message": result.get("message", "Order modified successfully")
}
except Exception as e:
return {
"success": False,
"error": str(e),
"order_id": order_id
}
@register_tool("cancel_order")
@mcp.tool()
async def cancel_order(
order_id: str,
user_id: str,
reason: str
) -> dict:
"""Cancel an order
Args:
order_id: Order ID to cancel
user_id: User ID for permission verification
reason: Cancellation reason
Returns:
Cancellation result with refund information
"""
try:
result = await hyperf.post(
f"/orders/{order_id}/cancel",
json={
"user_id": user_id,
"reason": reason
}
)
return {
"success": True,
"order_id": order_id,
"status": "cancelled",
"refund_info": result.get("refund_info", {}),
"message": result.get("message", "Order cancelled successfully")
}
except Exception as e:
return {
"success": False,
"error": str(e),
"order_id": order_id
}
@register_tool("get_invoice")
@mcp.tool()
async def get_invoice(
order_id: str,
invoice_type: str = "normal"
) -> dict:
"""Get invoice for an order
Args:
order_id: Order ID
invoice_type: Invoice type ('normal' for regular invoice, 'vat' for VAT invoice)
Returns:
Invoice information and download URL
"""
try:
result = await hyperf.get(
f"/orders/{order_id}/invoice",
params={"type": invoice_type}
)
return {
"success": True,
"order_id": order_id,
"invoice_number": result.get("invoice_number"),
"invoice_type": invoice_type,
"amount": result.get("amount"),
"tax": result.get("tax"),
"invoice_url": result.get("invoice_url"),
"issued_at": result.get("issued_at")
}
except Exception as e:
return {
"success": False,
"error": str(e),
"order_id": order_id
}
@register_tool("get_mall_order")
@mcp.tool()
async def get_mall_order(
order_id: str,
user_token: str = None,
user_id: str = None,
account_id: str = None
) -> dict:
"""Query order from Mall API by order ID
从商城 API 查询订单详情
Args:
order_id: 订单号 (e.g., "202071324")
user_token: 用户 JWT token可选如果提供则使用该 token 进行查询)
user_id: 用户 ID自动注入此工具不使用
account_id: 账户 ID自动注入此工具不使用
Returns:
订单详情,包含订单号、状态、商品信息、金额、物流信息等
Order details including order ID, status, items, amount, logistics info, etc.
"""
import logging
logger = logging.getLogger(__name__)
logger.info(
f"get_mall_order called: order_id={order_id}, has_user_token={bool(user_token)}, "
f"token_prefix={user_token[:20] if user_token else None}"
)
try:
# 必须提供 user_token
if not user_token:
logger.error("No user_token provided, user must be logged in")
return {
"success": False,
"error": "用户未登录,请先登录账户以查询订单信息",
"order_id": order_id,
"require_login": True
}
logger.info("Using user token for Mall API request")
client = MallClient(
api_url=settings.mall_api_url,
api_token=user_token,
tenant_id=settings.mall_tenant_id,
currency_code=settings.mall_currency_code,
language_id=settings.mall_language_id,
source=settings.mall_source
)
result = await client.get_order_by_id(order_id)
logger.info(
f"Mall API request successful: order_id={order_id}, "
f"result_keys={list(result.keys()) if isinstance(result, dict) else None}"
)
return {
"success": True,
"order": result,
"order_id": order_id
}
except Exception as e:
logger.error(
f"Mall API request failed: order_id={order_id}, error={str(e)}"
)
return {
"success": False,
"error": str(e),
"order_id": order_id
}
finally:
# 关闭客户端
if 'client' in dir() and client:
await client.close()
@register_tool("get_logistics")
@mcp.tool()
async def get_logistics(
order_id: str,
user_token: str = None,
user_id: str = None,
account_id: str = None
) -> dict:
"""Query logistics tracking information from Mall API
从 Mall API 查询订单物流信息
Args:
order_id: 订单号 (e.g., "201941967")
user_token: 用户 JWT token必需用于身份验证
user_id: 用户 ID自动注入此工具不使用
account_id: 账户 ID自动注入此工具不使用
Returns:
物流信息,包含快递公司、状态、预计送达时间、物流轨迹等
"""
print(f"[get_logistics] Called with order_id={order_id}, has_user_token={bool(user_token)}")
# 必须提供 user_token
if not user_token:
print("[get_logistics] ERROR: No user_token provided")
return {
"success": False,
"error": "用户未登录,请先登录账户以查询物流信息",
"order_id": order_id,
"require_login": True
}
try:
client = MallClient(
api_url=settings.mall_api_url,
api_token=user_token,
tenant_id=settings.mall_tenant_id,
currency_code=settings.mall_currency_code,
language_id=settings.mall_language_id,
source=settings.mall_source
)
print(f"[get_logistics] Calling Mall API: /mall/api/order/parcel?orderId={order_id}")
result = await client.get(
"/mall/api/order/parcel",
params={"orderId": order_id}
)
print(f"[get_logistics] SUCCESS: result_keys={list(result.keys()) if isinstance(result, dict) else type(result).__name__}")
print(f"[get_logistics] Sample data: {str(result)[:1000]}")
# Mall API 返回结构:{ "total": 1, "data": [{ "trackingCode": "...", "carrier": "...", ... }] }
logistics_list = result.get("data", [])
if logistics_list and len(logistics_list) > 0:
first_logistics = logistics_list[0]
tracking_number = first_logistics.get("trackingCode", "")
carrier = first_logistics.get("carrier", "未知")
# 提取 tracks 数组(物流轨迹)
tracks = first_logistics.get("tracks", [])
timeline = []
if tracks and isinstance(tracks, list):
for track in tracks:
if isinstance(track, dict):
timeline.append({
"id": track.get("id", ""),
"remark": track.get("remark", ""),
"time": track.get("time", track.get("date", "")),
"location": track.get("location", "")
})
print(f"[get_logistics] Extracted: tracking_number={tracking_number}, carrier={carrier}, tracks_count={len(timeline)}")
return {
"success": True,
"order_id": order_id,
"tracking_number": tracking_number,
"courier": carrier,
"tracking_url": first_logistics.get("trackingUrl", ""),
"status": first_logistics.get("status", ""),
"timeline": timeline
}
else:
print(f"[get_logistics] WARNING: No logistics data found in response")
return {
"success": True,
"order_id": order_id,
"tracking_number": "",
"courier": "暂无物流信息",
"status": "",
"timeline": []
}
except Exception as e:
import traceback
print(f"[get_logistics] ERROR: {type(e).__name__}: {str(e)}")
print(f"[get_logistics] TRACEBACK:\n{traceback.format_exc()}")
return {
"success": False,
"error": str(e),
"order_id": order_id
}
finally:
if 'client' in dir() and client:
await client.close()
@register_tool("get_mall_order_list")
@mcp.tool()
async def get_mall_order_list(
user_token: str = None,
user_id: str = None,
account_id: str = None,
page: int = 1,
limit: int = 5,
customer_id: int = 0,
order_types: Optional[List[int]] = None,
shipping_status: int = 10000,
date_added: Optional[str] = None,
date_end: Optional[str] = None,
no: Optional[str] = None,
status: Optional[int] = None,
is_drop_shopping: int = 0
) -> dict:
"""Query order list from Mall API with filters
从 Mall API 查询订单列表,支持多种筛选条件
规则:
- 默认返回最近 5 个订单
- 包含全部状态的订单(不限制状态)
Args:
user_token: 用户 JWT token必需用于身份验证
user_id: 用户 ID自动注入此工具不使用
account_id: 账户 ID自动注入此工具不使用
page: 页码 (default: 1)
limit: 每页数量 (default: 5, max 50)
customer_id: 客户ID (default: 0, 0表示所有客户)
order_types: 订单类型数组,如 [1, 2] (default: None)
shipping_status: 物流状态 (default: 10000, 10000表示全部状态)
date_added: 开始日期,格式 YYYY-MM-DD (default: None)
date_end: 结束日期,格式 YYYY-MM-DD (default: None)
no: 订单号筛选 (default: None)
status: 订单状态筛选 (default: None, None表示全部状态)
is_drop_shopping: 是否代发货 (default: 0)
Returns:
订单列表和分页信息
{
"success": true,
"orders": [...], # 订单列表
"total": 100, # 总订单数
"page": 1, # 当前页
"limit": 5, # 每页数量
"total_pages": 20 # 总页数
}
Example:
用户问: "我的订单有哪些?"
Agent 调用: get_mall_order_list(page=1, limit=5)
"""
import logging
import json
import base64
logger = logging.getLogger(__name__)
logger.info(
f"get_mall_order_list called: page={page}, limit={limit}, "
f"has_user_token={bool(user_token)}, customer_id={customer_id}"
)
# 必须提供 user_token
if not user_token:
logger.error("No user_token provided, user must be logged in")
return {
"success": False,
"error": "用户未登录,请先登录账户以查询订单列表",
"require_login": True,
"orders": []
}
# 从 JWT token 中提取 userId 作为 customer_id
if customer_id == 0:
try:
# JWT token 格式: header.payload.signature
parts = user_token.split('.')
if len(parts) >= 2:
# 解码 payload
payload = parts[1]
# 添加 padding 如果需要
payload += '=' * (4 - len(payload) % 4)
decoded = base64.b64decode(payload)
payload_data = json.loads(decoded)
# 尝试从不同字段获取 userId
customer_id = payload_data.get('userId') or payload_data.get('uid') or payload_data.get('sub') or 0
logger.info(
f"Extracted customer_id from token: customer_id={customer_id}, "
f"token_payload_keys={list(payload_data.keys())}"
)
except Exception as e:
logger.warning(f"Failed to extract customer_id from token: {e}")
customer_id = 0
try:
client = MallClient(
api_url=settings.mall_api_url,
api_token=user_token,
tenant_id=settings.mall_tenant_id,
currency_code=settings.mall_currency_code,
language_id=settings.mall_language_id,
source=settings.mall_source
)
result = await client.get_order_list(
page=page,
limit=limit,
customer_id=customer_id,
order_types=order_types,
shipping_status=shipping_status,
date_added=date_added,
date_end=date_end,
no=no,
status=status,
is_drop_shopping=is_drop_shopping
)
logger.info(
f"Mall API request successful: page={page}, "
f"result_keys={list(result.keys()) if isinstance(result, dict) else None}"
)
# Mall API 返回结构: {"data": [...], "total": 100}
orders = result.get("data", [])
total = result.get("total", 0)
return {
"success": True,
"orders": orders,
"total": total,
"page": page,
"limit": limit,
"total_pages": (total + limit - 1) // limit if limit > 0 else 0
}
except Exception as e:
logger.error(
f"Mall API request failed: page={page}, error={str(e)}"
)
return {
"success": False,
"error": str(e),
"orders": []
}
finally:
if 'client' in dir() and client:
await client.close()
# Health check endpoint
@register_tool("health_check")
@mcp.tool()
async def health_check() -> dict:
"""Check server health status"""
return {
"status": "healthy",
"service": "order_mcp",
"version": "1.0.0"
}
if __name__ == "__main__":
import uvicorn
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import Route
# Health check endpoint
async def health_check(request):
return JSONResponse({"status": "healthy"})
# Tool execution endpoint
async def execute_tool(request: Request):
"""Execute an MCP tool via HTTP"""
tool_name = request.path_params["tool_name"]
try:
# Get arguments from request body
arguments = await request.json()
# Get tool function from registry
if tool_name not in _tools:
return JSONResponse({
"success": False,
"error": f"Tool '{tool_name}' not found"
}, status_code=404)
tool_obj = _tools[tool_name]
# Call the tool with arguments
# FastMCP FunctionTool.run() takes a dict of arguments
tool_result = await tool_obj.run(arguments)
# Extract content from ToolResult
# ToolResult.content is a list of TextContent objects with a 'text' attribute
if tool_result.content and len(tool_result.content) > 0:
content = tool_result.content[0].text
# Try to parse as JSON if possible
try:
import json
result = json.loads(content)
except:
result = content
else:
result = None
return JSONResponse({
"success": True,
"result": result
})
except TypeError as e:
return JSONResponse({
"success": False,
"error": f"Invalid arguments: {str(e)}"
}, status_code=400)
except Exception as e:
return JSONResponse({
"success": False,
"error": str(e)
}, status_code=500)
# Create routes list
routes = [
Route('/health', health_check, methods=['GET']),
Route('/tools/{tool_name}', execute_tool, methods=['POST'])
]
# Create app from MCP with custom routes
app = mcp.http_app()
# Add our custom routes to the existing app
for route in routes:
app.router.routes.append(route)
uvicorn.run(app, host="0.0.0.0", port=8002)