图片搜索功能(以图搜图): - Chatwoot webhook 检测图片搜索消息 (content_type="search_image") - 从 content_attributes.url 提取图片 URL - 调用 Mall API 图片搜索接口 (/mall/api/spu?searchImageUrl=...) - 支持嵌套和顶层 URL 位置提取 - Product Agent 添加 fast path 直接调用图片搜索工具 - 防止无限循环(使用后清除 context.image_search_url) Qwen 模型支持: - 添加 LLM provider 选择(zhipu/qwen) - 实现 QwenLLMClient 类(基于 DashScope SDK) - 添加 dashscope>=1.14.0 依赖 - 修复 API key 设置(直接设置 dashscope.api_key) - 更新 .env.example 和 docker-compose.yml 配置 其他优化: - 重构 Chatwoot 集成代码(删除冗余) - 优化 Product Agent prompt - 增强 Customer Service Agent 多语言支持 Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
649 lines
21 KiB
Python
649 lines
21 KiB
Python
"""
|
||
Product MCP Server - Product search, recommendations, and quotes
|
||
"""
|
||
import sys
|
||
import os
|
||
from typing import Optional, List, Dict, Any
|
||
|
||
# Add shared module to path
|
||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||
|
||
from fastmcp import FastMCP
|
||
from pydantic_settings import BaseSettings
|
||
from pydantic import ConfigDict
|
||
|
||
|
||
class Settings(BaseSettings):
    """Configuration for the Product MCP server.

    ``model_config`` points pydantic-settings at a local ``.env`` file.
    Fields declared without a default have to be provided.
    """

    # Base URL and token handed to HyperfClient.
    hyperf_api_url: str
    hyperf_api_token: str

    # Base URL handed to every MallClient created in this module.
    mall_api_url: str

    # Request context forwarded unchanged to MallClient
    # (tenant_id / currency_code / language_id / source).
    mall_tenant_id: str = "2"
    mall_currency_code: str = "EUR"
    mall_language_id: str = "1"
    mall_source: str = "us.qa1.gaia888.com"

    # NOTE(review): not referenced in the visible part of this module.
    log_level: str = "INFO"

    model_config = ConfigDict(env_file=".env")
|
||
|
||
|
||
# Configuration is loaded once, at import time.
settings = Settings()

# MCP server instance the tools below are attached to via @mcp.tool().
mcp = FastMCP("Product Service")

# Name -> tool lookup used by the plain HTTP endpoint defined under
# ``if __name__ == "__main__"``; filled by the register_tool decorator.
_tools: Dict[str, Any] = {}
|
||
|
||
|
||
def register_tool(name: str):
    """Return a decorator that stores the decorated object in ``_tools``.

    Args:
        name: Key under which the object is stored. An existing entry with
            the same key is overwritten.

    Returns:
        A decorator that records its argument in ``_tools[name]`` and hands
        the very same object back, unwrapped.
    """
    def _remember(target):
        # Register only; the decorated object itself is left untouched.
        _tools[name] = target
        return target

    return _remember
|
||
|
||
|
||
# Module-wide Hyperf client shared by the Hyperf-backed tools below.
# The import sits here, after the sys.path insert near the top of the file
# that makes the ``shared`` package importable.
from shared.hyperf_client import HyperfClient

hyperf = HyperfClient(
    settings.hyperf_api_url,
    settings.hyperf_api_token,
)
|
||
|
||
|
||
@register_tool("get_product_detail")
@mcp.tool()
async def get_product_detail(
    product_id: str
) -> dict:
    """Get product details

    Args:
        product_id: Product ID

    Returns:
        Detailed product information including specifications, pricing, and stock
    """
    # NOTE(review): product_id is interpolated into the request path as-is;
    # confirm callers only ever pass plain identifiers.
    try:
        product = await hyperf.get(f"/products/{product_id}")
    except Exception as exc:
        # Failures are reported in-band so the tool call itself never raises.
        return {
            "success": False,
            "error": str(exc),
            "product": None
        }

    return {
        "success": True,
        "product": product
    }
