Files

727 lines
22 KiB
Python
Raw Permalink Normal View History

"""
Order MCP Server - Order management tools
"""
import sys
import os
from typing import Optional, List
# Add shared module to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from fastmcp import FastMCP
from pydantic_settings import BaseSettings
from pydantic import ConfigDict
class Settings(BaseSettings):
    """Runtime configuration for the order MCP server.

    Fields are declared as pydantic-settings attributes; ``model_config``
    names a local ``.env`` file as a source for their values.
    """

    # Hyperf backend API: declared without defaults.
    hyperf_api_url: str
    hyperf_api_token: str

    # Mall API configuration.
    mall_api_url: str = "https://apicn.qa1.gaia888.com"
    mall_api_token: str = ""
    mall_tenant_id: str = "2"
    mall_currency_code: str = "EUR"
    mall_language_id: str = "1"
    mall_source: str = "us.qa1.gaia888.com"

    # Logging level name.
    log_level: str = "INFO"

    model_config = ConfigDict(env_file=".env")
# Load configuration once, at import time.
settings = Settings()

# Create MCP server
mcp = FastMCP("Order Management")

# Tool registry for HTTP access: maps a tool name to the registered object.
_tools = {}

# Hyperf client for this server. The `shared` package is importable because
# the parent directory was put on sys.path near the top of this file.
from shared.hyperf_client import HyperfClient

hyperf = HyperfClient(settings.hyperf_api_url, settings.hyperf_api_token)

# Mall API client. Settings declares every mall_* field with a default, so the
# attributes are read directly.
from shared.mall_client import MallClient

mall = MallClient(
    api_url=settings.mall_api_url,
    api_token=settings.mall_api_token,
    tenant_id=settings.mall_tenant_id,
    currency_code=settings.mall_currency_code,
    language_id=settings.mall_language_id,
    source=settings.mall_source,
)
def register_tool(name: str):
    """Return a decorator that records its target in ``_tools`` under *name*.

    The decorated object is stored as-is and handed back unchanged, so the
    decorator can be stacked on top of others. Registering the same name a
    second time replaces the earlier entry.
    """

    def _remember(tool):
        _tools[name] = tool
        return tool

    return _remember
@register_tool("query_order")
@mcp.tool()
async def query_order(
    user_id: str,
    account_id: str,
    order_id: Optional[str] = None,
    status: Optional[str] = None,
    date_start: Optional[str] = None,
    date_end: Optional[str] = None,
    page: int = 1,
    page_size: int = 10
) -> dict:
    """Query orders for a user

    Args:
        user_id: User identifier
        account_id: B2B account identifier
        order_id: Specific order ID to query (optional)
        status: Order status filter (pending, paid, shipped, delivered, cancelled)
        date_start: Start date filter (YYYY-MM-DD)
        date_end: End date filter (YYYY-MM-DD)
        page: Page number (default: 1)
        page_size: Items per page (default: 10)

    Returns:
        On success: {"success": True, "orders": [...], "pagination": {...}}.
        On any error: {"success": False, "error": <message>, "orders": []}.
    """
    payload = {
        "user_id": user_id,
        "account_id": account_id,
        "page": page,
        "page_size": page_size,
    }

    # Optional filters are forwarded only when they carry a truthy value.
    for key, value in (("order_id", order_id), ("status", status)):
        if value:
            payload[key] = value

    # "date_range" is sent only if at least one bound was supplied.
    date_range = {
        bound: day
        for bound, day in (("start", date_start), ("end", date_end))
        if day
    }
    if date_range:
        payload["date_range"] = date_range

    try:
        response = await hyperf.post("/orders/query", json=payload)
        return {
            "success": True,
            "orders": response.get("orders", []),
            "pagination": response.get("pagination", {}),
        }
    except Exception as exc:
        # Tool boundary: report the failure to the caller instead of raising.
        return {"success": False, "error": str(exc), "orders": []}
