"""
Order MCP Server - Order management tools
"""
import logging
import sys
import os
from typing import Optional, List

# Add shared module to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from fastmcp import FastMCP
from pydantic_settings import BaseSettings
from pydantic import ConfigDict

logger = logging.getLogger(__name__)


class Settings(BaseSettings):
    """Server configuration"""
    hyperf_api_url: str
    hyperf_api_token: str

    # Mall API configuration
    mall_api_url: str = "https://apicn.qa1.gaia888.com"
    mall_api_token: str = ""
    mall_tenant_id: str = "2"
    mall_currency_code: str = "EUR"
    mall_language_id: str = "1"
    mall_source: str = "us.qa1.gaia888.com"

    log_level: str = "INFO"

    model_config = ConfigDict(env_file=".env")


settings = Settings()

# Create MCP server
mcp = FastMCP(
    "Order Management"
)

# Tool registry for HTTP access
_tools = {}

# Hyperf client for this server
from shared.hyperf_client import HyperfClient
hyperf = HyperfClient(settings.hyperf_api_url, settings.hyperf_api_token)

# Mall API client
from shared.mall_client import MallClient
# Settings declares every mall_* field with a default, so plain attribute
# access is equivalent to the former getattr(..., default) calls.
mall = MallClient(
    api_url=settings.mall_api_url,
    api_token=settings.mall_api_token,
    tenant_id=settings.mall_tenant_id,
    currency_code=settings.mall_currency_code,
    language_id=settings.mall_language_id,
    source=settings.mall_source,
)


def register_tool(name: str):
    """Register a tool for HTTP access

    Stores the decorated object in the module-level ``_tools`` registry under
    ``name`` and returns it unchanged. It is applied outside ``@mcp.tool()``,
    so the registry holds whatever ``mcp.tool()`` returned; the HTTP endpoint
    later calls ``.run(arguments)`` on that object.
    """
    def decorator(func):
        _tools[name] = func
        return func
    return decorator


def _order_failure(order_id: str, exc: Exception) -> dict:
    """Build the failure payload shared by the per-order tools."""
    return {
        "success": False,
        "error": str(exc),
        "order_id": order_id,
    }


def _login_required(order_id: str, message: str) -> dict:
    """Build the payload returned when no user token was supplied."""
    return {
        "success": False,
        "error": message,
        "order_id": order_id,
        "require_login": True,
    }


def _build_user_mall_client(user_token: str) -> "MallClient":
    """Create a Mall API client that authenticates with the user's token."""
    return MallClient(
        api_url=settings.mall_api_url,
        api_token=user_token,
        tenant_id=settings.mall_tenant_id,
        currency_code=settings.mall_currency_code,
        language_id=settings.mall_language_id,
        source=settings.mall_source,
    )


@register_tool("query_order")
@mcp.tool()
async def query_order(
    user_id: str,
    account_id: str,
    order_id: Optional[str] = None,
    status: Optional[str] = None,
    date_start: Optional[str] = None,
    date_end: Optional[str] = None,
    page: int = 1,
    page_size: int = 10
) -> dict:
    """Query orders for a user

    Args:
        user_id: User identifier
        account_id: B2B account identifier
        order_id: Specific order ID to query (optional)
        status: Order status filter (pending, paid, shipped, delivered, cancelled)
        date_start: Start date filter (YYYY-MM-DD)
        date_end: End date filter (YYYY-MM-DD)
        page: Page number (default: 1)
        page_size: Items per page (default: 10)

    Returns:
        List of orders with details
    """
    payload = {
        "user_id": user_id,
        "account_id": account_id,
        "page": page,
        "page_size": page_size,
    }

    # Optional filters are only sent when the caller supplied them.
    if order_id:
        payload["order_id"] = order_id
    if status:
        payload["status"] = status

    date_range = {}
    if date_start:
        date_range["start"] = date_start
    if date_end:
        date_range["end"] = date_end
    if date_range:
        payload["date_range"] = date_range

    try:
        result = await hyperf.post("/orders/query", json=payload)
        return {
            "success": True,
            "orders": result.get("orders", []),
            "pagination": result.get("pagination", {}),
        }
    except Exception as e:
        # Tool boundary: report the failure to the caller instead of raising.
        return {
            "success": False,
            "error": str(e),
            "orders": [],
        }


@register_tool("track_logistics")
@mcp.tool()
async def track_logistics(
    order_id: str,
    tracking_number: Optional[str] = None
) -> dict:
    """Track order logistics/shipping status

    Args:
        order_id: Order ID
        tracking_number: Tracking number (optional, will be fetched from order if not provided)

    Returns:
        Logistics tracking information with timeline
    """
    try:
        params = {}
        if tracking_number:
            params["tracking_number"] = tracking_number

        result = await hyperf.get(f"/orders/{order_id}/logistics", params=params)

        return {
            "success": True,
            "order_id": order_id,
            "tracking_number": result.get("tracking_number"),
            "courier": result.get("courier"),
            "status": result.get("status"),
            "estimated_delivery": result.get("estimated_delivery"),
            "timeline": result.get("timeline", []),
        }
    except Exception as e:
        return _order_failure(order_id, e)