|
||
|
||
|
||
@register_tool("recommend_products")
@mcp.tool()
async def recommend_products(
    user_token: str,
    page: int = 1,
    page_size: int = 6,
    warehouse_id: int = 2
) -> dict:
    """Get recommended products from Mall API Love List

    Fetches the recommended product list (love list / "guess you like")
    from the Mall API.

    Args:
        user_token: User JWT token for authentication
        page: Page number (default: 1)
        page_size: Number of products per page (default: 6, max 100)
        warehouse_id: Warehouse ID (default: 2)

    Returns:
        List of recommended products with product details
    """
    import logging

    logger = logging.getLogger(__name__)

    # Bound before the try block so the finally clause can tell reliably
    # whether a client was created (no dir() introspection needed).
    mall = None

    try:
        from shared.mall_client import MallClient

        logger.info(f"recommend_products called: page={page}, page_size={page_size}, warehouse_id={warehouse_id}")

        # The Mall client authenticates with the caller's own token.
        mall = MallClient(
            api_url=settings.mall_api_url,
            api_token=user_token,
            tenant_id=settings.mall_tenant_id,
            currency_code=settings.mall_currency_code,
            language_id=settings.mall_language_id,
            source=settings.mall_source
        )

        result = await mall.get_love_list(
            page=page,
            page_size=page_size,
            warehouse_id=warehouse_id
        )

        logger.info(
            f"Mall API love list returned: result_type={type(result).__name__}, "
            f"result_keys={list(result.keys()) if isinstance(result, dict) else 'not a dict'}, "
            f"total={result.get('total', 'N/A') if isinstance(result, dict) else 'N/A'}"
        )

        # Expected shape: {"total": X, "data": {"data": [...]}}.
        # A flat {"total": X, "list": [...]} shape is accepted as a fallback;
        # anything else yields an empty result.
        products_list = []
        total = 0
        if isinstance(result, dict):
            if "data" in result:
                data = result["data"]
                if isinstance(data, dict):
                    # ``or []`` also covers an explicit null list.
                    products_list = data.get("data") or []
                total = result.get("total", 0)
            elif "list" in result:
                products_list = result.get("list") or []
                total = result.get("total", 0)

        logger.info(f"Extracted {len(products_list)} products from love list, total={total}")

        # Same output format as search_products.
        formatted_products = [
            {
                "spu_id": product.get("spuId"),
                "spu_sn": product.get("spuSn"),
                "product_name": product.get("spuName"),
                "product_image": product.get("masterImage"),
                "price": product.get("price"),
                "special_price": product.get("specialPrice"),
                "stock": product.get("stockDescribe"),
                "sales_count": product.get("salesCount", 0),
                # Additional useful fields
                "href": product.get("href"),
                "spu_type": product.get("spuType"),
                "spu_type_name": product.get("spuTypeName"),
                "min_price": product.get("minPrice"),
                "max_price": product.get("maxPrice"),
                "price_with_currency": product.get("priceWithCurrency"),
                "mark_price": product.get("markPrice"),
                # ``or []`` keeps a null "skus" from failing the whole call.
                "skus_count": len(product.get("skus") or [])
            }
            for product in products_list
        ]

        logger.info(f"Formatted {len(formatted_products)} products for recommendation")

        return {
            "success": True,
            "products": formatted_products,
            "total": total,
            "keyword": "recommendation"
        }

    except Exception as e:
        logger.error(f"recommend_products failed: {str(e)}", exc_info=True)
        return {
            "success": False,
            "error": str(e),
            "products": [],
            "total": 0
        }
    finally:
        # Always release the client once it has been created.
        if mall is not None:
            await mall.close()
|
||
|
||
|
||
@register_tool("get_quote")
@mcp.tool()
async def get_quote(
    product_id: str,
    quantity: int,
    account_id: str,
    delivery_province: Optional[str] = None,
    delivery_city: Optional[str] = None
) -> dict:
    """Get B2B price quote

    Args:
        product_id: Product ID
        quantity: Desired quantity
        account_id: B2B account ID (for customer-specific pricing)
        delivery_province: Delivery province (for shipping calculation)
        delivery_city: Delivery city (for shipping calculation)

    Returns:
        Detailed quote with unit price, discounts, tax, and shipping
    """
    # The address is attached only when at least one part of it was given.
    address = {}
    if delivery_province:
        address["province"] = delivery_province
    if delivery_city:
        address["city"] = delivery_city

    payload = {
        "product_id": product_id,
        "quantity": quantity,
        "account_id": account_id
    }
    if address:
        payload["delivery_address"] = address

    try:
        result = await hyperf.post("/products/quote", json=payload)

        # product_id / quantity echo the request; everything else is read
        # from the Hyperf response.
        quote = {
            "success": True,
            "quote_id": result.get("quote_id"),
            "product_id": product_id,
            "quantity": quantity,
        }
        for field in ("unit_price", "subtotal"):
            quote[field] = result.get(field)
        quote["discount"] = result.get("discount", 0)
        for field in (
            "discount_reason",
            "tax",
            "shipping_fee",
            "total_price",
            "validity",
            "payment_terms",
            "estimated_delivery",
        ):
            quote[field] = result.get(field)
        return quote
    except Exception as exc:
        # Failures are reported in-band so the tool call itself never raises.
        return {
            "success": False,
            "error": str(exc)
        }
