Files
assistant/agent/agents/product.py
wangliang 54eefba6f8 fix: 修复 JSON 解析导致的 tool_name 丢失问题
## 问题
商品搜索时工具名丢失,导致 404 错误:
```
HTTP Request: POST http://product_mcp:8004/tools/ "HTTP/1.1 404 Not Found"
```

URL 应该是 `/tools/search_products` 但实际是 `/tools/`(工具名丢失)

## 根本原因
当 LLM 返回带 ```json``` 代码块格式的 JSON 时:

```
```json
{
  "action": "call_tool",
  "tool_name": "search_products",
  "arguments": {"keyword": "ring"}
}
```
```

解析逻辑处理后:
1. 移除 ```` → 得到 `json\n{\n...`
2. 移除 `json` → 得到 `\n{\n...`
3. 内容以换行符开头,不是 `{`
4. 被误判为非 JSON 格式(`tool_name\n{args}`)
5. 按换行符分割,第一行为空 → `tool_name = ""`

## 解决方案
**第 189 行**:添加 `content.strip()` 去除前后空白

```python
if content.startswith("```"):
    content = content.split("```")[1]
    if content.startswith("json"):
        content = content[4:]
    # Remove leading/trailing whitespace after removing code block markers
    content = content.strip()  # ← 新增
