Files
assistant/mcp_servers/product_mcp/server.py
wangliang ad7f30d54c fix: 删除旧的 search_products 工具,解决工具名冲突
## 问题
Product MCP 启动时出现警告:
```
WARNING Tool already exists: search_products
```

导致工具调用时返回 404 错误:
```
POST /tools/search_products HTTP/1.1 404 Not Found
```

## 根本原因
Product MCP 中有两个同名工具:
1. **第 40-99 行**:旧的 `search_products`(使用 Hyperf API)
2. **第 292-378 行**:新的 `search_products`(使用 Mall API)

FastMCP 无法注册同名工具,导致注册失败。

## 解决方案
删除旧的 `search_products` 工具定义(第 40-99 行),保留新的使用 Mall API 的版本。

## 修改内容
**文件**: mcp_servers/product_mcp/server.py
- 删除第 40-99 行(旧的 search_products 工具)
- 保留第 291 行开始的新的 search_products 工具

## 影响
- 移除了基于 Hyperf API 的旧搜索功能
- 所有商品搜索统一使用 Mall API
- 不再支持复杂过滤条件(category, brand, price_range 等)
- 简化为关键词搜索,返回商品卡片格式

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-26 18:25:06 +08:00

346 lines
9.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Product MCP Server - Product search, recommendations, and quotes
"""
import sys
import os
from typing import Optional, List
# Add shared module to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from fastmcp import FastMCP
from pydantic_settings import BaseSettings
from pydantic import ConfigDict
class Settings(BaseSettings):
    """Server configuration.

    Values are populated by pydantic-settings; ``model_config`` points it at
    the local ``.env`` file.
    """

    # Hyperf backend connection (required: no defaults are provided).
    hyperf_api_url: str
    hyperf_api_token: str
    log_level: str = "INFO"

    # Mall API parameters read by the search_products tool. They were
    # previously undeclared, so every search failed with AttributeError.
    # They are optional so that deployments which do not configure the
    # Mall API can still start the server.
    # NOTE(review): the value types are assumed to be strings; confirm
    # against the shared.mall_client.MallClient constructor.
    mall_api_url: Optional[str] = None
    mall_tenant_id: Optional[str] = None
    mall_currency_code: Optional[str] = None
    mall_language_id: Optional[str] = None
    mall_source: Optional[str] = None

    model_config = ConfigDict(env_file=".env")
# Instantiate the configuration once, at import time; the objects below read from it.
settings = Settings()
# Create the MCP server instance; the @mcp.tool() functions in this file are registered on it
mcp = FastMCP(
"Product Service"
)
# Hyperf client shared by the tools in this file.
# The import sits here, after the sys.path tweak near the top of the file,
# because `shared` lives in the parent directory of this package.
from shared.hyperf_client import HyperfClient
hyperf = HyperfClient(settings.hyperf_api_url, settings.hyperf_api_token)
@mcp.tool()
async def get_product_detail(
    product_id: str
) -> dict:
    """Get product details

    Args:
        product_id: Product ID

    Returns:
        Detailed product information including specifications, pricing, and stock
    """
    # Imported locally so this block does not depend on the module import list.
    from urllib.parse import quote

    try:
        # Percent-encode the ID so characters such as "/", "?", "#" or ".."
        # in a tool argument cannot redirect the request to another endpoint.
        encoded_id = quote(str(product_id), safe="")
        result = await hyperf.get(f"/products/{encoded_id}")
        return {
            "success": True,
            "product": result
        }
    except Exception as e:
        # Tool boundary: report the failure to the caller instead of raising.
        return {
            "success": False,
            "error": str(e),
            "product": None
        }
@mcp.tool()
async def recommend_products(
    user_id: str,
    account_id: str,
    context: Optional[dict] = None,
    strategy: str = "hybrid",
    limit: int = 10
) -> dict:
    """Get personalized product recommendations

    Args:
        user_id: User identifier
        account_id: B2B account identifier
        context: Optional context for recommendations:
            - current_query: Current search query
            - recent_views: List of recently viewed product IDs
            - cart_items: Items in cart
        strategy: Recommendation strategy (collaborative, content_based, hybrid)
        limit: Maximum recommendations to return (default: 10)

    Returns:
        List of recommended products with reasons
    """
    request_body = dict(
        user_id=user_id,
        account_id=account_id,
        strategy=strategy,
        limit=limit,
    )
    # A missing or empty context is left out of the request entirely.
    if context:
        request_body["context"] = context

    try:
        response = await hyperf.post("/products/recommend", json=request_body)
        recommended = response.get("recommendations", [])
    except Exception as exc:
        # Report the failure in the result payload rather than raising.
        return {"success": False, "error": str(exc), "recommendations": []}

    return {"success": True, "recommendations": recommended}
@mcp.tool()
async def get_quote(
    product_id: str,
    quantity: int,
    account_id: str,
    delivery_province: Optional[str] = None,
    delivery_city: Optional[str] = None
) -> dict:
    """Get B2B price quote

    Args:
        product_id: Product ID
        quantity: Desired quantity
        account_id: B2B account ID (for customer-specific pricing)
        delivery_province: Delivery province (for shipping calculation)
        delivery_city: Delivery city (for shipping calculation)

    Returns:
        Detailed quote with unit price, discounts, tax, and shipping
    """
    request_body = {
        "product_id": product_id,
        "quantity": quantity,
        "account_id": account_id,
    }

    # Only the address parts that were actually supplied are sent; the
    # "delivery_address" key is omitted when neither part is given.
    address = {
        part: value
        for part, value in (
            ("province", delivery_province),
            ("city", delivery_city),
        )
        if value
    }
    if address:
        request_body["delivery_address"] = address

    try:
        quoted = await hyperf.post("/products/quote", json=request_body)

        # Echo the requested product/quantity and copy the quote fields
        # from the backend response, keeping a fixed key order.
        response = {
            "success": True,
            "quote_id": quoted.get("quote_id"),
            "product_id": product_id,
            "quantity": quantity,
            "unit_price": quoted.get("unit_price"),
            "subtotal": quoted.get("subtotal"),
            # A missing discount is reported as 0 rather than None.
            "discount": quoted.get("discount", 0),
        }
        for field in (
            "discount_reason",
            "tax",
            "shipping_fee",
            "total_price",
            "validity",
            "payment_terms",
            "estimated_delivery",
        ):
            response[field] = quoted.get(field)
        return response
    except Exception as exc:
        # Report the failure in the result payload rather than raising.
        return {"success": False, "error": str(exc)}
@mcp.tool()
async def check_inventory(
    product_ids: List[str],
    warehouse: Optional[str] = None
) -> dict:
    """Check product inventory/stock

    Args:
        product_ids: List of product IDs to check
        warehouse: Specific warehouse to check (optional)

    Returns:
        Inventory status for each product
    """
    request_body = {"product_ids": product_ids}
    # The warehouse filter is sent only when one was given.
    if warehouse:
        request_body["warehouse"] = warehouse

    try:
        response = await hyperf.post(
            "/products/inventory/check",
            json=request_body,
        )
        stock_levels = response.get("inventory", [])
    except Exception as exc:
        # Report the failure in the result payload rather than raising.
        return {"success": False, "error": str(exc), "inventory": []}

    return {"success": True, "inventory": stock_levels}
@mcp.tool()
async def get_categories() -> dict:
    """Get product category tree

    Returns:
        Hierarchical category structure
    """
    try:
        response = await hyperf.get("/products/categories")
        category_tree = response.get("categories", [])
    except Exception as exc:
        # Report the failure in the result payload rather than raising.
        return {"success": False, "error": str(exc), "categories": []}

    return {"success": True, "categories": category_tree}
@mcp.tool()
async def search_products(
    keyword: str,
    page_size: int = 60,
    page: int = 1,
    user_token: Optional[str] = None,
    user_id: Optional[str] = None,
    account_id: Optional[str] = None
) -> dict:
    """Search products from Mall API

    Searches Mall API product SPUs by keyword.

    Args:
        keyword: Search keyword (product name, product number, etc.)
        page_size: Items per page (default: 60, max 100)
        page: Page number (default: 1)
        user_token: User JWT token (required; used for Mall API authentication)
        user_id: User ID (injected automatically)
        account_id: Account ID (injected automatically)

    Returns:
        Product list including SPU ID, name, image, price, etc.
    """
    # Without a user token the Mall API cannot be called; ask the caller to log in.
    if not user_token:
        return {
            "success": False,
            "error": "用户未登录,请先登录账户以搜索商品",
            "products": [],
            "total": 0,
            "require_login": True
        }

    # Bound before the try block so the finally clause can tell whether a
    # client was actually created and therefore needs closing.
    mall = None
    try:
        # Imported lazily so an import failure is reported as a tool error.
        from shared.mall_client import MallClient

        # Enforce the documented upper bound on the page size.
        page_size = min(page_size, 100)

        # Create a Mall client authenticated with the user's own token.
        mall = MallClient(
            api_url=settings.mall_api_url,
            api_token=user_token,
            tenant_id=settings.mall_tenant_id,
            currency_code=settings.mall_currency_code,
            language_id=settings.mall_language_id,
            source=settings.mall_source
        )
        result = await mall.search_spu_products(
            keyword=keyword,
            page_size=page_size,
            page=page
        )

        # Parse the response; a missing or null "list" means no results.
        products = result.get("list") or []
        total = result.get("total", 0)

        # Map the Mall API camelCase fields to the snake_case product shape.
        formatted_products = [
            {
                "spu_id": product.get("spuId"),
                "spu_sn": product.get("spuSn"),
                "product_name": product.get("productName"),
                "product_image": product.get("productImage"),
                "price": product.get("price"),
                "special_price": product.get("specialPrice"),
                "stock": product.get("stock"),
                "sales_count": product.get("salesCount", 0)
            }
            for product in products
        ]
        return {
            "success": True,
            "products": formatted_products,
            "total": total,
            "keyword": keyword
        }
    except Exception as e:
        # Tool boundary: report the failure to the caller instead of raising.
        return {
            "success": False,
            "error": str(e),
            "products": [],
            "total": 0
        }
    finally:
        # Close the client whenever one was created. The previous guard
        # looked for a local named 'client' that never existed, so the
        # client was never closed.
        if mall is not None:
            await mall.close()
# Health check endpoint
@mcp.tool()
async def health_check() -> dict:
    """Check server health status"""
    # Fixed payload: the tool performs no checks of its own.
    status_report = dict(
        status="healthy",
        service="product_mcp",
        version="1.0.0",
    )
    return status_report
if __name__ == "__main__":
    import uvicorn
    from starlette.responses import JSONResponse
    from starlette.routing import Route

    # Build the HTTP app from the MCP server; it is served by uvicorn below.
    app = mcp.http_app()

    # Plain HTTP health endpoint. It has its own name so it does not rebind
    # the module-level `health_check` MCP tool defined earlier in this file.
    async def _http_health_check(request):
        """Answer GET /health with a fixed healthy status."""
        return JSONResponse({"status": "healthy"})

    # Append the route directly to the app's router.
    app.router.routes.append(
        Route('/health', _http_health_check, methods=['GET'])
    )

    uvicorn.run(app, host="0.0.0.0", port=8004)