@register_tool("track_logistics")
@mcp.tool()
async def track_logistics(
    order_id: str,
    tracking_number: Optional[str] = None
) -> dict:
    """Track order logistics/shipping status

    Args:
        order_id: Order ID
        tracking_number: Tracking number (optional; forwarded as a query
            parameter only when provided)

    Returns:
        On success: {"success": True, "order_id", "tracking_number",
        "courier", "status", "estimated_delivery", "timeline"}.
        On any error: {"success": False, "error": <message>, "order_id"}.
    """
    from urllib.parse import quote

    try:
        # order_id comes from tool callers. Percent-encode it as a single path
        # segment so characters such as "/", "?" or "#" cannot change which
        # backend route is requested.
        order_segment = quote(str(order_id), safe="")
        params = {"tracking_number": tracking_number} if tracking_number else {}

        result = await hyperf.get(f"/orders/{order_segment}/logistics", params=params)
        return {
            "success": True,
            "order_id": order_id,
            "tracking_number": result.get("tracking_number"),
            "courier": result.get("courier"),
            "status": result.get("status"),
            "estimated_delivery": result.get("estimated_delivery"),
            "timeline": result.get("timeline", []),
        }
    except Exception as e:
        # Tool boundary: report the failure to the caller instead of raising.
        return {
            "success": False,
            "error": str(e),
            "order_id": order_id,
        }
@register_tool("modify_order")
@mcp.tool()
async def modify_order(
    order_id: str,
    user_id: str,
    modifications: dict
) -> dict:
    """Modify an existing order

    Args:
        order_id: Order ID to modify
        user_id: User ID for permission verification
        modifications: Changes to apply. Can include:
            - shipping_address: {province, city, district, detail, contact, phone}
            - items: [{product_id, quantity}] to update quantities
            - notes: Order notes/instructions

    Returns:
        On success: {"success": True, "order_id", "order", "price_diff",
        "message"}.
        On any error: {"success": False, "error": <message>, "order_id"}.
    """
    from urllib.parse import quote

    try:
        # order_id comes from tool callers. Percent-encode it as a single path
        # segment so characters such as "/", "?" or "#" cannot redirect this
        # mutating request to a different backend route.
        order_segment = quote(str(order_id), safe="")

        result = await hyperf.put(
            f"/orders/{order_segment}/modify",
            json={
                "user_id": user_id,
                "modifications": modifications,
            },
        )
        return {
            "success": True,
            "order_id": order_id,
            "order": result.get("order", {}),
            "price_diff": result.get("price_diff", 0),
            "message": result.get("message", "Order modified successfully"),
        }
    except Exception as e:
        # Tool boundary: report the failure to the caller instead of raising.
        return {
            "success": False,
            "error": str(e),
            "order_id": order_id,
        }
@register_tool("cancel_order")
@mcp.tool()
async def cancel_order(
    order_id: str,
    user_id: str,
    reason: str
) -> dict:
    """Cancel an order

    Args:
        order_id: Order ID to cancel
        user_id: User ID for permission verification
        reason: Cancellation reason

    Returns:
        On success: {"success": True, "order_id", "status": "cancelled",
        "refund_info", "message"}.
        On any error: {"success": False, "error": <message>, "order_id"}.
    """
    from urllib.parse import quote

    try:
        # order_id comes from tool callers. Percent-encode it as a single path
        # segment so characters such as "/", "?" or "#" cannot redirect this
        # mutating request to a different backend route.
        order_segment = quote(str(order_id), safe="")

        result = await hyperf.post(
            f"/orders/{order_segment}/cancel",
            json={
                "user_id": user_id,
                "reason": reason,
            },
        )
        return {
            "success": True,
            "order_id": order_id,
            # Reported as a fixed value whenever the backend call did not raise.
            "status": "cancelled",
            "refund_info": result.get("refund_info", {}),
            "message": result.get("message", "Order cancelled successfully"),
        }
    except Exception as e:
        # Tool boundary: report the failure to the caller instead of raising.
        return {
            "success": False,
            "error": str(e),
            "order_id": order_id,
        }
