""" Product MCP Server - Product search, recommendations, and quotes """ import sys import os from typing import Optional, List, Dict, Any # Add shared module to path sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from fastmcp import FastMCP from pydantic_settings import BaseSettings from pydantic import ConfigDict class Settings(BaseSettings): """Server configuration""" hyperf_api_url: str hyperf_api_token: str mall_api_url: str mall_tenant_id: str = "2" mall_currency_code: str = "EUR" mall_language_id: str = "1" mall_source: str = "us.qa1.gaia888.com" log_level: str = "INFO" model_config = ConfigDict(env_file=".env") settings = Settings() # Create MCP server mcp = FastMCP( "Product Service" ) # Tool registry for HTTP access _tools: Dict[str, Any] = {} def register_tool(name: str): """Decorator to register tool in _tools dict""" def decorator(func): _tools[name] = func return func return decorator # Hyperf client for this server from shared.hyperf_client import HyperfClient hyperf = HyperfClient(settings.hyperf_api_url, settings.hyperf_api_token) @register_tool("get_product_detail") @mcp.tool() async def get_product_detail( product_id: str ) -> dict: """Get product details Args: product_id: Product ID Returns: Detailed product information including specifications, pricing, and stock """ try: result = await hyperf.get(f"/products/{product_id}") return { "success": True, "product": result } except Exception as e: return { "success": False, "error": str(e), "product": None } @register_tool("recommend_products") @mcp.tool() async def recommend_products( user_id: str, account_id: str, context: Optional[dict] = None, strategy: str = "hybrid", limit: int = 10 ) -> dict: """Get personalized product recommendations Args: user_id: User identifier account_id: B2B account identifier context: Optional context for recommendations: - current_query: Current search query - recent_views: List of recently viewed product IDs - cart_items: Items in cart strategy: Recommendation strategy (collaborative, content_based, hybrid) limit: Maximum recommendations to return (default: 10) Returns: List of recommended products with reasons """ payload = { "user_id": user_id, "account_id": account_id, "strategy": strategy, "limit": limit } if context: payload["context"] = context try: result = await hyperf.post("/products/recommend", json=payload) return { "success": True, "recommendations": result.get("recommendations", []) } except Exception as e: return { "success": False, "error": str(e), "recommendations": [] } @register_tool("get_quote") @mcp.tool() async def get_quote( product_id: str, quantity: int, account_id: str, delivery_province: Optional[str] = None, delivery_city: Optional[str] = None ) -> dict: """Get B2B price quote Args: product_id: Product ID quantity: Desired quantity account_id: B2B account ID (for customer-specific pricing) delivery_province: Delivery province (for shipping calculation) delivery_city: Delivery city (for shipping calculation) Returns: Detailed quote with unit price, discounts, tax, and shipping """ payload = { "product_id": product_id, "quantity": quantity, "account_id": account_id } if delivery_province or delivery_city: payload["delivery_address"] = {} if delivery_province: payload["delivery_address"]["province"] = delivery_province if delivery_city: payload["delivery_address"]["city"] = delivery_city try: result = await hyperf.post("/products/quote", json=payload) return { "success": True, "quote_id": result.get("quote_id"), "product_id": product_id, "quantity": quantity, "unit_price": result.get("unit_price"), "subtotal": result.get("subtotal"), "discount": result.get("discount", 0), "discount_reason": result.get("discount_reason"), "tax": result.get("tax"), "shipping_fee": result.get("shipping_fee"), "total_price": result.get("total_price"), "validity": result.get("validity"), "payment_terms": result.get("payment_terms"), "estimated_delivery": result.get("estimated_delivery") } except Exception as e: return { "success": False, "error": str(e) } @register_tool("check_inventory") @mcp.tool() async def check_inventory( product_ids: List[str], warehouse: Optional[str] = None ) -> dict: """Check product inventory/stock Args: product_ids: List of product IDs to check warehouse: Specific warehouse to check (optional) Returns: Inventory status for each product """ payload = {"product_ids": product_ids} if warehouse: payload["warehouse"] = warehouse try: result = await hyperf.post("/products/inventory/check", json=payload) return { "success": True, "inventory": result.get("inventory", []) } except Exception as e: return { "success": False, "error": str(e), "inventory": [] } @register_tool("get_categories") @mcp.tool() async def get_categories() -> dict: """Get product category tree Returns: Hierarchical category structure """ try: result = await hyperf.get("/products/categories") return { "success": True, "categories": result.get("categories", []) } except Exception as e: return { "success": False, "error": str(e), "categories": [] } @register_tool("search_products") @mcp.tool() async def search_products( keyword: str, page_size: int = 60, page: int = 1, user_token: str = None, user_id: str = None, account_id: str = None ) -> dict: """Search products from Mall API 从 Mall API 搜索商品 SPU(根据关键词) Args: keyword: 搜索关键词(商品名称、编号等) page_size: 每页数量 (default: 60, max 100) page: 页码 (default: 1) user_token: 用户 JWT token(必需,用于 Mall API 认证) user_id: 用户 ID(自动注入) account_id: 账户 ID(自动注入) Returns: 商品列表,包含 SPU 信息、商品图片、价格等 Product list including SPU ID, name, image, price, etc. """ if not user_token: return { "success": False, "error": "用户未登录,请先登录账户以搜索商品", "products": [], "total": 0, "require_login": True } try: from shared.mall_client import MallClient import logging logger = logging.getLogger(__name__) logger.info( f"search_products called with keyword={keyword}, " f"user_token_prefix={user_token[:20] if user_token else None}..." ) print(f"[DEBUG] search_products called: keyword={keyword}, user_token={user_token[:20] if user_token else None}...") # 使用用户 token 创建 Mall 客户端 mall = MallClient( api_url=settings.mall_api_url, api_token=user_token, tenant_id=settings.mall_tenant_id, currency_code=settings.mall_currency_code, language_id=settings.mall_language_id, source=settings.mall_source ) result = await mall.search_spu_products( keyword=keyword, page_size=page_size, page=page ) logger.info( f"Mall API returned: result_type={type(result).__name__}, " f"result_keys={list(result.keys()) if isinstance(result, dict) else 'not a dict'}, " f"result={result}" ) print(f"[DEBUG] Mall API returned: {result}") # 解析返回结果 # Mall API 返回结构: {"total": X, "data": {"data": [...], ...}} if "data" in result and isinstance(result["data"], dict): products = result["data"].get("data", []) else: products = result.get("list", []) total = result.get("total", 0) # 格式化商品数据 formatted_products = [] for product in products: formatted_products.append({ "spu_id": product.get("spuId"), "spu_sn": product.get("spuSn"), "product_name": product.get("productName"), "product_image": product.get("productImage"), "price": product.get("price"), "special_price": product.get("specialPrice"), "stock": product.get("stock"), "sales_count": product.get("salesCount", 0) }) return { "success": True, "products": formatted_products, "total": total, "keyword": keyword } except Exception as e: return { "success": False, "error": str(e), "products": [], "total": 0 } finally: # 关闭客户端 if 'client' in dir() and 'mall' in dir(): await mall.close() # Health check endpoint @register_tool("health_check") @mcp.tool() async def health_check() -> dict: """Check server health status""" return { "status": "healthy", "service": "product_mcp", "version": "1.0.0" } if __name__ == "__main__": import uvicorn from starlette.responses import JSONResponse from starlette.routing import Route from starlette.requests import Request # Custom tool execution endpoint async def execute_tool(request: Request): """Execute an MCP tool via HTTP""" tool_name = request.path_params["tool_name"] try: # Get arguments from request body arguments = await request.json() # Get tool function from registry if tool_name not in _tools: return JSONResponse({ "success": False, "error": f"Tool '{tool_name}' not found" }, status_code=404) tool_obj = _tools[tool_name] # Call the tool with arguments # FastMCP FunctionTool.run() takes a dict of arguments tool_result = await tool_obj.run(arguments) # Extract content from ToolResult # ToolResult.content is a list of TextContent objects with a 'text' attribute if tool_result.content and len(tool_result.content) > 0: content = tool_result.content[0].text # Try to parse as JSON if possible try: import json result = json.loads(content) except: result = content else: result = None return JSONResponse({ "success": True, "result": result }) except TypeError as e: return JSONResponse({ "success": False, "error": f"Invalid arguments: {str(e)}" }, status_code=400) except Exception as e: return JSONResponse({ "success": False, "error": str(e) }, status_code=500) # Health check endpoint async def health_check(request): return JSONResponse({"status": "healthy"}) # Create routes list routes = [ Route('/health', health_check, methods=['GET']), Route('/tools/{tool_name}', execute_tool, methods=['POST']) ] # Create app from MCP with custom routes app = mcp.http_app() # Add our custom routes to the existing app for route in routes: app.router.routes.append(route) uvicorn.run(app, host="0.0.0.0", port=8004)