@register_tool("modify_order")
@mcp.tool()
async def modify_order(
    order_id: str,
    user_id: str,
    modifications: dict
) -> dict:
    """Modify an existing order

    Args:
        order_id: Order ID to modify
        user_id: User ID for permission verification
        modifications: Changes to apply. Can include:
            - shipping_address: {province, city, district, detail, contact, phone}
            - items: [{product_id, quantity}] to update quantities
            - notes: Order notes/instructions

    Returns:
        Modified order details and any price changes
    """
    try:
        result = await hyperf.put(
            f"/orders/{order_id}/modify",
            json={
                "user_id": user_id,
                "modifications": modifications,
            },
        )

        return {
            "success": True,
            "order_id": order_id,
            "order": result.get("order", {}),
            "price_diff": result.get("price_diff", 0),
            "message": result.get("message", "Order modified successfully"),
        }
    except Exception as e:
        return _order_failure(order_id, e)


@register_tool("cancel_order")
@mcp.tool()
async def cancel_order(
    order_id: str,
    user_id: str,
    reason: str
) -> dict:
    """Cancel an order

    Args:
        order_id: Order ID to cancel
        user_id: User ID for permission verification
        reason: Cancellation reason

    Returns:
        Cancellation result with refund information
    """
    try:
        result = await hyperf.post(
            f"/orders/{order_id}/cancel",
            json={
                "user_id": user_id,
                "reason": reason,
            },
        )

        return {
            "success": True,
            "order_id": order_id,
            "status": "cancelled",
            "refund_info": result.get("refund_info", {}),
            "message": result.get("message", "Order cancelled successfully"),
        }
    except Exception as e:
        return _order_failure(order_id, e)


@register_tool("get_invoice")
@mcp.tool()
async def get_invoice(
    order_id: str,
    invoice_type: str = "normal"
) -> dict:
    """Get invoice for an order

    Args:
        order_id: Order ID
        invoice_type: Invoice type ('normal' for regular invoice, 'vat' for VAT invoice)

    Returns:
        Invoice information and download URL
    """
    try:
        result = await hyperf.get(
            f"/orders/{order_id}/invoice",
            params={"type": invoice_type},
        )

        return {
            "success": True,
            "order_id": order_id,
            "invoice_number": result.get("invoice_number"),
            "invoice_type": invoice_type,
            "amount": result.get("amount"),
            "tax": result.get("tax"),
            "invoice_url": result.get("invoice_url"),
            "issued_at": result.get("issued_at"),
        }
    except Exception as e:
        return _order_failure(order_id, e)


@register_tool("get_mall_order")
@mcp.tool()
async def get_mall_order(
    order_id: str,
    user_token: Optional[str] = None,
    user_id: Optional[str] = None,
    account_id: Optional[str] = None
) -> dict:
    """Query order from Mall API by order ID

    Args:
        order_id: Order number (e.g., "202071324")
        user_token: User JWT token. Required in practice: without it the
            tool returns a login-required error instead of querying.
        user_id: User ID (auto-injected, not used by this tool)
        account_id: Account ID (auto-injected, not used by this tool)

    Returns:
        Order details including order ID, status, items, amount, logistics info, etc.
    """
    # stdlib loggers reject arbitrary keyword arguments (TypeError), so the
    # context is passed as lazy %-style arguments. The token itself is never
    # logged, only whether one was supplied.
    logger.info(
        "get_mall_order called: order_id=%s has_user_token=%s",
        order_id,
        bool(user_token),
    )

    # A user token is mandatory
    if not user_token:
        logger.error("No user_token provided, user must be logged in")
        return _login_required(order_id, "用户未登录,请先登录账户以查询订单信息")

    client = None
    try:
        logger.info("Using user token for Mall API request")
        client = _build_user_mall_client(user_token)

        result = await client.get_order_by_id(order_id)

        logger.info(
            "Mall API request successful: order_id=%s result_keys=%s",
            order_id,
            list(result.keys()) if isinstance(result, dict) else None,
        )

        return {
            "success": True,
            "order": result,
            "order_id": order_id,
        }
    except Exception as e:
        logger.exception("Mall API request failed: order_id=%s", order_id)
        return _order_failure(order_id, e)
    finally:
        # Close the client only if it was actually created
        if client is not None:
            await client.close()