@register_tool("get_invoice")
@mcp.tool()
async def get_invoice(
    order_id: str,
    invoice_type: str = "normal"
) -> dict:
    """Get invoice for an order

    Args:
        order_id: Order ID
        invoice_type: Invoice type ('normal' for regular invoice, 'vat' for VAT invoice)

    Returns:
        On success: {"success": True, "order_id", "invoice_number",
        "invoice_type", "amount", "tax", "invoice_url", "issued_at"}.
        On any error: {"success": False, "error": <message>, "order_id"}.
    """
    from urllib.parse import quote

    try:
        # order_id comes from tool callers. Percent-encode it as a single path
        # segment so characters such as "/", "?" or "#" cannot change which
        # backend route is requested.
        order_segment = quote(str(order_id), safe="")

        result = await hyperf.get(
            f"/orders/{order_segment}/invoice",
            params={"type": invoice_type},
        )
        return {
            "success": True,
            "order_id": order_id,
            "invoice_number": result.get("invoice_number"),
            # Echoes the requested type, not a value from the backend response.
            "invoice_type": invoice_type,
            "amount": result.get("amount"),
            "tax": result.get("tax"),
            "invoice_url": result.get("invoice_url"),
            "issued_at": result.get("issued_at"),
        }
    except Exception as e:
        # Tool boundary: report the failure to the caller instead of raising.
        return {
            "success": False,
            "error": str(e),
            "order_id": order_id,
        }
@register_tool("get_mall_order")
@mcp.tool()
async def get_mall_order(
    order_id: str,
    user_token: Optional[str] = None,
    user_id: Optional[str] = None,
    account_id: Optional[str] = None
) -> dict:
    """Query order details from the Mall API by order ID

    Args:
        order_id: Order number (e.g., "202071324")
        user_token: The user's JWT token. Required in practice: without it the
            tool returns a login-required error and does not call the Mall API.
        user_id: User ID (auto-injected; not used by this tool)
        account_id: Account ID (auto-injected; not used by this tool)

    Returns:
        On success: {"success": True, "order": <Mall API result>, "order_id"}.
        On failure: {"success": False, "error": <message>, "order_id"}, plus
        "require_login": True when no user_token was supplied.
    """
    import logging

    logger = logging.getLogger(__name__)
    # Only log whether a token is present; never log token material itself.
    logger.info(
        "get_mall_order called: order_id=%s, has_user_token=%s",
        order_id,
        bool(user_token),
    )

    # A user token is mandatory.
    if not user_token:
        logger.error("No user_token provided, user must be logged in")
        return {
            "success": False,
            "error": "用户未登录,请先登录账户以查询订单信息",
            "order_id": order_id,
            "require_login": True
        }

    # Bound before the try so the finally clause can tell whether a client was
    # actually created.
    client = None
    try:
        logger.info("Using user token for Mall API request")
        # A per-request client authenticated with the caller's own token.
        client = MallClient(
            api_url=settings.mall_api_url,
            api_token=user_token,
            tenant_id=settings.mall_tenant_id,
            currency_code=settings.mall_currency_code,
            language_id=settings.mall_language_id,
            source=settings.mall_source
        )
        result = await client.get_order_by_id(order_id)
        logger.info(
            "Mall API request successful: order_id=%s, result_keys=%s",
            order_id,
            list(result.keys()) if isinstance(result, dict) else None,
        )
        return {
            "success": True,
            "order": result,
            "order_id": order_id
        }
    except Exception as e:
        # logger.exception records the traceback along with the message.
        logger.exception("Mall API request failed: order_id=%s", order_id)
        return {
            "success": False,
            "error": str(e),
            "order_id": order_id
        }
    finally:
        # Close the per-request client if one was created.
        if client is not None:
            await client.close()
