Files
assistant/agent/agents/product.py
wangliang e58c3f0caf fix: 修复 Product Agent LLM 响应格式解析和工具选择问题
## 问题 1: LLM 返回非标准 JSON 格式

**现象**:
LLM 返回:`search_products\n{"query": "ring"}`
期望格式:`{"action": "call_tool", "tool_name": "...", "arguments": {...}}`

**原因**:
LLM 有时会返回简化格式 `tool_name\n{args}`,导致 JSON 解析失败

**解决方案**:
添加格式兼容逻辑(第 172-191 行):
- 检测 `\n` 分隔的格式
- 解析工具名和参数
- 转换为标准 JSON 结构

## 问题 2: LLM 选择错误的搜索工具

**现象**:
LLM 选择 `search_products`(Hyperf API)而非 `search_spu_products`(Mall API)

**原因**:
Prompt 中工具说明不够突出,LLM 优先选择第一个工具

**解决方案**:
1. 在 prompt 开头添加醒目警告(第 22-29 行):
   - ⚠️ 强调必须使用 `search_spu_products`
   - 标注适用场景
   - 添加  标记推荐工具

2. 添加具体示例(第 78-89 行):
   - 展示正确的工具调用格式
   - 示例:搜索 "ring" 应使用 `search_spu_products`

## 修改内容

### agent/agents/product.py:172-191
添加非标准格式兼容逻辑

### agent/agents/product.py:14-105
重写 PRODUCT_AGENT_PROMPT:
- 开头添加工具选择警告
- 突出 `search_spu_products` 优先级
- 添加具体使用示例
- 标注各工具适用场景

## 预期效果
1. 兼容 LLM 的简化格式输出
2. LLM 优先选择 `search_spu_products` 进行商品搜索
3. 返回 Mall API 数据并以 Chatwoot cards 展示

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-26 18:17:37 +08:00

