Files
wangliang 965b11316e feat: 添加图片搜索功能和 Qwen 模型支持
图片搜索功能(以图搜图):
- Chatwoot webhook 检测图片搜索消息 (content_type="search_image")
- 从 content_attributes.url 提取图片 URL
- 调用 Mall API 图片搜索接口 (/mall/api/spu?searchImageUrl=...)
- 支持嵌套和顶层 URL 位置提取
- Product Agent 添加 fast path 直接调用图片搜索工具
- 防止无限循环(使用后清除 context.image_search_url)

Qwen 模型支持:
- 添加 LLM provider 选择(zhipu/qwen)
- 实现 QwenLLMClient 类(基于 DashScope SDK)
- 添加 dashscope>=1.14.0 依赖
- 修复 API key 设置(直接设置 dashscope.api_key)
- 更新 .env.example 和 docker-compose.yml 配置

其他优化:
- 重构 Chatwoot 集成代码(删除冗余)
- 优化 Product Agent prompt
- 增强 Customer Service Agent 多语言支持

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-27 19:10:06 +08:00

649 lines
21 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Product MCP Server - Product search, recommendations, and quotes
"""
import sys
import os
from typing import Optional, List, Dict, Any
# Add shared module to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from fastmcp import FastMCP
from pydantic_settings import BaseSettings
from pydantic import ConfigDict
class Settings(BaseSettings):
    """Configuration for the Product MCP server.

    Fields without a default must be provided; the remaining fields fall back
    to the literals declared here. ``model_config`` points the loader at a
    local ``.env`` file.
    """

    # --- Hyperf backend (both required) ---
    hyperf_api_url: str
    hyperf_api_token: str

    # --- Mall API: base URL is required, the rest default as shown ---
    mall_api_url: str
    mall_tenant_id: str = "2"
    mall_currency_code: str = "EUR"
    mall_language_id: str = "1"
    mall_source: str = "us.qa1.gaia888.com"

    # --- Logging ---
    log_level: str = "INFO"

    model_config = ConfigDict(env_file=".env")
# Configuration instance shared by every tool in this module.
settings = Settings()

# MCP server that the tool functions below are attached to.
mcp = FastMCP("Product Service")

# Name -> tool registry, filled by ``register_tool`` and read by the HTTP
# endpoint defined in the ``__main__`` section.
_tools: Dict[str, Any] = {}
def register_tool(name: str):
    """Build a decorator that records its target in the ``_tools`` registry.

    Args:
        name: Key under which the decorated object is stored.

    Returns:
        A decorator that stores the object it receives in ``_tools[name]`` and
        returns that same object unchanged.
    """
    def _remember(tool):
        _tools[name] = tool
        return tool

    return _remember
# Hyperf client for this server.
# Built once at import time from the URL and token in ``settings`` and shared
# by the Hyperf-backed tools below.
# NOTE(review): this import sits below the top-of-file ``sys.path.insert`` call,
# presumably because ``shared`` is only importable after that path tweak.
from shared.hyperf_client import HyperfClient
hyperf = HyperfClient(settings.hyperf_api_url, settings.hyperf_api_token)
@register_tool("get_product_detail")
@mcp.tool()
async def get_product_detail(
    product_id: str
) -> dict:
    """Get product details

    Args:
        product_id: Product ID

    Returns:
        Detailed product information including specifications, pricing, and stock.
        On success: {"success": True, "product": <Hyperf response>}.
        On failure: {"success": False, "error": <message>, "product": None}.
    """
    # Local import keeps this block self-contained.
    from urllib.parse import quote

    # Encode the ID as a single path segment. Tool arguments originate outside
    # this process, and characters such as "/", "?", "#" or ".." would
    # otherwise change which endpoint is requested.
    path = f"/products/{quote(str(product_id), safe='')}"
    try:
        product = await hyperf.get(path)
    except Exception as e:
        # Failures are reported in-band rather than raised to the caller.
        return {
            "success": False,
            "error": str(e),
            "product": None
        }
    return {
        "success": True,
        "product": product
    }
@register_tool("recommend_products")
@mcp.tool()
async def recommend_products(
    user_token: str,
    page: int = 1,
    page_size: int = 6,
    warehouse_id: int = 2
) -> dict:
    """Get recommended products from Mall API Love List

    Fetches the recommended product list (love list / "guess you like") from
    the Mall API.

    Args:
        user_token: User JWT token for authentication
        page: Page number (default: 1)
        page_size: Number of products per page (default: 6, max 100)
        warehouse_id: Warehouse ID (default: 2)

    Returns:
        List of recommended products with product details.
        On success: {"success": True, "products": [...], "total": ...,
        "keyword": "recommendation"}.
        On failure: {"success": False, "error": <message>, "products": [],
        "total": 0}.
    """
    import logging

    logger = logging.getLogger(__name__)
    # Bound before the try so ``finally`` can tell whether a client was
    # created, without inspecting local names via dir().
    mall = None
    try:
        from shared.mall_client import MallClient

        # NOTE(review): the documented "max 100" for page_size is not enforced
        # here; the value is forwarded as given.
        logger.info(f"recommend_products called: page={page}, page_size={page_size}, warehouse_id={warehouse_id}")

        # Create the Mall client, authenticating with the caller's token.
        mall = MallClient(
            api_url=settings.mall_api_url,
            api_token=user_token,
            tenant_id=settings.mall_tenant_id,
            currency_code=settings.mall_currency_code,
            language_id=settings.mall_language_id,
            source=settings.mall_source
        )
        result = await mall.get_love_list(
            page=page,
            page_size=page_size,
            warehouse_id=warehouse_id
        )
        logger.info(
            f"Mall API love list returned: result_type={type(result).__name__}, "
            f"result_keys={list(result.keys()) if isinstance(result, dict) else 'not a dict'}, "
            f"total={result.get('total', 'N/A') if isinstance(result, dict) else 'N/A'}"
        )

        # Parse the response. Handled shapes:
        #   {"total": X, "data": {"data": [...]}}  (shape the original notes as actual)
        #   {"total": X, "list": [...]}            (kept for compatibility)
        # Anything else yields an empty result.
        products_list = []
        total = 0
        if isinstance(result, dict):
            if "data" in result:
                data = result["data"]
                if isinstance(data, dict):
                    # ``or []`` guards against an explicit null payload.
                    products_list = data.get("data") or []
                total = result.get("total", 0)
            elif "list" in result:
                products_list = result.get("list") or []
                total = result.get("total", 0)
        logger.info(f"Extracted {len(products_list)} products from love list, total={total}")

        # Format products with the same field layout as search_products.
        formatted_products = [
            {
                "spu_id": product.get("spuId"),
                "spu_sn": product.get("spuSn"),
                "product_name": product.get("spuName"),
                "product_image": product.get("masterImage"),
                "price": product.get("price"),
                "special_price": product.get("specialPrice"),
                "stock": product.get("stockDescribe"),
                "sales_count": product.get("salesCount", 0),
                # Additional useful fields
                "href": product.get("href"),
                "spu_type": product.get("spuType"),
                "spu_type_name": product.get("spuTypeName"),
                "min_price": product.get("minPrice"),
                "max_price": product.get("maxPrice"),
                "price_with_currency": product.get("priceWithCurrency"),
                "mark_price": product.get("markPrice"),
                # A null "skus" value must not fail the whole request.
                "skus_count": len(product.get("skus") or [])
            }
            for product in products_list
        ]
        logger.info(f"Formatted {len(formatted_products)} products for recommendation")
        return {
            "success": True,
            "products": formatted_products,
            "total": total,
            "keyword": "recommendation"
        }
    except Exception as e:
        logger.error(f"recommend_products failed: {str(e)}", exc_info=True)
        return {
            "success": False,
            "error": str(e),
            "products": [],
            "total": 0
        }
    finally:
        # Close the client if it was created.
        if mall is not None:
            await mall.close()
@register_tool("get_quote")
@mcp.tool()
async def get_quote(
    product_id: str,
    quantity: int,
    account_id: str,
    delivery_province: Optional[str] = None,
    delivery_city: Optional[str] = None
) -> dict:
    """Get B2B price quote

    Args:
        product_id: Product ID
        quantity: Desired quantity
        account_id: B2B account ID (for customer-specific pricing)
        delivery_province: Delivery province (for shipping calculation)
        delivery_city: Delivery city (for shipping calculation)

    Returns:
        Detailed quote with unit price, discounts, tax, and shipping.
        On failure: {"success": False, "error": <message>}.
    """
    # Collect only the address parts that were actually supplied.
    address = {}
    if delivery_province:
        address["province"] = delivery_province
    if delivery_city:
        address["city"] = delivery_city

    request_body = {
        "product_id": product_id,
        "quantity": quantity,
        "account_id": account_id
    }
    if address:
        request_body["delivery_address"] = address

    try:
        quote = await hyperf.post("/products/quote", json=request_body)
        response = {
            "success": True,
            "quote_id": quote.get("quote_id"),
            # Echo the request values rather than reading them from the reply.
            "product_id": product_id,
            "quantity": quantity,
            "unit_price": quote.get("unit_price"),
            "subtotal": quote.get("subtotal"),
            "discount": quote.get("discount", 0)
        }
        # Remaining fields are copied through under the same key.
        for field in (
            "discount_reason",
            "tax",
            "shipping_fee",
            "total_price",
            "validity",
            "payment_terms",
            "estimated_delivery",
        ):
            response[field] = quote.get(field)
    except Exception as exc:
        return {
            "success": False,
            "error": str(exc)
        }
    return response
@register_tool("check_inventory")
@mcp.tool()
async def check_inventory(
    product_ids: List[str],
    warehouse: Optional[str] = None
) -> dict:
    """Check product inventory/stock

    Args:
        product_ids: List of product IDs to check
        warehouse: Specific warehouse to check (optional)

    Returns:
        Inventory status for each product.
        On failure: {"success": False, "error": <message>, "inventory": []}.
    """
    request_body = {"product_ids": product_ids}
    # The warehouse filter is sent only when one was given.
    if warehouse:
        request_body.update(warehouse=warehouse)

    try:
        reply = await hyperf.post("/products/inventory/check", json=request_body)
        inventory = reply.get("inventory", [])
    except Exception as exc:
        return {
            "success": False,
            "error": str(exc),
            "inventory": []
        }
    return {
        "success": True,
        "inventory": inventory
    }
@register_tool("get_categories")
@mcp.tool()
async def get_categories() -> dict:
    """Get product category tree

    Returns:
        Hierarchical category structure.
        On failure: {"success": False, "error": <message>, "categories": []}.
    """
    try:
        reply = await hyperf.get("/products/categories")
        # A reply without a "categories" key yields an empty list.
        categories = reply.get("categories", [])
    except Exception as exc:
        return {
            "success": False,
            "error": str(exc),
            "categories": []
        }
    return {
        "success": True,
        "categories": categories
    }
@register_tool("search_products")
@mcp.tool()
async def search_products(
    keyword: str,
    page_size: int = 6,
    page: int = 1
) -> dict:
    """Search products from Mall API

    Searches Mall API product SPUs by keyword.

    Args:
        keyword: Search keyword (product name, product number, etc.)
        page_size: Items per page (default: 6, max 100)
        page: Page number (default: 1)

    Returns:
        Product list including SPU ID, name, image, price, etc.
        On success: {"success": True, "products": [...], "total": ...,
        "keyword": <keyword>}.
        On failure: {"success": False, "error": <message>, "products": [],
        "total": 0}.
    """
    import logging

    logger = logging.getLogger(__name__)
    # Bound before the try so ``finally`` can close the client reliably. The
    # previous guard tested for a never-defined ``client`` local, so close()
    # was never reached.
    mall = None
    try:
        from shared.mall_client import MallClient

        logger.info(f"search_products called with keyword={keyword}")

        # Create the Mall client; no token is passed for keyword search.
        mall = MallClient(
            api_url=settings.mall_api_url,
            api_token=None,
            tenant_id=settings.mall_tenant_id,
            currency_code=settings.mall_currency_code,
            language_id=settings.mall_language_id,
            source=settings.mall_source
        )
        logger.debug(
            "Calling Mall API: keyword=%s, page_size=%s, page=%s",
            keyword, page_size, page,
        )
        result = await mall.search_spu_products(
            keyword=keyword,
            page_size=page_size,
            page=page
        )

        # Fail with an explicit message instead of an AttributeError raised
        # from ``result.get`` further down.
        if not isinstance(result, dict):
            raise TypeError(
                f"Unexpected Mall API response type: {type(result).__name__}"
            )

        # Parse the response. Handled shapes:
        #   {"total": X, "data": {"data": [...], ...}}
        #   {"total": X, "list": [...]}
        nested = result.get("data")
        if isinstance(nested, dict):
            # ``or []`` guards against an explicit null payload.
            products = nested.get("data") or []
        else:
            products = result.get("list") or []
        total = result.get("total", 0)

        logger.info(
            f"Mall API returned: result_type={type(result).__name__}, "
            f"result_keys={list(result.keys())}, "
            f"total={result.get('total', 'N/A')}, "
            f"data_length={len(products)}"
        )
        # Only the first three products are detailed, to keep debug output short.
        for i, p in enumerate(products[:3], start=1):
            logger.debug(
                "Product %d: spuId=%s, spuName=%s, price=%s",
                i, p.get("spuId"), p.get("spuName"), p.get("price"),
            )

        # Format product data.
        formatted_products = [
            {
                "spu_id": product.get("spuId"),
                "spu_sn": product.get("spuSn"),
                "product_name": product.get("spuName"),
                "product_image": product.get("masterImage"),
                "price": product.get("price"),
                "special_price": product.get("specialPrice"),
                "stock": product.get("stockDescribe"),
                "sales_count": product.get("salesCount", 0),
                # Additional useful fields
                "href": product.get("href"),
                "spu_type": product.get("spuType"),
                "spu_type_name": product.get("spuTypeName"),
                "min_price": product.get("minPrice"),
                "max_price": product.get("maxPrice"),
                "price_with_currency": product.get("priceWithCurrency"),
                "mark_price": product.get("markPrice"),
                # A null "skus" value must not fail the whole request.
                "skus_count": len(product.get("skus") or [])
            }
            for product in products
        ]
        return {
            "success": True,
            "products": formatted_products,
            "total": total,
            "keyword": keyword
        }
    except Exception as e:
        # Log with traceback, matching the other Mall-backed tools.
        logger.error(f"search_products failed: {str(e)}", exc_info=True)
        return {
            "success": False,
            "error": str(e),
            "products": [],
            "total": 0
        }
    finally:
        # Close the client if it was created.
        if mall is not None:
            await mall.close()
@register_tool("search_products_by_image")
@mcp.tool()
async def search_products_by_image(
    image_url: str,
    page_size: int = 6,
    page: int = 1
) -> dict:
    """Search products by image from Mall API

    Searches Mall API product SPUs by image (reverse image search).

    Args:
        image_url: Image URL. Pass it unencoded: per the note at the call
            site, query parameters are URL-encoded by the HTTP client.
        page_size: Items per page (default: 6, max 100)
        page: Page number (default: 1)

    Returns:
        Product list including SPU ID, name, image, price, etc.
        On success: {"success": True, "products": [...], "total": ...,
        "keyword": "image_search", "image_url": <image_url>}.
        On failure: {"success": False, "error": <message>, "products": [],
        "total": 0}.
    """
    import logging

    logger = logging.getLogger(__name__)
    # Bound before the try so ``finally`` can tell whether a client was
    # created, without inspecting local names via dir().
    mall = None
    try:
        from shared.mall_client import MallClient

        logger.info(f"search_products_by_image called: image_url={image_url}")

        # Create the Mall client; no token is passed for image search.
        mall = MallClient(
            api_url=settings.mall_api_url,
            api_token=None,
            tenant_id=settings.mall_tenant_id,
            currency_code=settings.mall_currency_code,
            language_id=settings.mall_language_id,
            source=settings.mall_source
        )

        # Call the Mall API image-search endpoint.
        # Note: httpx URL-encodes ``params`` automatically, so the URL is not
        # encoded by hand here.
        result = await mall.get(
            "/mall/api/spu",
            params={
                # Cap the page size at the documented maximum.
                "pageSize": min(page_size, 100),
                "page": page,
                "searchImageUrl": image_url
            }
        )
        logger.info(
            f"Mall API image search returned: result_type={type(result).__name__}, "
            f"result_keys={list(result.keys()) if isinstance(result, dict) else 'not a dict'}, "
            f"total={result.get('total', 'N/A') if isinstance(result, dict) else 'N/A'}"
        )

        # Fail with an explicit message instead of an opaque error raised
        # while probing a non-dict response.
        if not isinstance(result, dict):
            raise TypeError(
                f"Unexpected Mall API response type: {type(result).__name__}"
            )

        # Parse the response. Handled shapes:
        #   {"total": X, "data": {"data": [...]}}
        #   {"total": X, "list": [...]}
        nested = result.get("data")
        if isinstance(nested, dict):
            # ``or []`` guards against an explicit null payload.
            products = nested.get("data") or []
        else:
            products = result.get("list") or []
        total = result.get("total", 0)

        # Format products with the same field layout as search_products.
        formatted_products = [
            {
                "spu_id": product.get("spuId"),
                "spu_sn": product.get("spuSn"),
                "product_name": product.get("spuName"),
                "product_image": product.get("masterImage"),
                "price": product.get("price"),
                "special_price": product.get("specialPrice"),
                "stock": product.get("stockDescribe"),
                "sales_count": product.get("salesCount", 0),
                # Additional useful fields
                "href": product.get("href"),
                "spu_type": product.get("spuType"),
                "spu_type_name": product.get("spuTypeName"),
                "min_price": product.get("minPrice"),
                "max_price": product.get("maxPrice"),
                "price_with_currency": product.get("priceWithCurrency"),
                "mark_price": product.get("markPrice"),
                # A null "skus" value must not fail the whole request.
                "skus_count": len(product.get("skus") or [])
            }
            for product in products
        ]
        logger.info(f"Formatted {len(formatted_products)} products from image search")
        return {
            "success": True,
            "products": formatted_products,
            "total": total,
            "keyword": "image_search",
            "image_url": image_url
        }
    except Exception as e:
        logger.error(f"search_products_by_image failed: {str(e)}", exc_info=True)
        return {
            "success": False,
            "error": str(e),
            "products": [],
            "total": 0
        }
    finally:
        # Close the client if it was created.
        if mall is not None:
            await mall.close()
# Health check tool
@register_tool("health_check")
@mcp.tool()
async def health_check() -> dict:
    """Check server health status

    Returns:
        A fixed dict with ``status``, ``service`` and ``version`` keys.
    """
    report = dict(
        status="healthy",
        service="product_mcp",
        version="1.0.0",
    )
    return report
if __name__ == "__main__":
    import json
    import logging

    import uvicorn
    from starlette.responses import JSONResponse
    from starlette.routing import Route
    from starlette.requests import Request

    # Apply the configured log level, which was otherwise never used. Unknown
    # level names fall back to INFO instead of aborting startup.
    _level = logging.getLevelName(str(settings.log_level).upper())
    logging.basicConfig(level=_level if isinstance(_level, int) else logging.INFO)
    http_logger = logging.getLogger(__name__)

    # Custom tool execution endpoint
    async def execute_tool(request: Request):
        """Execute an MCP tool via HTTP

        Reads the tool name from the ``tool_name`` path parameter and the tool
        arguments from the JSON request body, drops arguments the tool does
        not declare, runs the tool, and wraps its first content item in
        {"success": True, "result": ...}.

        Responses:
            200: tool ran; ``result`` is the parsed JSON text of the first
                content item, the raw text if it is not JSON, or None if the
                tool returned no content.
            400: body is not a JSON object, or a TypeError was raised.
            404: no tool with that name in the registry.
            500: any other exception.
        """
        tool_name = request.path_params["tool_name"]
        try:
            # Get arguments from the request body. A malformed body is a
            # client error, not a server fault.
            try:
                arguments = await request.json()
            except ValueError as e:  # json.JSONDecodeError subclasses ValueError
                return JSONResponse({
                    "success": False,
                    "error": f"Invalid arguments: {str(e)}"
                }, status_code=400)
            if not isinstance(arguments, dict):
                return JSONResponse({
                    "success": False,
                    "error": "Invalid arguments: request body must be a JSON object"
                }, status_code=400)

            # Log argument names only: values may carry credentials such as
            # ``user_token``.
            http_logger.debug("Tool: %s, Arg names: %s", tool_name, sorted(arguments))

            # Get the tool from the registry.
            if tool_name not in _tools:
                http_logger.error("Tool '%s' not found in registry", tool_name)
                return JSONResponse({
                    "success": False,
                    "error": f"Tool '{tool_name}' not found"
                }, status_code=404)
            tool_obj = _tools[tool_name]

            # Keep only the arguments named in the tool's parameter schema.
            tool_params = tool_obj.parameters.get('properties', {})
            filtered_args = {k: v for k, v in arguments.items() if k in tool_params}
            if len(filtered_args) < len(arguments):
                http_logger.debug(
                    "Dropped undeclared arguments: %s",
                    sorted(set(arguments) - set(filtered_args)),
                )

            # FastMCP FunctionTool.run() takes a dict of arguments.
            tool_result = await tool_obj.run(filtered_args)

            # ToolResult.content is a list of TextContent objects with a
            # 'text' attribute; only the first item is used.
            if tool_result.content:
                content = tool_result.content[0].text
                # Return parsed JSON when the text is JSON, raw text otherwise.
                try:
                    result = json.loads(content)
                except (TypeError, ValueError):
                    result = content
            else:
                result = None
            return JSONResponse({
                "success": True,
                "result": result
            })
        except TypeError as e:
            http_logger.exception("TypeError while executing tool '%s'", tool_name)
            return JSONResponse({
                "success": False,
                "error": f"Invalid arguments: {str(e)}"
            }, status_code=400)
        except Exception as e:
            http_logger.exception("Exception while executing tool '%s'", tool_name)
            return JSONResponse({
                "success": False,
                "error": str(e)
            }, status_code=500)

    # Health check endpoint. Named differently from the ``health_check`` tool
    # so it does not rebind that module-level name.
    async def http_health_check(request):
        """Return a static healthy status for GET /health."""
        return JSONResponse({"status": "healthy"})

    # Create routes list
    routes = [
        Route('/health', http_health_check, methods=['GET']),
        Route('/tools/{tool_name}', execute_tool, methods=['POST'])
    ]

    # Create app from MCP with custom routes
    app = mcp.http_app()

    # Add our custom routes to the existing app
    app.router.routes.extend(routes)

    uvicorn.run(app, host="0.0.0.0", port=8004)