@register_tool("get_logistics")
@mcp.tool()
async def get_logistics(
    order_id: str,
    user_token: Optional[str] = None,
    user_id: Optional[str] = None,
    account_id: Optional[str] = None
) -> dict:
    """Query logistics tracking information from the Mall API

    Args:
        order_id: Order number (e.g., "201941967")
        user_token: The user's JWT token (required; used for authentication)
        user_id: User ID (auto-injected; not used by this tool)
        account_id: Account ID (auto-injected; not used by this tool)

    Returns:
        On success: {"success": True, "order_id", "tracking_number",
        "courier", "status", "timeline"}; "tracking_url" is also present when
        the response contained at least one parcel. Only the first parcel in
        the response is reported.
        On failure: {"success": False, "error": <message>, "order_id"}, plus
        "require_login": True when no user_token was supplied.
    """
    import logging

    logger = logging.getLogger(__name__)
    logger.info(
        "get_logistics called: order_id=%s, has_user_token=%s",
        order_id,
        bool(user_token),
    )

    # A user token is mandatory.
    if not user_token:
        logger.error("No user_token provided, user must be logged in")
        return {
            "success": False,
            "error": "用户未登录,请先登录账户以查询物流信息",
            "order_id": order_id,
            "require_login": True
        }

    # Bound before the try so the finally clause can tell whether a client was
    # actually created.
    client = None
    try:
        client = MallClient(
            api_url=settings.mall_api_url,
            api_token=user_token,
            tenant_id=settings.mall_tenant_id,
            currency_code=settings.mall_currency_code,
            language_id=settings.mall_language_id,
            source=settings.mall_source
        )
        logger.info("Calling Mall API: /mall/api/order/parcel?orderId=%s", order_id)
        result = await client.get(
            "/mall/api/order/parcel",
            params={"orderId": order_id}
        )
        logger.info(
            "Mall API request successful: order_id=%s, result_keys=%s",
            order_id,
            list(result.keys()) if isinstance(result, dict) else type(result).__name__,
        )
        # Response bodies may hold customer data, so the sample is emitted at
        # DEBUG only and is not even formatted unless DEBUG is enabled.
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug("Mall API parcel response sample: %s", str(result)[:1000])

        # Expected response shape (per the original author's note):
        # { "total": 1, "data": [{ "trackingCode": "...", "carrier": "...", ... }] }
        parcels = result.get("data", [])
        if not parcels:
            logger.warning("No logistics data found in response: order_id=%s", order_id)
            return {
                "success": True,
                "order_id": order_id,
                "tracking_number": "",
                "courier": "暂无物流信息",
                "status": "",
                "timeline": []
            }

        first_parcel = parcels[0]
        tracking_number = first_parcel.get("trackingCode", "")
        carrier = first_parcel.get("carrier", "未知")

        # Build the timeline from the "tracks" array, skipping anything that is
        # not a dict; a non-list "tracks" value yields an empty timeline.
        tracks = first_parcel.get("tracks", [])
        timeline = [
            {
                "id": track.get("id", ""),
                "remark": track.get("remark", ""),
                # Falls back to "date" when the entry has no "time" key.
                "time": track.get("time", track.get("date", "")),
                "location": track.get("location", "")
            }
            for track in (tracks if isinstance(tracks, list) else [])
            if isinstance(track, dict)
        ]
        logger.info(
            "Extracted logistics: tracking_number=%s, carrier=%s, tracks_count=%s",
            tracking_number,
            carrier,
            len(timeline),
        )
        return {
            "success": True,
            "order_id": order_id,
            "tracking_number": tracking_number,
            "courier": carrier,
            "tracking_url": first_parcel.get("trackingUrl", ""),
            "status": first_parcel.get("status", ""),
            "timeline": timeline
        }
    except Exception as e:
        # logger.exception records the traceback along with the message.
        logger.exception("Mall API logistics request failed: order_id=%s", order_id)
        return {
            "success": False,
            "error": str(e),
            "order_id": order_id
        }
    finally:
        # Close the per-request client if one was created.
        if client is not None:
            await client.close()
def _jwt_payload_claims(user_token: str):
    """Decode the payload segment of a JWT-shaped token WITHOUT verifying it.

    Returns the parsed JSON payload, or None when the token does not have at
    least two dot-separated segments. Decoding or JSON errors propagate to the
    caller. The result must not be trusted for authorization, because the
    signature is never checked here.
    """
    import base64
    import json

    segments = user_token.split('.')
    if len(segments) < 2:
        return None

    payload_segment = segments[1]
    # JWT segments are base64url without padding: restore exactly the missing
    # "=" characters (none when the length is already a multiple of 4) and use
    # the URL-safe decoder so "-" and "_" are not silently discarded.
    payload_segment += '=' * (-len(payload_segment) % 4)
    return json.loads(base64.urlsafe_b64decode(payload_segment))


