""" Order MCP Server - Order management tools """ import sys import os from typing import Optional, List # Add shared module to path sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from fastmcp import FastMCP from pydantic_settings import BaseSettings from pydantic import ConfigDict class Settings(BaseSettings): """Server configuration""" hyperf_api_url: str hyperf_api_token: str log_level: str = "INFO" model_config = ConfigDict(env_file=".env") settings = Settings() # Create MCP server mcp = FastMCP( "Order Management" ) # Hyperf client for this server from shared.hyperf_client import HyperfClient hyperf = HyperfClient(settings.hyperf_api_url, settings.hyperf_api_token) @mcp.tool() async def query_order( user_id: str, account_id: str, order_id: Optional[str] = None, status: Optional[str] = None, date_start: Optional[str] = None, date_end: Optional[str] = None, page: int = 1, page_size: int = 10 ) -> dict: """Query orders for a user Args: user_id: User identifier account_id: B2B account identifier order_id: Specific order ID to query (optional) status: Order status filter (pending, paid, shipped, delivered, cancelled) date_start: Start date filter (YYYY-MM-DD) date_end: End date filter (YYYY-MM-DD) page: Page number (default: 1) page_size: Items per page (default: 10) Returns: List of orders with details """ payload = { "user_id": user_id, "account_id": account_id, "page": page, "page_size": page_size } if order_id: payload["order_id"] = order_id if status: payload["status"] = status if date_start or date_end: payload["date_range"] = {} if date_start: payload["date_range"]["start"] = date_start if date_end: payload["date_range"]["end"] = date_end try: result = await hyperf.post("/orders/query", json=payload) return { "success": True, "orders": result.get("orders", []), "pagination": result.get("pagination", {}) } except Exception as e: return { "success": False, "error": str(e), "orders": [] } @mcp.tool() async def track_logistics( order_id: str, tracking_number: Optional[str] = None ) -> dict: """Track order logistics/shipping status Args: order_id: Order ID tracking_number: Tracking number (optional, will be fetched from order if not provided) Returns: Logistics tracking information with timeline """ try: params = {} if tracking_number: params["tracking_number"] = tracking_number result = await hyperf.get(f"/orders/{order_id}/logistics", params=params) return { "success": True, "order_id": order_id, "tracking_number": result.get("tracking_number"), "courier": result.get("courier"), "status": result.get("status"), "estimated_delivery": result.get("estimated_delivery"), "timeline": result.get("timeline", []) } except Exception as e: return { "success": False, "error": str(e), "order_id": order_id } @mcp.tool() async def modify_order( order_id: str, user_id: str, modifications: dict ) -> dict: """Modify an existing order Args: order_id: Order ID to modify user_id: User ID for permission verification modifications: Changes to apply. Can include: - shipping_address: {province, city, district, detail, contact, phone} - items: [{product_id, quantity}] to update quantities - notes: Order notes/instructions Returns: Modified order details and any price changes """ try: result = await hyperf.put( f"/orders/{order_id}/modify", json={ "user_id": user_id, "modifications": modifications } ) return { "success": True, "order_id": order_id, "order": result.get("order", {}), "price_diff": result.get("price_diff", 0), "message": result.get("message", "Order modified successfully") } except Exception as e: return { "success": False, "error": str(e), "order_id": order_id } @mcp.tool() async def cancel_order( order_id: str, user_id: str, reason: str ) -> dict: """Cancel an order Args: order_id: Order ID to cancel user_id: User ID for permission verification reason: Cancellation reason Returns: Cancellation result with refund information """ try: result = await hyperf.post( f"/orders/{order_id}/cancel", json={ "user_id": user_id, "reason": reason } ) return { "success": True, "order_id": order_id, "status": "cancelled", "refund_info": result.get("refund_info", {}), "message": result.get("message", "Order cancelled successfully") } except Exception as e: return { "success": False, "error": str(e), "order_id": order_id } @mcp.tool() async def get_invoice( order_id: str, invoice_type: str = "normal" ) -> dict: """Get invoice for an order Args: order_id: Order ID invoice_type: Invoice type ('normal' for regular invoice, 'vat' for VAT invoice) Returns: Invoice information and download URL """ try: result = await hyperf.get( f"/orders/{order_id}/invoice", params={"type": invoice_type} ) return { "success": True, "order_id": order_id, "invoice_number": result.get("invoice_number"), "invoice_type": invoice_type, "amount": result.get("amount"), "tax": result.get("tax"), "invoice_url": result.get("invoice_url"), "issued_at": result.get("issued_at") } except Exception as e: return { "success": False, "error": str(e), "order_id": order_id } # Health check endpoint @mcp.tool() async def health_check() -> dict: """Check server health status""" return { "status": "healthy", "service": "order_mcp", "version": "1.0.0" } if __name__ == "__main__": import uvicorn # Create FastAPI app from MCP app = mcp.http_app() # Add health endpoint from starlette.responses import JSONResponse async def health_check(request): return JSONResponse({"status": "healthy"}) # Add the route to the app from starlette.routing import Route app.router.routes.append(Route('/health', health_check, methods=['GET'])) uvicorn.run(app, host="0.0.0.0", port=8002)