|
||
|
||
|
||
@register_tool("check_inventory")
@mcp.tool()
async def check_inventory(
    product_ids: List[str],
    warehouse: Optional[str] = None
) -> dict:
    """Check product inventory/stock

    Args:
        product_ids: List of product IDs to check
        warehouse: Specific warehouse to check (optional)

    Returns:
        Inventory status for each product
    """
    request_body = {"product_ids": product_ids}
    if warehouse:
        # Only sent when a warehouse was actually specified.
        request_body["warehouse"] = warehouse

    try:
        response = await hyperf.post("/products/inventory/check", json=request_body)
        stock = response.get("inventory", [])
    except Exception as exc:
        # Failures are reported in-band so the tool call itself never raises.
        return {
            "success": False,
            "error": str(exc),
            "inventory": []
        }

    return {
        "success": True,
        "inventory": stock
    }
|
||
|
||
|
||
@register_tool("get_categories")
@mcp.tool()
async def get_categories() -> dict:
    """Get product category tree

    Returns:
        Hierarchical category structure
    """
    try:
        response = await hyperf.get("/products/categories")
        tree = response.get("categories", [])
    except Exception as exc:
        # Failures are reported in-band so the tool call itself never raises.
        return {
            "success": False,
            "error": str(exc),
            "categories": []
        }

    return {
        "success": True,
        "categories": tree
    }
|
||
|
||
|
||
@register_tool("search_products")
@mcp.tool()
async def search_products(
    keyword: str,
    page_size: int = 6,
    page: int = 1
) -> dict:
    """Search products from Mall API

    Searches product SPUs in the Mall API by keyword.

    Args:
        keyword: Search keyword (product name, product number, etc.)
        page_size: Number of products per page (default: 6, max 100)
        page: Page number (default: 1)

    Returns:
        Product list including SPU ID, name, image, price, etc.
    """
    import logging

    logger = logging.getLogger(__name__)

    # Bound before the try block so the finally clause can always decide
    # whether there is a client to close. The previous guard also required
    # an undefined local named ``client``, so the client was never closed.
    mall = None

    try:
        from shared.mall_client import MallClient

        logger.info(f"search_products called with keyword={keyword}")

        # Keyword search is performed without a user token.
        mall = MallClient(
            api_url=settings.mall_api_url,
            api_token=None,
            tenant_id=settings.mall_tenant_id,
            currency_code=settings.mall_currency_code,
            language_id=settings.mall_language_id,
            source=settings.mall_source
        )

        logger.debug(f"Calling Mall API: keyword={keyword}, page_size={page_size}, page={page}")

        result = await mall.search_spu_products(
            keyword=keyword,
            page_size=page_size,
            page=page
        )

        logger.info(
            f"Mall API returned: result_type={type(result).__name__}, "
            f"result_keys={list(result.keys()) if isinstance(result, dict) else 'not a dict'}, "
            f"total={result.get('total', 'N/A') if isinstance(result, dict) else 'N/A'}, "
            f"data_length={len(result.get('data', {}).get('data', [])) if isinstance(result, dict) and isinstance(result.get('data'), dict) else 'N/A'}"
        )

        # A non-dict response is still reported as a failure, but with an
        # explicit message instead of an incidental AttributeError.
        if not isinstance(result, dict):
            raise TypeError(f"Unexpected Mall API response type: {type(result).__name__}")

        # Expected shape: {"total": X, "data": {"data": [...], ...}};
        # a flat {"total": X, "list": [...]} shape is accepted as a fallback.
        if isinstance(result.get("data"), dict):
            products = result["data"].get("data") or []
        else:
            products = result.get("list") or []
        total = result.get("total", 0)

        logger.debug(f"Found {len(products)} products in response, total={total}")
        for index, item in enumerate(products[:3], start=1):  # first three only
            logger.debug(
                f"Product {index}: spuId={item.get('spuId')}, "
                f"spuName={item.get('spuName')}, price={item.get('price')}"
            )

        formatted_products = [
            {
                "spu_id": product.get("spuId"),
                "spu_sn": product.get("spuSn"),
                "product_name": product.get("spuName"),
                "product_image": product.get("masterImage"),
                "price": product.get("price"),
                "special_price": product.get("specialPrice"),
                "stock": product.get("stockDescribe"),
                "sales_count": product.get("salesCount", 0),
                # Additional useful fields
                "href": product.get("href"),
                "spu_type": product.get("spuType"),
                "spu_type_name": product.get("spuTypeName"),
                "min_price": product.get("minPrice"),
                "max_price": product.get("maxPrice"),
                "price_with_currency": product.get("priceWithCurrency"),
                "mark_price": product.get("markPrice"),
                # ``or []`` keeps a null "skus" from failing the whole call.
                "skus_count": len(product.get("skus") or [])
            }
            for product in products
        ]

        return {
            "success": True,
            "products": formatted_products,
            "total": total,
            "keyword": keyword
        }
    except Exception as e:
        # Logged like the sibling Mall tools, then reported in-band.
        logger.error(f"search_products failed: {str(e)}", exc_info=True)
        return {
            "success": False,
            "error": str(e),
            "products": [],
            "total": 0
        }
    finally:
        # Always release the client once it has been created.
        if mall is not None:
            await mall.close()