@register_tool("get_mall_order_list")
@mcp.tool()
async def get_mall_order_list(
    user_token: Optional[str] = None,
    user_id: Optional[str] = None,
    account_id: Optional[str] = None,
    page: int = 1,
    limit: int = 5,
    customer_id: int = 0,
    order_types: Optional[List[int]] = None,
    shipping_status: int = 10000,
    date_added: Optional[str] = None,
    date_end: Optional[str] = None,
    no: Optional[str] = None,
    status: int = 10000,
    is_drop_shopping: int = 0
) -> dict:
    """Query the order list from the Mall API with filters

    Rules:
        - Returns 5 orders per page by default
        - Orders of every status are included by default (no status filter)

    Args:
        user_token: The user's JWT token (required; used for authentication)
        user_id: User ID (auto-injected; not used by this tool)
        account_id: Account ID (auto-injected; not used by this tool)
        page: Page number (default: 1)
        limit: Items per page (default: 5; documented maximum is 50, which is
            not enforced by this tool)
        customer_id: Customer ID (default: 0). When 0, the tool tries to read
            the user ID from the token payload (userId, uid or sub) and keeps
            0 if that fails. 0 is documented as meaning "all customers".
        order_types: Order type array, e.g. [1, 2] (default: None)
        shipping_status: Shipping status (default: 10000, meaning all statuses)
        date_added: Start date, format YYYY-MM-DD (default: None)
        date_end: End date, format YYYY-MM-DD (default: None)
        no: Order number filter (default: None)
        status: Order status filter (default: 10000, meaning all statuses)
        is_drop_shopping: Drop-shipping flag (default: 0)

    Returns:
        Order list and pagination info:
        {
            "success": true,
            "orders": [...],     # order list
            "total": 100,        # total number of orders
            "page": 1,           # current page
            "limit": 5,          # items per page
            "total_pages": 20    # total number of pages
        }
        On failure: {"success": False, "error": <message>, "orders": []}, plus
        "require_login": True when no user_token was supplied.

    Example:
        User asks: "What orders do I have?"
        Agent calls: get_mall_order_list(page=1, limit=5)
    """
    import logging

    logger = logging.getLogger(__name__)
    logger.info(
        "get_mall_order_list called: page=%s, limit=%s, has_user_token=%s, customer_id=%s",
        page,
        limit,
        bool(user_token),
        customer_id,
    )

    # A user token is mandatory.
    if not user_token:
        logger.error("No user_token provided, user must be logged in")
        return {
            "success": False,
            "error": "用户未登录,请先登录账户以查询订单列表",
            "require_login": True,
            "orders": []
        }

    # No explicit customer given: best-effort lookup of the user ID in the
    # token payload. Any failure leaves customer_id at 0.
    if customer_id == 0:
        try:
            claims = _jwt_payload_claims(user_token)
            if claims is not None:
                # Try the claim names in order; the first truthy value wins.
                customer_id = (
                    claims.get('userId')
                    or claims.get('uid')
                    or claims.get('sub')
                    or 0
                )
                logger.info(
                    "Extracted customer_id from token: customer_id=%s, token_payload_keys=%s",
                    customer_id,
                    list(claims.keys()),
                )
        except Exception as e:
            logger.warning("Failed to extract customer_id from token: %s", e)
            customer_id = 0

    # Bound before the try so the finally clause can tell whether a client was
    # actually created.
    client = None
    try:
        client = MallClient(
            api_url=settings.mall_api_url,
            api_token=user_token,
            tenant_id=settings.mall_tenant_id,
            currency_code=settings.mall_currency_code,
            language_id=settings.mall_language_id,
            source=settings.mall_source
        )
        result = await client.get_order_list(
            page=page,
            limit=limit,
            customer_id=customer_id,
            order_types=order_types,
            shipping_status=shipping_status,
            date_added=date_added,
            date_end=date_end,
            no=no,
            status=status,
            is_drop_shopping=is_drop_shopping
        )
        logger.info(
            "Mall API request successful: page=%s, result_keys=%s",
            page,
            list(result.keys()) if isinstance(result, dict) else None,
        )

        # Expected response shape (per the original author's note):
        # {"data": [...], "total": 100}
        orders = result.get("data", [])
        total = result.get("total", 0)
        return {
            "success": True,
            "orders": orders,
            "total": total,
            "page": page,
            "limit": limit,
            # Ceiling division; a non-positive limit yields 0 pages.
            "total_pages": (total + limit - 1) // limit if limit > 0 else 0
        }
    except Exception as e:
        # logger.exception records the traceback along with the message.
        logger.exception("Mall API request failed: page=%s", page)
        return {
            "success": False,
            "error": str(e),
            "orders": []
        }
    finally:
        # Close the per-request client if one was created.
        if client is not None:
            await client.close()