383 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Product Agent - Handles product search, recommendations, and quotes
"""
import json
from typing import Any
from core.state import AgentState, ConversationState, add_tool_call, set_response, update_context
from core.llm import get_llm_client, Message
from utils.logger import get_logger
logger = get_logger(__name__)
PRODUCT_AGENT_PROMPT = """你是一个专业的 B2B 商品顾问助手。
你的职责是帮助用户找到合适的商品,包括:
- 商品搜索
- 智能推荐
- B2B 询价
- 库存查询
- 商品详情
## ⚠️ 重要:商品搜索工具选择
**商品搜索必须优先使用 `search_spu_products` 工具!**
- ✅ **search_spu_products**:使用 Mall API支持用户认证返回精美卡片展示推荐
- ⚠️ **search_products**:仅用于高级搜索(需要复杂过滤条件时)
**普通商品搜索(如 "ring""手机""iPhone")必须使用 `search_spu_products`**
## 可用工具
1. **search_spu_products** - 搜索商品(使用 Mall API推荐
- keyword: 搜索关键词(商品名称、编号等)
- page_size: 每页数量(默认 60最大 100
- page: 页码(默认 1
- 说明:此工具使用 Mall API 搜索商品 SPU支持用户 token 认证,返回卡片格式展示
- **适用于所有普通商品搜索请求**
2. **search_products** - 搜索商品(使用 Hyperf API
- query: 搜索关键词
- filters: 过滤条件category, price_range, brand 等)
- sort: 排序方式price_asc/price_desc/sales/latest
- page: 页码
- page_size: 每页数量
- 说明:此工具用于高级搜索,支持多维度过滤
- **仅在需要复杂过滤条件时使用**
3. **get_product_detail** - 获取商品详情
- product_id: 商品ID
4. **recommend_products** - 智能推荐
- context: 推荐上下文(可包含当前查询、浏览历史等)
- limit: 推荐数量
5. **get_quote** - B2B 询价
- product_id: 商品ID
- quantity: 采购数量
- delivery_address: 收货地址(可选,用于计算运费)
6. **check_inventory** - 库存查询
- product_ids: 商品ID列表
- warehouse: 仓库(可选)
## 工具调用格式
当需要使用工具时,请返回 JSON 格式:
```json
{
"action": "call_tool",
"tool_name": "工具名称",
"arguments": {
"参数名": "参数值"
}
}
```
**示例**
用户说:"搜索 ring"
返回:
```json
{
"action": "call_tool",
"tool_name": "search_spu_products",
"arguments": {
"keyword": "ring"
}
}
```
当需要向用户询问更多信息时:
```json
{
"action": "ask_info",
"question": "需要询问的问题"
}
```
当可以直接回答时:
```json
{
"action": "respond",
"response": "回复内容"
}
```
## B2B 询价特点
- 大批量采购通常有阶梯价格
- 可能需要考虑运费
- 企业客户可能有专属折扣
- 报价通常有有效期
## 商品推荐策略
- 根据用户采购历史推荐
- 根据当前查询语义推荐
- 根据企业行业特点推荐
- 根据季节性和热门商品推荐
## 注意事项
- 帮助用户准确描述需求
- 如果搜索结果太多,建议用户缩小范围
- 询价时确认数量,因为会影响价格
- 库存紧张时及时告知用户
"""
async def product_agent(state: AgentState) -> AgentState:
"""Product agent node
Handles product search, recommendations, quotes and inventory queries.
Args:
state: Current agent state
Returns:
Updated state with tool calls or response
"""
logger.info(
"Product agent processing",
conversation_id=state["conversation_id"],
sub_intent=state.get("sub_intent")
)
state["current_agent"] = "product"
state["agent_history"].append("product")
state["state"] = ConversationState.PROCESSING.value
# Check if we have tool results to process
if state["tool_results"]:
return await _generate_product_response(state)
# Build messages for LLM
messages = [
Message(role="system", content=PRODUCT_AGENT_PROMPT),
]
# Add conversation history
for msg in state["messages"][-6:]:
messages.append(Message(role=msg["role"], content=msg["content"]))
# Build context info
context_info = f"用户ID: {state['user_id']}\n账户ID: {state['account_id']}\n"
if state["entities"]:
context_info += f"已提取的信息: {json.dumps(state['entities'], ensure_ascii=False)}\n"
if state["context"].get("product_id"):
context_info += f"当前讨论的商品ID: {state['context']['product_id']}\n"
if state["context"].get("recent_searches"):
context_info += f"最近搜索: {state['context']['recent_searches']}\n"
user_content = f"{context_info}\n用户消息: {state['current_message']}"
messages.append(Message(role="user", content=user_content))
try:
llm = get_llm_client()
response = await llm.chat(messages, temperature=0.7)
# Parse response
content = response.content.strip()
# Log raw LLM response for debugging
logger.info(
"Product agent LLM response",
response_length=len(content),
response_preview=content[:200],
conversation_id=state["conversation_id"]
)
if content.startswith("```"):
content = content.split("```")[1]
if content.startswith("json"):
content = content[4:]
# Handle non-JSON format: "tool_name\n{args}"
if '\n' in content and not content.startswith('{'):
lines = content.split('\n', 1)
tool_name = lines[0].strip()
args_json = lines[1].strip() if len(lines) > 1 else '{}'
try:
arguments = json.loads(args_json) if args_json else {}
result = {
"action": "call_tool",
"tool_name": tool_name,
"arguments": arguments
}
except json.JSONDecodeError:
# If args parsing fails, use empty dict
result = {
"action": "call_tool",
"tool_name": tool_name,
"arguments": {}
}
else:
# Standard JSON format
result = json.loads(content)
action = result.get("action")
if action == "call_tool":
arguments = result.get("arguments", {})
# Inject context for SPU product search (Mall API)
if result["tool_name"] == "search_spu_products":
arguments["user_token"] = state.get("user_token")
arguments["user_id"] = state["user_id"]
arguments["account_id"] = state["account_id"]
# Inject context for recommendation
if result["tool_name"] == "recommend_products":
arguments["user_id"] = state["user_id"]
arguments["account_id"] = state["account_id"]
# Inject context for quote
if result["tool_name"] == "get_quote":
arguments["account_id"] = state["account_id"]
# Use entity if available
if "product_id" not in arguments and state["entities"].get("product_id"):
arguments["product_id"] = state["entities"]["product_id"]
if "quantity" not in arguments and state["entities"].get("quantity"):
arguments["quantity"] = state["entities"]["quantity"]
state = add_tool_call(
state,
tool_name=result["tool_name"],
arguments=arguments,
server="product"
)
state["state"] = ConversationState.TOOL_CALLING.value
elif action == "ask_info":
state = set_response(state, result["question"])
state["state"] = ConversationState.AWAITING_INFO.value
elif action == "respond":
state = set_response(state, result["response"])
state["state"] = ConversationState.GENERATING.value
return state
except json.JSONDecodeError:
state = set_response(state, response.content)
return state
except Exception as e:
logger.error("Product agent failed", error=str(e))
state["error"] = str(e)
return state
async def _generate_product_response(state: AgentState) -> AgentState:
"""Generate response based on product tool results"""
# 特殊处理:如果是 search_spu_products 工具返回,直接发送商品卡片
has_spu_search_result = False
spu_products = []
for result in state["tool_results"]:
if result["success"] and result["tool_name"] == "search_spu_products":
data = result["data"]
if isinstance(data, dict) and data.get("success"):
spu_products = data.get("products", [])
has_spu_search_result = True
logger.info(
"SPU product search results found",
products_count=len(spu_products),
keyword=data.get("keyword", "")
)
break
# 如果有 SPU 搜索结果,直接发送商品卡片
if has_spu_search_result and spu_products:
try:
from integrations.chatwoot import ChatwootClient
from core.language_detector import detect_language
# 检测语言
detected_language = state.get("detected_language", "en")
# 发送商品卡片
chatwoot = ChatwootClient(account_id=int(state.get("account_id", 1)))
conversation_id = state.get("conversation_id")
if conversation_id:
await chatwoot.send_product_cards(
conversation_id=int(conversation_id),
products=spu_products,
language=detected_language
)
logger.info(
"Product cards sent successfully",
conversation_id=conversation_id,
products_count=len(spu_products),
language=detected_language
)
# 清空响应,避免重复发送
state = set_response(state, "")
state["state"] = ConversationState.GENERATING.value
return state
except Exception as e:
logger.error(
"Failed to send product cards, falling back to text response",
error=str(e),
products_count=len(spu_products)
)
# 常规处理:生成文本响应
tool_context = []
for result in state["tool_results"]:
if result["success"]:
data = result["data"]
tool_context.append(f"工具 {result['tool_name']} 返回:\n{json.dumps(data, ensure_ascii=False, indent=2)}")
# Extract product context
if isinstance(data, dict):
if data.get("product_id"):
state = update_context(state, {"product_id": data["product_id"]})
if data.get("products"):
# Store recent search results
product_ids = [p.get("product_id") for p in data["products"][:5]]
state = update_context(state, {"recent_product_ids": product_ids})
else:
tool_context.append(f"工具 {result['tool_name']} 执行失败: {result['error']}")
prompt = f"""基于以下商品系统返回的信息,生成对用户的回复。
用户问题: {state["current_message"]}
系统返回信息:
{chr(10).join(tool_context)}
请生成一个清晰、有帮助的回复:
- 如果是搜索结果,展示商品名称、价格、规格等关键信息
- 如果是询价结果,清晰说明单价、总价、折扣、有效期等
- 如果是推荐商品,简要说明推荐理由
- 如果是库存查询,告知可用数量和发货时间
- 结果较多时可以总结关键信息
只返回回复内容,不要返回 JSON。"""
messages = [
Message(role="system", content="你是一个专业的商品顾问,请根据系统返回的信息回答用户的商品问题。"),
Message(role="user", content=prompt)
]
try:
llm = get_llm_client()
response = await llm.chat(messages, temperature=0.7)
state = set_response(state, response.content)
return state
except Exception as e:
logger.error("Product response generation failed", error=str(e))
state = set_response(state, "抱歉,处理商品信息时遇到问题。请稍后重试或联系人工客服。")
return state