Files
assistant/agent/agents/product.py
wangliang fa2c8f8102 fix: 更新 Product Agent prompt 添加 search_spu_products 工具说明
## 问题
搜索商品时返回错误的工具调用 search_products 而非 search_spu_products

## 根本原因
Product Agent 的 PRODUCT_AGENT_PROMPT 中没有列出 search_spu_products 工具,
导致 LLM 不知道可以使用 Mall API 的 SPU 搜索工具

## 修改内容

### agent/agents/product.py
- 将 search_spu_products 设为第一个工具(推荐使用)
- 说明此工具使用 Mall API 搜索商品 SPU,支持用户 token 认证,返回卡片格式展示
- 原有的 search_products 标记为高级搜索工具(使用 Hyperf API)
- 调整工具序号 1-6

### docs/PRODUCT_SEARCH_SERVICE.md
- 添加 Product Agent Prompt 更新说明章节
- 调整章节序号

## 预期效果
LLM 现在应该优先使用 search_spu_products 工具进行商品搜索,
返回 Mall API 的商品数据并以 Chatwoot cards 格式展示

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-26 17:50:29 +08:00

321 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Product Agent - Handles product search, recommendations, and quotes
"""
import json
from typing import Any
from core.state import AgentState, ConversationState, add_tool_call, set_response, update_context
from core.llm import get_llm_client, Message
from utils.logger import get_logger
logger = get_logger(__name__)
PRODUCT_AGENT_PROMPT = """你是一个专业的 B2B 商品顾问助手。
你的职责是帮助用户找到合适的商品,包括:
- 商品搜索
- 智能推荐
- B2B 询价
- 库存查询
- 商品详情
## 可用工具
1. **search_spu_products** - 搜索商品(使用 Mall API推荐
- keyword: 搜索关键词(商品名称、编号等)
- page_size: 每页数量(默认 60最大 100
- page: 页码(默认 1
- 说明:此工具使用 Mall API 搜索商品 SPU支持用户 token 认证,返回卡片格式展示
2. **search_products** - 搜索商品(使用 Hyperf API
- query: 搜索关键词
- filters: 过滤条件category, price_range, brand 等)
- sort: 排序方式price_asc/price_desc/sales/latest
- page: 页码
- page_size: 每页数量
- 说明:此工具用于高级搜索,支持多维度过滤
3. **get_product_detail** - 获取商品详情
- product_id: 商品ID
4. **recommend_products** - 智能推荐
- context: 推荐上下文(可包含当前查询、浏览历史等)
- limit: 推荐数量
5. **get_quote** - B2B 询价
- product_id: 商品ID
- quantity: 采购数量
- delivery_address: 收货地址(可选,用于计算运费)
6. **check_inventory** - 库存查询
- product_ids: 商品ID列表
- warehouse: 仓库(可选)
## 工具调用格式
当需要使用工具时,请返回 JSON 格式:
```json
{
"action": "call_tool",
"tool_name": "工具名称",
"arguments": {
"参数名": "参数值"
}
}
```
当需要向用户询问更多信息时:
```json
{
"action": "ask_info",
"question": "需要询问的问题"
}
```
当可以直接回答时:
```json
{
"action": "respond",
"response": "回复内容"
}
```
## B2B 询价特点
- 大批量采购通常有阶梯价格
- 可能需要考虑运费
- 企业客户可能有专属折扣
- 报价通常有有效期
## 商品推荐策略
- 根据用户采购历史推荐
- 根据当前查询语义推荐
- 根据企业行业特点推荐
- 根据季节性和热门商品推荐
## 注意事项
- 帮助用户准确描述需求
- 如果搜索结果太多,建议用户缩小范围
- 询价时确认数量,因为会影响价格
- 库存紧张时及时告知用户
"""
async def product_agent(state: AgentState) -> AgentState:
"""Product agent node
Handles product search, recommendations, quotes and inventory queries.
Args:
state: Current agent state
Returns:
Updated state with tool calls or response
"""
logger.info(
"Product agent processing",
conversation_id=state["conversation_id"],
sub_intent=state.get("sub_intent")
)
state["current_agent"] = "product"
state["agent_history"].append("product")
state["state"] = ConversationState.PROCESSING.value
# Check if we have tool results to process
if state["tool_results"]:
return await _generate_product_response(state)
# Build messages for LLM
messages = [
Message(role="system", content=PRODUCT_AGENT_PROMPT),
]
# Add conversation history
for msg in state["messages"][-6:]:
messages.append(Message(role=msg["role"], content=msg["content"]))
# Build context info
context_info = f"用户ID: {state['user_id']}\n账户ID: {state['account_id']}\n"
if state["entities"]:
context_info += f"已提取的信息: {json.dumps(state['entities'], ensure_ascii=False)}\n"
if state["context"].get("product_id"):
context_info += f"当前讨论的商品ID: {state['context']['product_id']}\n"
if state["context"].get("recent_searches"):
context_info += f"最近搜索: {state['context']['recent_searches']}\n"
user_content = f"{context_info}\n用户消息: {state['current_message']}"
messages.append(Message(role="user", content=user_content))
try:
llm = get_llm_client()
response = await llm.chat(messages, temperature=0.7)
# Parse response
content = response.content.strip()
if content.startswith("```"):
content = content.split("```")[1]
if content.startswith("json"):
content = content[4:]
result = json.loads(content)
action = result.get("action")
if action == "call_tool":
arguments = result.get("arguments", {})
# Inject context for recommendation
if result["tool_name"] == "recommend_products":
arguments["user_id"] = state["user_id"]
arguments["account_id"] = state["account_id"]
# Inject context for quote
if result["tool_name"] == "get_quote":
arguments["account_id"] = state["account_id"]
# Use entity if available
if "product_id" not in arguments and state["entities"].get("product_id"):
arguments["product_id"] = state["entities"]["product_id"]
if "quantity" not in arguments and state["entities"].get("quantity"):
arguments["quantity"] = state["entities"]["quantity"]
state = add_tool_call(
state,
tool_name=result["tool_name"],
arguments=arguments,
server="product"
)
state["state"] = ConversationState.TOOL_CALLING.value
elif action == "ask_info":
state = set_response(state, result["question"])
state["state"] = ConversationState.AWAITING_INFO.value
elif action == "respond":
state = set_response(state, result["response"])
state["state"] = ConversationState.GENERATING.value
return state
except json.JSONDecodeError:
state = set_response(state, response.content)
return state
except Exception as e:
logger.error("Product agent failed", error=str(e))
state["error"] = str(e)
return state
async def _generate_product_response(state: AgentState) -> AgentState:
"""Generate response based on product tool results"""
# 特殊处理:如果是 search_spu_products 工具返回,直接发送商品卡片
has_spu_search_result = False
spu_products = []
for result in state["tool_results"]:
if result["success"] and result["tool_name"] == "search_spu_products":
data = result["data"]
if isinstance(data, dict) and data.get("success"):
spu_products = data.get("products", [])
has_spu_search_result = True
logger.info(
"SPU product search results found",
products_count=len(spu_products),
keyword=data.get("keyword", "")
)
break
# 如果有 SPU 搜索结果,直接发送商品卡片
if has_spu_search_result and spu_products:
try:
from integrations.chatwoot import ChatwootClient
from core.language_detector import detect_language
# 检测语言
detected_language = state.get("detected_language", "en")
# 发送商品卡片
chatwoot = ChatwootClient(account_id=int(state.get("account_id", 1)))
conversation_id = state.get("conversation_id")
if conversation_id:
await chatwoot.send_product_cards(
conversation_id=int(conversation_id),
products=spu_products,
language=detected_language
)
logger.info(
"Product cards sent successfully",
conversation_id=conversation_id,
products_count=len(spu_products),
language=detected_language
)
# 清空响应,避免重复发送
state = set_response(state, "")
state["state"] = ConversationState.GENERATING.value
return state
except Exception as e:
logger.error(
"Failed to send product cards, falling back to text response",
error=str(e),
products_count=len(spu_products)
)
# 常规处理:生成文本响应
tool_context = []
for result in state["tool_results"]:
if result["success"]:
data = result["data"]
tool_context.append(f"工具 {result['tool_name']} 返回:\n{json.dumps(data, ensure_ascii=False, indent=2)}")
# Extract product context
if isinstance(data, dict):
if data.get("product_id"):
state = update_context(state, {"product_id": data["product_id"]})
if data.get("products"):
# Store recent search results
product_ids = [p.get("product_id") for p in data["products"][:5]]
state = update_context(state, {"recent_product_ids": product_ids})
else:
tool_context.append(f"工具 {result['tool_name']} 执行失败: {result['error']}")
prompt = f"""基于以下商品系统返回的信息,生成对用户的回复。
用户问题: {state["current_message"]}
系统返回信息:
{chr(10).join(tool_context)}
请生成一个清晰、有帮助的回复:
- 如果是搜索结果,展示商品名称、价格、规格等关键信息
- 如果是询价结果,清晰说明单价、总价、折扣、有效期等
- 如果是推荐商品,简要说明推荐理由
- 如果是库存查询,告知可用数量和发货时间
- 结果较多时可以总结关键信息
只返回回复内容,不要返回 JSON。"""
messages = [
Message(role="system", content="你是一个专业的商品顾问,请根据系统返回的信息回答用户的商品问题。"),
Message(role="user", content=prompt)
]
try:
llm = get_llm_client()
response = await llm.chat(messages, temperature=0.7)
state = set_response(state, response.content)
return state
except Exception as e:
logger.error("Product response generation failed", error=str(e))
state = set_response(state, "抱歉,处理商品信息时遇到问题。请稍后重试或联系人工客服。")
return state