|
||
|
||
|
||
@register_tool("search_products_by_image")
@mcp.tool()
async def search_products_by_image(
    image_url: str,
    page_size: int = 6,
    page: int = 1
) -> dict:
    """Search products by image from Mall API

    Searches product SPUs in the Mall API by image (search-by-image).

    Args:
        image_url: Image URL to search by (sent as the searchImageUrl query parameter)
        page_size: Number of products per page (default: 6, max 100)
        page: Page number (default: 1)

    Returns:
        Product list including SPU ID, name, image, price, etc.
    """
    import logging

    logger = logging.getLogger(__name__)

    # Bound before the try block so the finally clause can tell reliably
    # whether a client was created (no dir() introspection needed).
    mall = None

    try:
        from shared.mall_client import MallClient

        logger.info(f"search_products_by_image called: image_url={image_url}")

        # Image search is performed without a user token.
        mall = MallClient(
            api_url=settings.mall_api_url,
            api_token=None,
            tenant_id=settings.mall_tenant_id,
            currency_code=settings.mall_currency_code,
            language_id=settings.mall_language_id,
            source=settings.mall_source
        )

        # NOTE(review): the original author's comment states that httpx
        # URL-encodes ``params`` automatically, so the URL is passed raw;
        # confirm MallClient.get forwards params to httpx unchanged.
        result = await mall.get(
            "/mall/api/spu",
            params={
                "pageSize": min(page_size, 100),  # enforce the documented max
                "page": page,
                "searchImageUrl": image_url
            }
        )

        logger.info(
            f"Mall API image search returned: result_type={type(result).__name__}, "
            f"result_keys={list(result.keys()) if isinstance(result, dict) else 'not a dict'}, "
            f"total={result.get('total', 'N/A') if isinstance(result, dict) else 'N/A'}"
        )

        # A non-dict response is still reported as a failure, but with an
        # explicit message instead of an incidental TypeError/AttributeError.
        if not isinstance(result, dict):
            raise TypeError(f"Unexpected Mall API response type: {type(result).__name__}")

        # Expected shape: {"total": X, "data": {"data": [...]}};
        # a flat {"total": X, "list": [...]} shape is accepted as a fallback.
        if isinstance(result.get("data"), dict):
            products = result["data"].get("data") or []
        else:
            products = result.get("list") or []
        total = result.get("total", 0)

        # Same output format as search_products.
        formatted_products = [
            {
                "spu_id": product.get("spuId"),
                "spu_sn": product.get("spuSn"),
                "product_name": product.get("spuName"),
                "product_image": product.get("masterImage"),
                "price": product.get("price"),
                "special_price": product.get("specialPrice"),
                "stock": product.get("stockDescribe"),
                "sales_count": product.get("salesCount", 0),
                # Additional useful fields
                "href": product.get("href"),
                "spu_type": product.get("spuType"),
                "spu_type_name": product.get("spuTypeName"),
                "min_price": product.get("minPrice"),
                "max_price": product.get("maxPrice"),
                "price_with_currency": product.get("priceWithCurrency"),
                "mark_price": product.get("markPrice"),
                # ``or []`` keeps a null "skus" from failing the whole call.
                "skus_count": len(product.get("skus") or [])
            }
            for product in products
        ]

        logger.info(f"Formatted {len(formatted_products)} products from image search")

        return {
            "success": True,
            "products": formatted_products,
            "total": total,
            "keyword": "image_search",
            "image_url": image_url
        }
    except Exception as e:
        logger.error(f"search_products_by_image failed: {str(e)}", exc_info=True)
        return {
            "success": False,
            "error": str(e),
            "products": [],
            "total": 0
        }
    finally:
        # Always release the client once it has been created.
        if mall is not None:
            await mall.close()
