diff --git a/mcp_servers/product_mcp/server.py b/mcp_servers/product_mcp/server.py index d36a6ad..866cd5a 100644 --- a/mcp_servers/product_mcp/server.py +++ b/mcp_servers/product_mcp/server.py @@ -3,15 +3,13 @@ Product MCP Server - Product search, recommendations, and quotes """ import sys import os -from typing import Optional, List +from typing import Optional, List, Dict, Any # Add shared module to path sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from fastmcp import FastMCP from pydantic_settings import BaseSettings - - from pydantic import ConfigDict @@ -20,7 +18,7 @@ class Settings(BaseSettings): hyperf_api_url: str hyperf_api_token: str log_level: str = "INFO" - + model_config = ConfigDict(env_file=".env") @@ -31,12 +29,24 @@ mcp = FastMCP( "Product Service" ) +# Tool registry for HTTP access +_tools: Dict[str, Any] = {} + + +def register_tool(name: str): + """Decorator to register tool in _tools dict""" + def decorator(func): + _tools[name] = func + return func + return decorator + # Hyperf client for this server from shared.hyperf_client import HyperfClient hyperf = HyperfClient(settings.hyperf_api_url, settings.hyperf_api_token) +@register_tool("get_product_detail") @mcp.tool() async def get_product_detail( product_id: str @@ -64,6 +74,7 @@ async def get_product_detail( } +@register_tool("recommend_products") @mcp.tool() async def recommend_products( user_id: str, @@ -112,6 +123,7 @@ async def recommend_products( } +@register_tool("get_quote") @mcp.tool() async def get_quote( product_id: str, @@ -171,6 +183,7 @@ async def get_quote( } +@register_tool("check_inventory") @mcp.tool() async def check_inventory( product_ids: List[str], @@ -204,6 +217,7 @@ async def check_inventory( } +@register_tool("get_categories") @mcp.tool() async def get_categories() -> dict: """Get product category tree @@ -226,6 +240,7 @@ async def get_categories() -> dict: } +@register_tool("search_products") @mcp.tool() async def search_products( keyword: str, @@ -317,6 +332,7 @@ async def search_products( # Health check endpoint +@register_tool("health_check") @mcp.tool() async def health_check() -> dict: """Check server health status""" @@ -329,17 +345,76 @@ async def health_check() -> dict: if __name__ == "__main__": import uvicorn - - # Create FastAPI app from MCP - app = mcp.http_app() - - # Add health endpoint from starlette.responses import JSONResponse + from starlette.routing import Route + from starlette.requests import Request + + # Custom tool execution endpoint + async def execute_tool(request: Request): + """Execute an MCP tool via HTTP""" + tool_name = request.path_params["tool_name"] + + try: + # Get arguments from request body + arguments = await request.json() + + # Get tool function from registry + if tool_name not in _tools: + return JSONResponse({ + "success": False, + "error": f"Tool '{tool_name}' not found" + }, status_code=404) + + tool_obj = _tools[tool_name] + + # Call the tool with arguments + # FastMCP FunctionTool.run() takes a dict of arguments + tool_result = await tool_obj.run(arguments) + + # Extract content from ToolResult + # ToolResult.content is a list of TextContent objects with a 'text' attribute + if tool_result.content and len(tool_result.content) > 0: + content = tool_result.content[0].text + # Try to parse as JSON if possible + try: + import json + result = json.loads(content) + except: + result = content + else: + result = None + + return JSONResponse({ + "success": True, + "result": result + }) + + except TypeError as e: + return JSONResponse({ + "success": False, + "error": f"Invalid arguments: {str(e)}" + }, status_code=400) + except Exception as e: + return JSONResponse({ + "success": False, + "error": str(e) + }, status_code=500) + + # Health check endpoint async def health_check(request): return JSONResponse({"status": "healthy"}) - - # Add the route to the app - from starlette.routing import Route - app.router.routes.append(Route('/health', health_check, methods=['GET'])) - + + # Create routes list + routes = [ + Route('/health', health_check, methods=['GET']), + Route('/tools/{tool_name}', execute_tool, methods=['POST']) + ] + + # Create app from MCP with custom routes + app = mcp.http_app() + + # Add our custom routes to the existing app + for route in routes: + app.router.routes.append(route) + uvicorn.run(app, host="0.0.0.0", port=8004)