```

## 额外改进
**第 217-224 行**:添加工具调用日志

```python
logger.info(
    "Product agent calling tool",
    tool_name=tool_name,
    arguments=arguments,
    conversation_id=state["conversation_id"]
)
```

便于调试工具调用问题。

## 测试验证

修复前:
```
tool_name = ""  (空字符串)
URL: /tools/     (缺少工具名)
```

修复后:
```
tool_name = "search_products"  (正确)
URL: /tools/search_products     (完整路径)
```

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-26 18:36:27 +08:00

394 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Product Agent - Handles product search, recommendations, and quotes
"""
import json
from typing import Any
from core.state import AgentState, ConversationState, add_tool_call, set_response, update_context
from core.llm import get_llm_client, Message
from utils.logger import get_logger
logger = get_logger(__name__)
PRODUCT_AGENT_PROMPT = """你是一个专业的 B2B 商品顾问助手。
你的职责是帮助用户找到合适的商品,包括:
- 商品搜索
- 智能推荐
- B2B 询价
- 库存查询
- 商品详情
## 可用工具
1. **search_products** - 搜索商品
- keyword: 搜索关键词(商品名称、编号等)
- page_size: 每页数量(默认 60最大 100
- page: 页码(默认 1
- 说明:此工具使用 Mall API 搜索商品 SPU支持用户 token 认证,返回卡片格式展示
2. **get_product_detail** - 获取商品详情
- product_id: 商品ID
3. **recommend_products** - 智能推荐
- context: 推荐上下文(可包含当前查询、浏览历史等)
- limit: 推荐数量
4. **get_quote** - B2B 询价
- product_id: 商品ID
- quantity: 采购数量
- delivery_address: 收货地址(可选,用于计算运费)
5. **check_inventory** - 库存查询
- product_ids: 商品ID列表
- warehouse: 仓库(可选)
## 工具调用格式
当需要使用工具时,请返回 JSON 格式:
```json
{
"action": "call_tool",
"tool_name": "工具名称",
"arguments": {
"参数名": "参数值"
}
}
```
**示例**
用户说:"搜索 ring"
返回:
```json
{
"action": "call_tool",
"tool_name": "search_products",
"arguments": {
"keyword": "ring"
}
}
```
用户说:"查找手机"
返回:
```json
{
"action": "call_tool",
"tool_name": "search_products",
"arguments": {
"keyword": "手机"
}
}
```
当需要向用户询问更多信息时:
```json
{
"action": "ask_info",
"question": "需要询问的问题"
}
```
当可以直接回答时:
```json
{
"action": "respond",
"response": "回复内容"
}
```
## B2B 询价特点
- 大批量采购通常有阶梯价格
- 可能需要考虑运费
- 企业客户可能有专属折扣
- 报价通常有有效期
## 商品推荐策略
- 根据用户采购历史推荐
- 根据当前查询语义推荐
- 根据企业行业特点推荐
- 根据季节性和热门商品推荐
## 注意事项
- 帮助用户准确描述需求
- 如果搜索结果太多,建议用户缩小范围
- 询价时确认数量,因为会影响价格
- 库存紧张时及时告知用户
"""
async def product_agent(state: AgentState) -> AgentState:
"""Product agent node
Handles product search, recommendations, quotes and inventory queries.
Args:
state: Current agent state
Returns:
Updated state with tool calls or response
"""
logger.info(
"Product agent processing",
conversation_id=state["conversation_id"],
sub_intent=state.get("sub_intent")
)
state["current_agent"] = "product"
state["agent_history"].append("product")
state["state"] = ConversationState.PROCESSING.value
# Check if we have tool results to process
if state["tool_results"]:
return await _generate_product_response(state)
# Build messages for LLM
messages = [
Message(role="system", content=PRODUCT_AGENT_PROMPT),
]
# Add conversation history
for msg in state["messages"][-6:]:
messages.append(Message(role=msg["role"], content=msg["content"]))
# Build context info
context_info = f"用户ID: {state['user_id']}\n账户ID: {state['account_id']}\n"
if state["entities"]:
context_info += f"已提取的信息: {json.dumps(state['entities'], ensure_ascii=False)}\n"
if state["context"].get("product_id"):
context_info += f"当前讨论的商品ID: {state['context']['product_id']}\n"
if state["context"].get("recent_searches"):
context_info += f"最近搜索: {state['context']['recent_searches']}\n"
user_content = f"{context_info}\n用户消息: {state['current_message']}"
messages.append(Message(role="user", content=user_content))
try:
llm = get_llm_client()
response = await llm.chat(messages, temperature=0.7)
# Parse response
content = response.content.strip()
# Log raw LLM response for debugging
logger.info(
"Product agent LLM response",
response_length=len(content),
response_preview=content[:200],
conversation_id=state["conversation_id"]
)
if content.startswith("```"):
content = content.split("```")[1]
if content.startswith("json"):
content = content[4:]
# Remove leading/trailing whitespace after removing code block markers
content = content.strip()
# Handle non-JSON format: "tool_name\n{args}"
if '\n' in content and not content.startswith('{'):
lines = content.split('\n', 1)
tool_name = lines[0].strip()
args_json = lines[1].strip() if len(lines) > 1 else '{}'
try:
arguments = json.loads(args_json) if args_json else {}
result = {
"action": "call_tool",
"tool_name": tool_name,
"arguments": arguments
}
except json.JSONDecodeError:
# If args parsing fails, use empty dict
result = {
"action": "call_tool",
"tool_name": tool_name,
"arguments": {}
}
else:
# Standard JSON format
result = json.loads(content)
action = result.get("action")
if action == "call_tool":
arguments = result.get("arguments", {})
tool_name = result.get("tool_name", "")
logger.info(
"Product agent calling tool",
tool_name=tool_name,
arguments=arguments,
conversation_id=state["conversation_id"]
)
# Inject context for product search (Mall API)
if tool_name == "search_products":
arguments["user_token"] = state.get("user_token")
arguments["user_id"] = state["user_id"]
arguments["account_id"] = state["account_id"]
# Map "query" parameter to "keyword" for compatibility
if "query" in arguments and "keyword" not in arguments:
arguments["keyword"] = arguments.pop("query")
logger.info(
"Parameter mapped: query -> keyword",
conversation_id=state["conversation_id"]
)
# Inject context for recommendation
if tool_name == "recommend_products":
arguments["user_id"] = state["user_id"]
arguments["account_id"] = state["account_id"]
# Inject context for quote
if tool_name == "get_quote":
arguments["account_id"] = state["account_id"]
# Use entity if available
if "product_id" not in arguments and state["entities"].get("product_id"):
arguments["product_id"] = state["entities"]["product_id"]
if "quantity" not in arguments and state["entities"].get("quantity"):
arguments["quantity"] = state["entities"]["quantity"]
state = add_tool_call(
state,
tool_name=tool_name,
arguments=arguments,
server="product"
)
state["state"] = ConversationState.TOOL_CALLING.value
elif action == "ask_info":
state = set_response(state, result["question"])
state["state"] = ConversationState.AWAITING_INFO.value
elif action == "respond":
state = set_response(state, result["response"])
state["state"] = ConversationState.GENERATING.value
return state
except json.JSONDecodeError:
state = set_response(state, response.content)
return state
except Exception as e:
logger.error("Product agent failed", error=str(e))
state["error"] = str(e)
return state
async def _generate_product_response(state: AgentState) -> AgentState:
"""Generate response based on product tool results"""
# 特殊处理:如果是 search_products 工具返回,直接发送商品卡片
has_product_search_result = False
products = []
for result in state["tool_results"]:
if result["success"] and result["tool_name"] == "search_products":
data = result["data"]
if isinstance(data, dict) and data.get("success"):
products = data.get("products", [])
has_product_search_result = True
logger.info(
"Product search results found",
products_count=len(products),
keyword=data.get("keyword", "")
)
break
# 如果有商品搜索结果,直接发送商品卡片
if has_product_search_result and products:
try:
from integrations.chatwoot import ChatwootClient
from core.language_detector import detect_language
# 检测语言
detected_language = state.get("detected_language", "en")
# 发送商品卡片
chatwoot = ChatwootClient(account_id=int(state.get("account_id", 1)))
conversation_id = state.get("conversation_id")
if conversation_id:
await chatwoot.send_product_cards(
conversation_id=int(conversation_id),
products=products,
language=detected_language
)
logger.info(
"Product cards sent successfully",
conversation_id=conversation_id,
products_count=len(products),
language=detected_language
)
# 清空响应,避免重复发送
state = set_response(state, "")
state["state"] = ConversationState.GENERATING.value
return state
except Exception as e:
logger.error(
"Failed to send product cards, falling back to text response",
error=str(e),
products_count=len(products)
)
# 常规处理:生成文本响应
tool_context = []
for result in state["tool_results"]:
if result["success"]:
data = result["data"]
tool_context.append(f"工具 {result['tool_name']} 返回:\n{json.dumps(data, ensure_ascii=False, indent=2)}")
# Extract product context
if isinstance(data, dict):
if data.get("product_id"):
state = update_context(state, {"product_id": data["product_id"]})
if data.get("products"):
# Store recent search results
product_ids = [p.get("product_id") for p in data["products"][:5]]
state = update_context(state, {"recent_product_ids": product_ids})
else:
tool_context.append(f"工具 {result['tool_name']} 执行失败: {result['error']}")
prompt = f"""基于以下商品系统返回的信息,生成对用户的回复。
用户问题: {state["current_message"]}
系统返回信息:
{chr(10).join(tool_context)}
请生成一个清晰、有帮助的回复:
- 如果是搜索结果,展示商品名称、价格、规格等关键信息
- 如果是询价结果,清晰说明单价、总价、折扣、有效期等
- 如果是推荐商品,简要说明推荐理由
- 如果是库存查询,告知可用数量和发货时间
- 结果较多时可以总结关键信息
只返回回复内容,不要返回 JSON。"""
messages = [
Message(role="system", content="你是一个专业的商品顾问,请根据系统返回的信息回答用户的商品问题。"),
Message(role="user", content=prompt)
]
try:
llm = get_llm_client()
response = await llm.chat(messages, temperature=0.7)
state = set_response(state, response.content)
return state
except Exception as e:
logger.error("Product response generation failed", error=str(e))
state = set_response(state, "抱歉,处理商品信息时遇到问题。请稍后重试或联系人工客服。")
return state