|
||
|
||
|
||
# Health check endpoint
|
||
@register_tool("health_check")
@mcp.tool()
async def health_check() -> dict:
    """Check server health status"""
    # Static payload; no downstream service is contacted.
    return dict(
        status="healthy",
        service="product_mcp",
        version="1.0.0",
    )
|
||
|
||
|
||
if __name__ == "__main__":
    import json
    import traceback

    import uvicorn
    from starlette.responses import JSONResponse
    from starlette.routing import Route
    from starlette.requests import Request

    # Custom tool execution endpoint
    async def execute_tool(request: Request):
        """Execute a registered MCP tool via HTTP.

        The tool name comes from the URL path and the arguments from the
        JSON request body. Responds with ``{"success": True, "result": ...}``
        on success, 400 for a malformed body or invalid arguments, 404 for
        an unknown tool, and 500 for any other failure.
        """
        tool_name = request.path_params["tool_name"]

        try:
            # A malformed or non-object body is a client error, not a
            # server fault, so it is answered with 400 here.
            try:
                arguments = await request.json()
            except ValueError as e:
                return JSONResponse({
                    "success": False,
                    "error": f"Invalid JSON body: {str(e)}"
                }, status_code=400)

            if not isinstance(arguments, dict):
                return JSONResponse({
                    "success": False,
                    "error": "Request body must be a JSON object"
                }, status_code=400)

            print(f"[DEBUG HTTP] Tool: {tool_name}, Args: {arguments}")

            # Get tool from registry
            if tool_name not in _tools:
                print(f"[ERROR] Tool '{tool_name}' not found in registry")
                return JSONResponse({
                    "success": False,
                    "error": f"Tool '{tool_name}' not found"
                }, status_code=404)

            tool_obj = _tools[tool_name]
            print(f"[DEBUG HTTP] Tool object: {tool_obj}, type: {type(tool_obj)}")

            # Drop any argument the tool's parameter schema does not declare.
            tool_params = tool_obj.parameters.get('properties', {})
            filtered_args = {k: v for k, v in arguments.items() if k in tool_params}

            if len(filtered_args) < len(arguments):
                print(f"[DEBUG HTTP] Filtered arguments: {arguments} -> {filtered_args}")

            # The registered object is run with a dict of arguments.
            print(f"[DEBUG HTTP] Calling tool.run()...")
            tool_result = await tool_obj.run(filtered_args)
            print(f"[DEBUG HTTP] Tool result: {tool_result}")

            # Only the first content item is returned; its ``text`` is
            # decoded as JSON when possible, otherwise passed through as-is.
            if tool_result.content and len(tool_result.content) > 0:
                content = tool_result.content[0].text
                try:
                    result = json.loads(content)
                except (TypeError, ValueError):
                    # Not JSON (or not text at all): best-effort fallback.
                    result = content
            else:
                result = None

            return JSONResponse({
                "success": True,
                "result": result
            })

        except TypeError as e:
            print(f"[ERROR] TypeError: {e}")
            traceback.print_exc()
            return JSONResponse({
                "success": False,
                "error": f"Invalid arguments: {str(e)}"
            }, status_code=400)
        except Exception as e:
            print(f"[ERROR] Exception: {e}")
            traceback.print_exc()
            return JSONResponse({
                "success": False,
                "error": str(e)
            }, status_code=500)

    # HTTP health route. Named differently from the ``health_check`` MCP
    # tool so the module-level tool name is not rebound here.
    async def http_health_check(request):
        return JSONResponse({"status": "healthy"})

    # Create routes list
    routes = [
        Route('/health', http_health_check, methods=['GET']),
        Route('/tools/{tool_name}', execute_tool, methods=['POST'])
    ]

    # Create app from MCP, then add the custom routes to it.
    app = mcp.http_app()
    app.router.routes.extend(routes)

    uvicorn.run(app, host="0.0.0.0", port=8004)
|