@register_tool("get_logistics")
@mcp.tool()
async def get_logistics(
    order_id: str,
    user_token: Optional[str] = None,
    user_id: Optional[str] = None,
    account_id: Optional[str] = None
) -> dict:
    """Query logistics tracking information from Mall API

    Args:
        order_id: Order number (e.g., "201941967")
        user_token: User JWT token (required, used for authentication)
        user_id: User ID (auto-injected, not used by this tool)
        account_id: Account ID (auto-injected, not used by this tool)

    Returns:
        Logistics information: courier, status, estimated delivery time,
        tracking timeline, etc.
    """
    # stdlib loggers reject arbitrary keyword arguments (TypeError), so the
    # context is passed as lazy %-style arguments.
    logger.info(
        "get_logistics called: order_id=%s has_user_token=%s",
        order_id,
        bool(user_token),
    )

    # A user token is mandatory
    if not user_token:
        logger.error("No user_token provided for logistics query")
        return _login_required(order_id, "用户未登录,请先登录账户以查询物流信息")

    client = None
    try:
        client = _build_user_mall_client(user_token)

        result = await client.get(
            "/mall/api/order/parcel",
            params={"orderId": order_id},
        )

        logger.info(
            "Logistics query successful: order_id=%s has_tracking=%s",
            order_id,
            bool(result.get("trackingNumber")),
        )

        return {
            "success": True,
            "order_id": order_id,
            "tracking_number": result.get("trackingNumber"),
            "courier": result.get("courier"),
            "status": result.get("status"),
            "estimated_delivery": result.get("estimatedDelivery"),
            "timeline": result.get("timeline", []),
        }
    except Exception as e:
        logger.exception("Logistics query failed: order_id=%s", order_id)
        return _order_failure(order_id, e)
    finally:
        # Close the client only if it was actually created
        if client is not None:
            await client.close()


# Health check endpoint
@register_tool("health_check")
@mcp.tool()
async def health_check() -> dict:
    """Check server health status"""
    return {
        "status": "healthy",
        "service": "order_mcp",
        "version": "1.0.0",
    }


if __name__ == "__main__":
    import json

    import uvicorn
    from starlette.requests import Request
    from starlette.responses import JSONResponse
    from starlette.routing import Route

    # Plain HTTP health check endpoint. Named differently from the
    # `health_check` MCP tool above so it does not rebind that module name.
    async def http_health_check(request: Request) -> JSONResponse:
        return JSONResponse({"status": "healthy"})

    # Tool execution endpoint
    async def execute_tool(request: Request) -> JSONResponse:
        """Execute an MCP tool via HTTP

        Looks the tool up in ``_tools`` by the ``tool_name`` path parameter,
        runs it with the JSON object from the request body, and wraps the
        first content item of the result in a JSON response.

        Responses: 404 for an unknown tool, 400 for a malformed or
        non-object body or a ``TypeError`` raised while running the tool,
        500 for any other failure.
        """
        tool_name = request.path_params["tool_name"]

        # Get tool function from registry
        tool_obj = _tools.get(tool_name)
        if tool_obj is None:
            return JSONResponse({
                "success": False,
                "error": f"Tool '{tool_name}' not found"
            }, status_code=404)

        try:
            # Get arguments from request body; an empty body means "no
            # arguments" so argument-less tools can be called without one.
            raw_body = await request.body()
            try:
                arguments = json.loads(raw_body) if raw_body.strip() else {}
            except ValueError as e:
                return JSONResponse({
                    "success": False,
                    "error": f"Invalid JSON body: {str(e)}"
                }, status_code=400)

            if not isinstance(arguments, dict):
                return JSONResponse({
                    "success": False,
                    "error": "Invalid arguments: request body must be a JSON object"
                }, status_code=400)

            # Call the tool with arguments
            # FastMCP FunctionTool.run() takes a dict of arguments
            tool_result = await tool_obj.run(arguments)

            # Extract content from ToolResult
            # ToolResult.content is a list of TextContent objects with a 'text' attribute
            result = None
            if tool_result.content:
                content = tool_result.content[0].text
                # Try to parse as JSON if possible, otherwise pass the text through
                try:
                    result = json.loads(content)
                except (ValueError, TypeError):
                    result = content

            return JSONResponse({
                "success": True,
                "result": result
            })

        except TypeError as e:
            return JSONResponse({
                "success": False,
                "error": f"Invalid arguments: {str(e)}"
            }, status_code=400)
        except Exception as e:
            return JSONResponse({
                "success": False,
                "error": str(e)
            }, status_code=500)

    # Create routes list
    routes = [
        Route('/health', http_health_check, methods=['GET']),
        Route('/tools/{tool_name}', execute_tool, methods=['POST'])
    ]

    # Create app from MCP with custom routes
    app = mcp.http_app()

    # Add our custom routes to the existing app
    app.router.routes.extend(routes)

    uvicorn.run(app, host="0.0.0.0", port=8002)