# Health check endpoint
@register_tool("health_check")
@mcp.tool()
async def health_check() -> dict:
    """Check server health status

    Returns:
        A fixed dict with the keys "status", "service" and "version".
    """
    report = dict(status="healthy", service="order_mcp", version="1.0.0")
    return report
if __name__ == "__main__":
    import json

    import uvicorn
    from starlette.requests import Request
    from starlette.responses import JSONResponse
    from starlette.routing import Route

    # Plain HTTP liveness endpoint. It has its own name so the module-level
    # `health_check` tool is not rebound by this definition.
    async def http_health(request):
        return JSONResponse({"status": "healthy"})

    # Tool execution endpoint
    async def execute_tool(request: Request):
        """Execute a registered MCP tool via HTTP.

        The tool is selected by the ``tool_name`` path parameter and the JSON
        request body supplies its arguments. Keys that are not in the tool's
        parameter schema are dropped before the call.

        Responses:
            200: {"success": True, "result": ...} where result is the first
                 content item's text, parsed as JSON when possible, or None
                 when the tool produced no content.
            400: the body is not valid JSON, is not a JSON object, or the
                 call raised TypeError.
            404: no tool is registered under ``tool_name``.
            500: any other exception.
        """
        tool_name = request.path_params["tool_name"]

        # Get tool function from registry
        tool_obj = _tools.get(tool_name)
        if tool_obj is None:
            return JSONResponse({
                "success": False,
                "error": f"Tool '{tool_name}' not found"
            }, status_code=404)

        # Get arguments from request body. A malformed body is a client error,
        # so it is reported as 400 and not as a server failure.
        try:
            arguments = await request.json()
        except ValueError as e:
            return JSONResponse({
                "success": False,
                "error": f"Invalid JSON body: {str(e)}"
            }, status_code=400)
        if not isinstance(arguments, dict):
            return JSONResponse({
                "success": False,
                "error": "Request body must be a JSON object"
            }, status_code=400)

        try:
            # Filter arguments to only include parameters expected by the tool,
            # using the parameter names from the tool's parameters schema.
            tool_params = tool_obj.parameters.get('properties', {})
            filtered_args = {k: v for k, v in arguments.items() if k in tool_params}

            # FastMCP FunctionTool.run() takes a dict of arguments
            tool_result = await tool_obj.run(filtered_args)

            # ToolResult.content is a list of TextContent objects with a
            # 'text' attribute; only the first item is returned.
            if tool_result.content:
                content = tool_result.content[0].text
                # Parse as JSON when possible, otherwise return the raw text.
                try:
                    result = json.loads(content)
                except (TypeError, ValueError):
                    result = content
            else:
                result = None

            return JSONResponse({
                "success": True,
                "result": result
            })
        except TypeError as e:
            return JSONResponse({
                "success": False,
                "error": f"Invalid arguments: {str(e)}"
            }, status_code=400)
        except Exception as e:
            return JSONResponse({
                "success": False,
                "error": str(e)
            }, status_code=500)

    # Create routes list
    routes = [
        Route('/health', http_health, methods=['GET']),
        Route('/tools/{tool_name}', execute_tool, methods=['POST'])
    ]

    # Create app from MCP with custom routes
    app = mcp.http_app()

    # Add our custom routes to the existing app
    app.router.routes.extend(routes)

    uvicorn.run(app, host="0.0.0.0", port=8002)