Files
assistant/agent/agents/product.py
2026-01-28 19:00:13 +08:00

570 lines
21 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Product Agent - Handles product search, recommendations, and quotes
"""
import json
from typing import Any
from core.state import AgentState, ConversationState, add_tool_call, set_response, update_context
from core.llm import get_llm_client, Message
from utils.logger import get_logger
logger = get_logger(__name__)
# System prompt for the product agent. `product_agent` sends it as the
# "system" message and expects the model to answer with one JSON object whose
# "action" is "call_tool", "ask_info" or "respond" (see the format section in
# the prompt itself).
#
# NOTE(review): the prompt advertises a default page_size of 5 for
# search_products, while product_agent injects 6 when the model omits it —
# confirm which value is intended.
# NOTE(review): several parameter descriptions read as if punctuation was lost
# (e.g. "默认 5最大 100"); this may be an extraction artifact — verify against
# the repository copy before editing the text.
# NOTE: search_products_by_image is not listed here; product_agent dispatches
# it directly when context["image_search_url"] is set, without asking the LLM.
PRODUCT_AGENT_PROMPT = """你是一个专业的 B2B 商品顾问助手。
你的职责是帮助用户找到合适的商品,包括:
- 商品搜索
- 智能推荐
- B2B 询价
- 库存查询
- 商品详情
## 可用工具
1. **search_products** - 搜索商品
- keyword: 搜索关键词(商品名称、编号等)
- page_size: 每页数量(默认 5最大 100
- page: 页码(默认 1
- 说明:此工具使用 Mall API 搜索商品 SPU支持用户 token 认证,返回卡片格式展示
2. **get_product_detail** - 获取商品详情
- product_id: 商品ID
3. **recommend_products** - 智能推荐(心动清单/猜你喜欢)
- page_size: 推荐数量(默认 6最大 100
- page: 页码(默认 1
- warehouse_id: 仓库ID默认 2
- 说明:此工具使用 Mall API /mall/api/loveList 接口,需要用户 token 认证,系统会自动注入用户 token
4. **get_quote** - B2B 询价
- product_id: 商品ID
- quantity: 采购数量
- delivery_address: 收货地址(可选,用于计算运费)
5. **check_inventory** - 库存查询
- product_ids: 商品ID列表
- warehouse: 仓库(可选)
## 工具调用格式
当需要使用工具时,请返回 JSON 格式:
```json
{
"action": "call_tool",
"tool_name": "工具名称",
"arguments": {
"参数名": "参数值"
}
}
```
**示例**
用户说:"搜索 ring"
返回:
```json
{
"action": "call_tool",
"tool_name": "search_products",
"arguments": {
"keyword": "ring"
}
}
```
用户说:"查找手机"
返回:
```json
{
"action": "call_tool",
"tool_name": "search_products",
"arguments": {
"keyword": "手机"
}
}
```
用户说:"推荐一些商品"
返回:
```json
{
"action": "call_tool",
"tool_name": "recommend_products",
"arguments": {
"page_size": 6
}
}
```
当需要向用户询问更多信息时:
```json
{
"action": "ask_info",
"question": "需要询问的问题"
}
```
当可以直接回答时:
```json
{
"action": "respond",
"response": "回复内容"
}
```
## B2B 询价特点
- 大批量采购通常有阶梯价格
- 可能需要考虑运费
- 企业客户可能有专属折扣
- 报价通常有有效期
## 商品推荐策略
**重要规则:推荐 vs 搜索**
- **泛泛推荐**"推荐一些商品""推荐一下""有什么好推荐的" → 使用 recommend_products
- **具体商品推荐**"推荐ring相关的商品""推荐手机""推荐一些珠宝" → 使用 search_products (提取关键词ring、手机、珠宝)
- **商品搜索**"搜索ring""找ring商品" → 使用 search_products
**说明**
- 如果用户推荐请求中包含具体的商品关键词(如 ring、手机、珠宝等使用 search_products 进行精准搜索
- 只有在泛泛请求推荐时才使用 recommend_products基于用户行为的个性化推荐
**其他推荐依据**
- 根据用户采购历史推荐
- 根据当前查询语义推荐
- 根据企业行业特点推荐
- 根据季节性和热门商品推荐
## 注意事项
- 帮助用户准确描述需求
- 如果搜索结果太多,建议用户缩小范围
- 询价时确认数量,因为会影响价格
- 库存紧张时及时告知用户
"""
def _parse_llm_action(content: str) -> dict:
    """Convert the raw LLM reply into an action dict.

    Accepted shapes:
    - a JSON object, optionally wrapped in a Markdown code fence;
    - the shorthand ``tool_name\\n{json args}``, where the first line is a
      plain ASCII identifier.

    Args:
        content: Raw text returned by the LLM.

    Returns:
        A dict expected to carry an ``action`` key.

    Raises:
        json.JSONDecodeError: If the reply is neither form, or if it is valid
            JSON that is not an object.
    """
    content = content.strip()
    if content.startswith("```"):
        content = content.split("```")[1]
        if content.startswith("json"):
            content = content[4:]
        content = content.strip()

    # Shorthand form. The first line must look like a tool identifier;
    # otherwise any multi-line plain-text answer would be dispatched as a
    # tool call named after its first sentence.
    if "\n" in content and not content.startswith("{"):
        first_line, _, args_json = content.partition("\n")
        tool_name = first_line.strip()
        if tool_name.isascii() and tool_name.isidentifier():
            args_json = args_json.strip()
            try:
                arguments = json.loads(args_json) if args_json else {}
            except json.JSONDecodeError:
                # If args parsing fails, use empty dict
                arguments = {}
            return {
                "action": "call_tool",
                "tool_name": tool_name,
                "arguments": arguments,
            }

    parsed = json.loads(content)
    if not isinstance(parsed, dict):
        # Lists / scalars are valid JSON but not a valid action envelope.
        raise json.JSONDecodeError("Expected a JSON object", content, 0)
    return parsed


def _redact_for_log(arguments: dict) -> dict:
    """Return a shallow copy of *arguments* with ``user_token`` masked."""
    return {
        key: ("***" if key == "user_token" and value else value)
        for key, value in arguments.items()
    }


async def product_agent(state: AgentState) -> AgentState:
    """Product agent node.

    Handles product search, recommendations, quotes and inventory queries.

    Processing order:
    1. If ``context["image_search_url"]`` is set, queue a
       ``search_products_by_image`` tool call and return immediately.
    2. If tool results are pending, delegate to
       ``_generate_product_response``.
    3. Otherwise ask the LLM (system prompt ``PRODUCT_AGENT_PROMPT``) for the
       next action and apply it: queue a tool call, ask the user a question,
       or respond directly.

    Args:
        state: Current agent state.

    Returns:
        Updated state with tool calls or response. If the LLM reply cannot be
        interpreted, a fixed apology is set as the response; on any other
        failure ``state["error"]`` is set.
    """
    fallback_reply = "抱歉,我无法理解您的请求。请尝试重新表述或联系人工客服。"

    logger.info(
        "Product agent processing",
        conversation_id=state["conversation_id"],
        sub_intent=state.get("sub_intent")
    )
    state["current_agent"] = "product"
    state["agent_history"].append("product")
    state["state"] = ConversationState.PROCESSING.value

    # ========== FAST PATH: Image Search ==========
    # Check if this is an image search request
    image_search_url = state.get("context", {}).get("image_search_url")
    if image_search_url:
        logger.info(
            "Image search detected, calling search_products_by_image",
            conversation_id=state["conversation_id"],
            image_url=image_search_url[:100] + "..." if len(image_search_url) > 100 else image_search_url
        )
        # Call the image search tool directly, bypassing the LLM.
        state = add_tool_call(
            state,
            tool_name="search_products_by_image",
            arguments={
                "image_url": image_search_url,
                "page_size": 6,
                "page": 1
            },
            server="product"
        )
        # Clear image_search_url so the next pass does not loop forever.
        state["context"]["image_search_url"] = None
        state["state"] = ConversationState.TOOL_CALLING.value
        return state
    # ==============================================

    # Check if we have tool results to process
    if state["tool_results"]:
        return await _generate_product_response(state)

    # Build messages for LLM
    messages = [
        Message(role="system", content=PRODUCT_AGENT_PROMPT),
    ]
    # Keep only the 2 most recent history messages to reduce token count
    # and response time.
    for msg in state["messages"][-2:]:
        messages.append(Message(role=msg["role"], content=msg["content"]))

    # Build context info
    context_info = f"用户ID: {state['user_id']}\n账户ID: {state['account_id']}\n"
    if state["entities"]:
        context_info += f"已提取的信息: {json.dumps(state['entities'], ensure_ascii=False)}\n"
    if state["context"].get("product_id"):
        context_info += f"当前讨论的商品ID: {state['context']['product_id']}\n"
    if state["context"].get("recent_searches"):
        context_info += f"最近搜索: {state['context']['recent_searches']}\n"
    user_content = f"{context_info}\n用户消息: {state['current_message']}"
    messages.append(Message(role="user", content=user_content))

    # Bound before the try so the JSONDecodeError handler can always log it,
    # even if the error is raised before the LLM reply is available.
    raw_reply = ""
    try:
        llm = get_llm_client()
        response = await llm.chat(messages, temperature=0.7)
        raw_reply = response.content or ""
        content = raw_reply.strip()

        # Log raw LLM response for debugging
        logger.info(
            "Product agent LLM response",
            response_length=len(content),
            response_preview=content[:200],
            conversation_id=state["conversation_id"]
        )

        result = _parse_llm_action(content)
        action = result.get("action")

        if action == "call_tool":
            arguments = result.get("arguments")
            if not isinstance(arguments, dict):
                # null / list / string arguments would break the injections below.
                arguments = {}
            tool_name = result.get("tool_name", "")
            logger.info(
                "Product agent calling tool",
                tool_name=tool_name,
                arguments=arguments,
                conversation_id=state["conversation_id"]
            )

            # Inject context for product search (Mall API)
            if tool_name == "search_products":
                arguments["user_token"] = state.get("user_token")
                arguments["user_id"] = state["user_id"]
                arguments["account_id"] = state["account_id"]
                # Set default page_size / page if not provided
                arguments.setdefault("page_size", 6)
                arguments.setdefault("page", 1)
                # Map "query" parameter to "keyword" for compatibility
                if "query" in arguments and "keyword" not in arguments:
                    arguments["keyword"] = arguments.pop("query")
                    logger.info(
                        "Parameter mapped: query -> keyword",
                        conversation_id=state["conversation_id"]
                    )

            # Inject context for recommendation
            if tool_name == "recommend_products":
                user_token = state.get("user_token")
                arguments["user_token"] = user_token
                arguments.setdefault("page_size", 6)
                arguments.setdefault("warehouse_id", 2)
                logger.info(
                    "Product agent recommend_products after injection",
                    # The key is always present after injection; report
                    # whether a token value actually exists.
                    user_token_present=bool(user_token),
                    user_token_preview=user_token[:20] + "..." if user_token else None,
                    # Never write the full token to the logs.
                    arguments=_redact_for_log(arguments),
                    conversation_id=state["conversation_id"]
                )

            # Inject context for quote
            if tool_name == "get_quote":
                arguments["account_id"] = state["account_id"]
                # Use entity if available
                if "product_id" not in arguments and state["entities"].get("product_id"):
                    arguments["product_id"] = state["entities"]["product_id"]
                if "quantity" not in arguments and state["entities"].get("quantity"):
                    arguments["quantity"] = state["entities"]["quantity"]

            state = add_tool_call(
                state,
                tool_name=tool_name,
                arguments=arguments,
                server="product"
            )
            state["state"] = ConversationState.TOOL_CALLING.value
        elif action == "ask_info":
            state = set_response(state, result["question"])
            state["state"] = ConversationState.AWAITING_INFO.value
        elif action == "respond":
            state = set_response(state, result["response"])
            state["state"] = ConversationState.GENERATING.value
        else:
            # Unknown or missing action: answer with the fallback instead of
            # returning a state that has neither a tool call nor a response.
            logger.error(
                "Product agent received unsupported action",
                action=action,
                conversation_id=state.get("conversation_id")
            )
            state = set_response(state, fallback_reply)
        return state
    except json.JSONDecodeError as e:
        logger.error(
            "Failed to parse product agent LLM response as JSON",
            error=str(e),
            conversation_id=state.get("conversation_id"),
            raw_content=raw_reply[:500] if raw_reply else "EMPTY"
        )
        # Don't use raw content as response - use fallback instead
        state = set_response(state, fallback_reply)
        return state
    except Exception as e:
        logger.error(
            "Product agent failed",
            error=str(e),
            conversation_id=state.get("conversation_id")
        )
        state["error"] = str(e)
        return state
# Tools whose payload is a product list; rendered as cards when possible.
_PRODUCT_LIST_TOOLS = ("search_products", "recommend_products", "search_products_by_image")

# Maps a product-list tool name to the label used in log messages.
_RESULT_SOURCE_BY_TOOL = {
    "search_products": "search",
    "recommend_products": "recommend",
    "search_products_by_image": "image_search",
}

# Fields that make up a structured quote summary.
_QUOTE_FIELDS = ("product_id", "unit_price", "total_price")


def _unwrap_tool_payload(data: Any) -> dict:
    """Return the dict that carries the tool's fields.

    Tool data may arrive as ``{"success": ..., "result": {...}}``; when a
    ``result`` dict is present its content is used, otherwise *data* itself.
    Non-dict input yields an empty dict.
    """
    if not isinstance(data, dict):
        return {}
    inner = data.get("result", data)
    return inner if isinstance(inner, dict) else data


def _format_product_line(product: dict) -> str:
    """Format one product as ``- [spu_id] name - price`` for the LLM prompt."""
    name = product.get("product_name", "N/A")
    price = product.get("price", "N/A")
    special_price = product.get("special_price")
    spu_id = product.get("spu_id", "")
    try:
        has_special = bool(special_price) and float(special_price) > 0
    except (TypeError, ValueError):
        # A non-numeric special price is treated as "no special price".
        has_special = False
    # Show special price if available
    price_str = f"原价: {price}, 特价: {special_price}" if has_special else str(price)
    if spu_id:
        return f"- [{spu_id}] {name} - {price_str}"
    return f"- {name} - {price_str}"


def _summarize_tool_result(tool_name: str, data: Any) -> str:
    """Condense one successful tool result into a short text summary."""
    payload = _unwrap_tool_payload(data)

    if tool_name in _PRODUCT_LIST_TOOLS:
        products = payload.get("products") or []
        if not products:
            return "No products found"
        # Keep top 5 products with more details
        lines = [_format_product_line(p) for p in products[:5]]
        summary = f"Found {len(products)} products:\n" + "\n".join(lines)
        if len(products) > 5:
            summary += f"\n(and {len(products) - 5} more products, visit website for full selection)"
        return summary

    if tool_name == "get_product_detail":
        product = payload.get("product") or {}
        name = product.get("product_name", product.get("name", "N/A"))
        price = product.get("price", "N/A")
        stock = product.get("stock", product.get("stock_status", "N/A"))
        return f"Product: {name} | Price: {price} | Stock: {stock}"

    if tool_name == "check_inventory":
        inventory = payload.get("inventory") or []
        inv_summaries = [
            f"{inv.get('product_id', 'N/A')}: {inv.get('quantity', 'N/A')} available"
            for inv in inventory[:3]
        ]
        return "Inventory status:\n" + "\n".join(inv_summaries)

    # The agent dispatches "get_quote"; "get_pricing" is kept for backward
    # compatibility. NOTE(review): the quote schema is not visible in this
    # file, so the structured summary is used only when at least one of the
    # expected fields is present; otherwise the generic dump below applies.
    if tool_name in ("get_quote", "get_pricing") and any(f in payload for f in _QUOTE_FIELDS):
        product_id = payload.get("product_id", "N/A")
        unit_price = payload.get("unit_price", "N/A")
        total_price = payload.get("total_price", "N/A")
        return f"Quote for {product_id}: Unit: {unit_price} | Total: {total_price}"

    # For other tools, include concise summary (limit to 200 chars)
    data_str = json.dumps(data, ensure_ascii=False)[:200]
    return f"工具 {tool_name} 返回: {data_str}..."


async def _generate_product_response(state: AgentState) -> AgentState:
    """Generate response based on product tool results.

    If a successful product-list tool (search, recommend or image search)
    returned products, they are sent as product cards through Chatwoot and
    the text response is set to an empty string to avoid a duplicate message.
    If no cards are sent (no such result, no conversation id, or the send
    fails), every tool result is summarised and the LLM writes a text reply.

    Args:
        state: Current agent state; ``state["tool_results"]`` is read.

    Returns:
        Updated state with the response set. If the LLM call fails, a fixed
        apology is used as the response.
    """
    has_product_result = False
    products = []
    result_source = None  # "search", "recommend" or "image_search"

    # default=str keeps these log previews from raising on odd values.
    logger.info(
        "All tool results",
        tool_results_count=len(state.get("tool_results", [])),
        tool_results=json.dumps(state.get("tool_results", []), ensure_ascii=False, indent=2, default=str)[:2000]
    )

    for result in state["tool_results"]:
        data = result.get("data")
        logger.info(
            "Processing tool result",
            tool_name=result["tool_name"],
            success=result["success"],
            data_keys=list(data.keys()) if isinstance(data, dict) else "not a dict",
            data_preview=json.dumps(data, ensure_ascii=False, default=str)[:500]
        )
        if not (result["success"] and result["tool_name"] in _PRODUCT_LIST_TOOLS):
            continue
        if not (isinstance(data, dict) and data.get("success")):
            continue
        # MCP data structure: {"success": true, "result": {"success": true, "products": [...]}}
        # so the actual fields are read from the inner "result" when present.
        inner_data = _unwrap_tool_payload(data)
        products = inner_data.get("products") or []
        keyword = inner_data.get("keyword", "")
        has_product_result = True
        result_source = _RESULT_SOURCE_BY_TOOL[result["tool_name"]]
        logger.info(
            f"Product {result_source} results found",
            products_count=len(products),
            keyword=keyword,
            products_preview=json.dumps(products[:2], ensure_ascii=False, indent=2, default=str) if products else "[]"
        )
        break

    # With product results available, send product cards directly.
    if has_product_result and products:
        conversation_id = state.get("conversation_id")
        detected_language = state.get("detected_language", "en")
        if conversation_id:
            try:
                from integrations.chatwoot import ChatwootClient

                chatwoot = ChatwootClient(account_id=int(state.get("account_id", 1)))
                await chatwoot.send_product_cards(
                    conversation_id=int(conversation_id),
                    products=products,
                    language=detected_language
                )
            except Exception as e:
                logger.error(
                    f"Failed to send product {result_source} cards, falling back to text response",
                    error=str(e),
                    products_count=len(products),
                    result_source=result_source
                )
            else:
                logger.info(
                    f"Product {result_source} cards sent successfully",
                    conversation_id=conversation_id,
                    products_count=len(products),
                    language=detected_language,
                    result_source=result_source
                )
                # Blank the response so the products are not sent twice.
                state = set_response(state, "")
                state["state"] = ConversationState.GENERATING.value
                return state
        else:
            # Without a conversation id no cards can be sent; blanking the
            # response here would leave the user with no answer at all.
            logger.error(
                "No conversation_id for product cards, falling back to text response",
                products_count=len(products),
                result_source=result_source
            )

    # Regular path: build a text response from the tool results.
    tool_context = []
    for result in state["tool_results"]:
        tool_name = result["tool_name"]
        if not result["success"]:
            tool_context.append(f"工具 {tool_name} 执行失败: {result.get('error')}")
            continue

        data = result.get("data")
        tool_context.append(_summarize_tool_result(tool_name, data))

        # Extract product context
        payload = _unwrap_tool_payload(data)
        if payload.get("product_id"):
            state = update_context(state, {"product_id": payload["product_id"]})
        if payload.get("products"):
            # Store recent search results. Products are formatted by "spu_id"
            # elsewhere in this function, so fall back to it when
            # "product_id" is absent — TODO confirm against the Mall API schema.
            product_ids = [
                p.get("product_id") or p.get("spu_id")
                for p in payload["products"][:5]
            ]
            state = update_context(state, {"recent_product_ids": product_ids})

    prompt = f"""基于以下商品系统返回的信息,生成对用户的回复。
用户问题: {state["current_message"]}
系统返回信息:
{chr(10).join(tool_context)}
请生成一个清晰、有帮助的回复:
- 如果是搜索结果,展示商品名称、价格、规格等关键信息
- 如果是询价结果,清晰说明单价、总价、折扣、有效期等
- 如果是推荐商品,简要说明推荐理由
- 如果是库存查询,告知可用数量和发货时间
- 结果较多时可以总结关键信息
只返回回复内容,不要返回 JSON。"""
    messages = [
        Message(role="system", content="你是一个专业的商品顾问,请根据系统返回的信息回答用户的商品问题。"),
        Message(role="user", content=prompt)
    ]
    try:
        llm = get_llm_client()
        # Lower temperature for faster response
        response = await llm.chat(messages, temperature=0.3)
        state = set_response(state, response.content)
        return state
    except Exception as e:
        logger.error("Product response generation failed", error=str(e))
        state = set_response(state, "抱歉,处理商品信息时遇到问题。请稍后重试或联系人工客服。")
        return state