Files
assistant/agent/integrations/chatwoot.py
wangliang e8e89601a5 feat: 修复订单查询和物流查询功能
主要修改:

1. 订单数据解析修复 (agent/agents/order.py)
   - 修复 Mall API 返回数据的嵌套结构解析
   - 更新字段映射:orderId→order_id, orderProduct→items, statusText→status_text
   - 支持多种商品图片字段:image, pic, thumb, productImg
   - 添加详细的调试日志

2. 物流查询修复 (mcp_servers/order_mcp/server.py)
   - 修复物流接口返回数据结构解析 (data[].trackingCode→tracking_number)
   - 添加 print() 日志用于调试
   - 支持多种字段名映射

3. Chatwoot 集成优化 (agent/integrations/chatwoot.py)
   - 添加 json 模块导入
   - 完善订单卡片和表单展示功能

4. API 请求头优化 (mcp_servers/shared/mall_client.py)
   - 更新 User-Agent 和 Accept 头
   - 修正 Origin 和 Referer 大小写

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-20 19:10:21 +08:00

748 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Chatwoot API Client for B2B Shopping AI Assistant
"""
import json
from dataclasses import dataclass
from enum import Enum
from typing import Any, Optional
from urllib.parse import quote

import httpx

from config import settings
from utils.logger import get_logger
logger = get_logger(__name__)
class MessageType(str, Enum):
    """Direction of a Chatwoot message.

    Members are also ``str`` instances, so they compare equal to their raw
    string values and serialize directly into JSON payloads.
    """

    # Message type string "incoming".
    INCOMING = "incoming"
    # Message type string "outgoing".
    OUTGOING = "outgoing"
class ConversationStatus(str, Enum):
    """Status values a Chatwoot conversation can be switched to.

    Members are also ``str`` instances, so ``status.value`` (or the member
    itself) can be placed straight into a request body.
    """

    OPEN = "open"
    RESOLVED = "resolved"
    PENDING = "pending"
    SNOOZED = "snoozed"
@dataclass
class ChatwootMessage:
    """Plain record describing one Chatwoot message.

    The first four fields are required; the sender fields and the
    ``private`` flag are optional and default to "not provided" / ``False``.
    """

    # Required fields (positional order is part of the constructor contract).
    id: int
    content: str
    message_type: str
    conversation_id: int

    # Optional sender details; None when not supplied.
    sender_type: Optional[str] = None
    sender_id: Optional[int] = None

    # Private flag; defaults to False.
    private: bool = False
@dataclass
class ChatwootContact:
    """Plain record describing one Chatwoot contact.

    Only ``id`` is required; every other field defaults to ``None``.
    """

    # Required contact identifier.
    id: int

    # Optional profile fields; None when not supplied.
    name: Optional[str] = None
    email: Optional[str] = None
    phone_number: Optional[str] = None

    # Free-form attribute mapping; None (not an empty dict) when not supplied.
    custom_attributes: Optional[dict[str, Any]] = None
class ChatwootClient:
    """Async client for the account-scoped Chatwoot REST API.

    The underlying ``httpx.AsyncClient`` is created lazily on the first
    request and reused until :meth:`close` is called. The client can also be
    used as an async context manager, which closes it on exit.
    """

    # URL template for the default "view order details" button on order
    # cards. Kept as a class attribute so a subclass can point it at another
    # storefront host; ``{order_id}`` receives the URL-encoded order number.
    ORDER_DETAIL_URL_TEMPLATE = (
        "https://www.qa1.gaia888.com/customer/order/detail?orderId={order_id}"
    )

    def __init__(
        self,
        api_url: Optional[str] = None,
        api_token: Optional[str] = None,
        account_id: int = 2
    ):
        """Initialize Chatwoot client.

        Args:
            api_url: Chatwoot API URL; falls back to
                ``settings.chatwoot_api_url``. A trailing "/" is removed.
            api_token: API access token; falls back to
                ``settings.chatwoot_api_token``.
            account_id: Chatwoot account ID used in the request base path.
        """
        self.api_url = (api_url or settings.chatwoot_api_url).rstrip("/")
        self.api_token = api_token or settings.chatwoot_api_token
        self.account_id = account_id
        # Created on demand by _get_client(); reset to None by close().
        self._client: Optional[httpx.AsyncClient] = None
        logger.info("Chatwoot client initialized", api_url=self.api_url)

    async def __aenter__(self) -> "ChatwootClient":
        """Enter ``async with``; returns this client."""
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        """Leave ``async with``; closes the HTTP client."""
        await self.close()

    async def _get_client(self) -> httpx.AsyncClient:
        """Get or create the shared HTTP client."""
        if self._client is None:
            self._client = httpx.AsyncClient(
                base_url=f"{self.api_url}/api/v1/accounts/{self.account_id}",
                headers={
                    "api_access_token": self.api_token,
                    "Content-Type": "application/json"
                },
                timeout=30.0
            )
        return self._client

    async def close(self) -> None:
        """Close the HTTP client, if one was created."""
        if self._client:
            await self._client.aclose()
            self._client = None

    async def _request(self, method: str, path: str, **kwargs: Any) -> Any:
        """Send one request and return the decoded JSON body.

        Args:
            method: HTTP method name, e.g. ``"GET"`` or ``"POST"``.
            path: Path relative to the account-scoped base URL.
            **kwargs: Passed through to ``httpx.AsyncClient.request``
                (``json=``, ``params=``, ...).

        Returns:
            The parsed JSON response.

        Raises:
            httpx.HTTPStatusError: If the response status is 4xx/5xx.
        """
        client = await self._get_client()
        response = await client.request(method, path, **kwargs)
        response.raise_for_status()
        return response.json()

    @staticmethod
    def _form_field(
        name: str,
        label: str,
        default: Any,
        field_type: str = "text",
        placeholder: Optional[str] = None
    ) -> dict[str, Any]:
        """Build one form item; the placeholder defaults to the label."""
        return {
            "name": name,
            "label": label,
            "type": field_type,
            "placeholder": label if placeholder is None else placeholder,
            "default": default
        }

    # ============ Messages ============

    async def send_message(
        self,
        conversation_id: int,
        content: str,
        message_type: MessageType = MessageType.OUTGOING,
        private: bool = False
    ) -> dict[str, Any]:
        """Send a message to a conversation.

        Args:
            conversation_id: Conversation ID.
            content: Message content.
            message_type: Message type (incoming/outgoing). Either a
                ``MessageType`` member or its raw string value.
            private: Whether the message is private (internal note).

        Returns:
            Created message data.

        Raises:
            ValueError: If ``message_type`` is not a valid message type.
            httpx.HTTPStatusError: If the API responds with 4xx/5xx.
        """
        payload = {
            "content": content,
            # Coerce so a plain "outgoing"/"incoming" string works as well as
            # an enum member instead of failing on the ``.value`` lookup.
            "message_type": MessageType(message_type).value,
            "private": private
        }
        data = await self._request(
            "POST",
            f"/conversations/{conversation_id}/messages",
            json=payload
        )
        logger.debug(
            "Message sent",
            conversation_id=conversation_id,
            message_id=data.get("id")
        )
        return data

    async def send_rich_message(
        self,
        conversation_id: int,
        content: str,
        content_type: str,
        content_attributes: dict[str, Any]
    ) -> dict[str, Any]:
        """Send a rich message (cards, buttons, etc.).

        Args:
            conversation_id: Conversation ID.
            content: Fallback text content.
            content_type: Rich content type (cards, input_select, etc.).
            content_attributes: Rich content attributes.

        Returns:
            Created message data.

        Raises:
            httpx.HTTPStatusError: If the API responds with 4xx/5xx.
        """
        payload = {
            "content": content,
            "message_type": MessageType.OUTGOING.value,
            "content_type": content_type,
            "content_attributes": content_attributes
        }
        return await self._request(
            "POST",
            f"/conversations/{conversation_id}/messages",
            json=payload
        )

    async def send_order_card(
        self,
        conversation_id: int,
        order_data: dict[str, Any],
        actions: Optional[list[dict[str, Any]]] = None
    ) -> dict[str, Any]:
        """Send an order card message (``content_type="cards"``).

        Card layout:
            - image: ``image_url`` of the first item, when present
            - title: order number followed by the status text
            - description: item summary and amount paid
            - buttons: ``actions``, or a single link to the order detail page

        Args:
            conversation_id: Conversation ID.
            order_data: Order data. Keys read here:
                - order_id: order number
                - status / status_text: status code and display text
                  (``status_text`` wins when present)
                - items: list of item dicts (``name``, ``image_url``)
                - total_amount: amount paid
            actions: Optional list of button definitions that replaces the
                default "view order details" link.

        Returns:
            Created message data.

        Raises:
            httpx.HTTPStatusError: If the API responds with 4xx/5xx.

        Example:
            >>> order_data = {
            ...     "order_id": "202071324",
            ...     "status_text": "已发货",
            ...     "items": [{"name": "商品A", "quantity": 2}],
            ...     "total_amount": "599.00"
            ... }
            >>> await chatwoot.send_order_card(123, order_data)
        """
        order_id = order_data.get("order_id", "")
        status_text = order_data.get("status_text", order_data.get("status", ""))
        total_amount = order_data.get('total_amount', '0.00')

        # A missing or None "items" value is treated as "no items".
        items = order_data.get("items") or []

        # The card image comes from the first item only.
        media_url = items[0].get("image_url") if items else None

        title = f"订单 #{order_id} {status_text}"

        if not items:
            description = f"实付 ¥{total_amount}"
        else:
            if len(items) == 1:
                items_desc = items[0].get("name", "商品")
            else:
                items_desc = f"{items[0].get('name', '商品A')} 等共计 {len(items)} 件商品"
            description = f"包含 {items_desc},实付 ¥{total_amount}"

        # Caller-supplied buttons win; otherwise link to the order detail
        # page. The order id is URL-encoded so unusual characters cannot
        # corrupt the query string.
        card_actions = actions or [
            {
                "type": "link",
                "text": "查看订单详情",
                "uri": self.ORDER_DETAIL_URL_TEMPLATE.format(
                    order_id=quote(str(order_id), safe="")
                )
            }
        ]

        card = {
            "title": title,
            "description": description,
            "actions": card_actions
        }
        # media_url is only included when an image is available.
        if media_url:
            card["media_url"] = media_url

        content_attributes = {
            "items": [card]
        }

        # Log a truncated preview of the outgoing payload for debugging.
        logger.info(
            "Sending order card",
            conversation_id=conversation_id,
            order_id=order_id,
            has_media=bool(media_url),
            payload_preview=json.dumps({
                "content": "订单详情",
                "content_type": "cards",
                "content_attributes": content_attributes
            }, ensure_ascii=False, indent=2)[:1000]
        )

        return await self.send_rich_message(
            conversation_id=conversation_id,
            content="订单详情",
            content_type="cards",
            content_attributes=content_attributes
        )

    async def send_order_form(
        self,
        conversation_id: int,
        order_data: dict[str, Any],
        actions: Optional[list[dict[str, Any]]] = None
    ) -> dict[str, Any]:
        """Send an order detail message as a form (``content_type="form"``).

        The field types emitted here are ``text``, ``text_area`` and
        ``select``.

        Args:
            conversation_id: Conversation ID.
            order_data: Order data. Keys read here:
                - order_id: order number
                - status / status_text: status code and display text
                - created_at: order time (optional)
                - items: list of item dicts with ``name``, ``quantity``,
                  ``price`` (optional)
                - total_amount: total amount
                - shipping_fee: shipping fee (optional)
                - logistics: dict with ``carrier`` and ``tracking_number``
                  (optional)
                - remark: remark (optional)
            actions: Optional list of select options, each with:
                - label: text shown for the option
                - value: value of the option

        Returns:
            Created message data.

        Raises:
            httpx.HTTPStatusError: If the API responds with 4xx/5xx.

        Example:
            >>> order_data = {
            ...     "order_id": "123456789",
            ...     "status": "shipped",
            ...     "status_text": "已发货",
            ...     "created_at": "2023-10-27 14:30",
            ...     "total_amount": "1058.00",
            ...     "items": [{"name": "商品A", "quantity": 2, "price": "100.00"}]
            ... }
            >>> actions = [
            ...     {"label": "查看详情", "value": "VIEW_DETAILS"},
            ...     {"label": "联系客服", "value": "CONTACT_SUPPORT"}
            ... ]
            >>> await chatwoot.send_order_form(123, order_data, actions)
        """
        field = self._form_field
        status_text = order_data.get("status_text", order_data.get("status", "unknown"))

        # Order number and status are always present.
        form_items = [
            field("order_id", "订单号", order_data.get("order_id", "")),
            field("status", "订单状态", status_text),
        ]

        # Order time, only when provided.
        if order_data.get("created_at"):
            form_items.append(
                field("created_at", "下单时间", order_data["created_at"])
            )

        # Item list rendered as one line per item.
        items = order_data.get("items", [])
        if items:
            items_text = "\n".join(
                f"▫️ {item.get('name', '未知商品')} × {item.get('quantity', 1)} - ¥{item.get('price', '0.00')}"
                for item in items
            )
            form_items.append(
                field(
                    "items",
                    "商品详情",
                    items_text,
                    field_type="text_area",
                    placeholder="商品列表"
                )
            )

        # Total amount is always present.
        form_items.append(
            field("total_amount", "总金额", f"¥{order_data.get('total_amount', '0.00')}")
        )

        # Shipping fee: shown whenever it is set, including a zero fee.
        if order_data.get("shipping_fee") is not None:
            form_items.append(
                field("shipping_fee", "运费", f"¥{order_data['shipping_fee']}")
            )

        # Logistics summary, only when provided.
        logistics = order_data.get("logistics")
        if logistics:
            logistics_text = (
                f"承运商: {logistics.get('carrier', '未知')}\n"
                f"单号: {logistics.get('tracking_number', '未知')}"
            )
            form_items.append(
                field("logistics", "物流信息", logistics_text, field_type="text_area")
            )

        # Remark, only when provided.
        if order_data.get("remark"):
            form_items.append(
                field("remark", "备注", order_data["remark"], field_type="text_area")
            )

        # Action selector, only when options were supplied. It carries no
        # placeholder/default, so it is built by hand.
        if actions:
            form_items.append({
                "name": "action_select",
                "label": "操作",
                "type": "select",
                "options": actions
            })

        return await self.send_rich_message(
            conversation_id=conversation_id,
            content="订单详情",
            content_type="form",
            content_attributes={"items": form_items}
        )

    # ============ Conversations ============

    async def get_conversation(self, conversation_id: int) -> dict[str, Any]:
        """Get conversation details.

        Args:
            conversation_id: Conversation ID.

        Returns:
            Conversation data.

        Raises:
            httpx.HTTPStatusError: If the API responds with 4xx/5xx.
        """
        return await self._request("GET", f"/conversations/{conversation_id}")

    async def update_conversation_status(
        self,
        conversation_id: int,
        status: ConversationStatus
    ) -> dict[str, Any]:
        """Update conversation status.

        Args:
            conversation_id: Conversation ID.
            status: New status.

        Returns:
            Updated conversation data.

        Raises:
            httpx.HTTPStatusError: If the API responds with 4xx/5xx.
        """
        data = await self._request(
            "POST",
            f"/conversations/{conversation_id}/toggle_status",
            json={"status": status.value}
        )
        logger.info(
            "Conversation status updated",
            conversation_id=conversation_id,
            status=status.value
        )
        return data

    async def add_labels(
        self,
        conversation_id: int,
        labels: list[str]
    ) -> dict[str, Any]:
        """Add labels to a conversation.

        Args:
            conversation_id: Conversation ID.
            labels: List of label names.

        Returns:
            Updated labels.

        Raises:
            httpx.HTTPStatusError: If the API responds with 4xx/5xx.
        """
        return await self._request(
            "POST",
            f"/conversations/{conversation_id}/labels",
            json={"labels": labels}
        )

    async def assign_agent(
        self,
        conversation_id: int,
        agent_id: int
    ) -> dict[str, Any]:
        """Assign an agent to a conversation.

        Args:
            conversation_id: Conversation ID.
            agent_id: Agent user ID.

        Returns:
            Assignment result.

        Raises:
            httpx.HTTPStatusError: If the API responds with 4xx/5xx.
        """
        data = await self._request(
            "POST",
            f"/conversations/{conversation_id}/assignments",
            json={"assignee_id": agent_id}
        )
        logger.info(
            "Agent assigned",
            conversation_id=conversation_id,
            agent_id=agent_id
        )
        return data

    # ============ Contacts ============

    async def get_contact(self, contact_id: int) -> dict[str, Any]:
        """Get contact details.

        Args:
            contact_id: Contact ID.

        Returns:
            Contact data.

        Raises:
            httpx.HTTPStatusError: If the API responds with 4xx/5xx.
        """
        return await self._request("GET", f"/contacts/{contact_id}")

    async def update_contact(
        self,
        contact_id: int,
        attributes: dict[str, Any]
    ) -> dict[str, Any]:
        """Update contact attributes.

        Args:
            contact_id: Contact ID.
            attributes: Attributes to update; sent as the JSON request body.

        Returns:
            Updated contact data.

        Raises:
            httpx.HTTPStatusError: If the API responds with 4xx/5xx.
        """
        return await self._request(
            "PUT",
            f"/contacts/{contact_id}",
            json=attributes
        )

    # ============ Messages History ============

    async def get_messages(
        self,
        conversation_id: int,
        before: Optional[int] = None
    ) -> list[dict[str, Any]]:
        """Get conversation messages.

        Args:
            conversation_id: Conversation ID.
            before: Get messages before this message ID. Omitted from the
                query when falsy.

        Returns:
            The ``payload`` list of the response, or an empty list when the
            response has no ``payload`` key.

        Raises:
            httpx.HTTPStatusError: If the API responds with 4xx/5xx.
        """
        params = {}
        if before:
            params["before"] = before
        data = await self._request(
            "GET",
            f"/conversations/{conversation_id}/messages",
            params=params
        )
        return data.get("payload", [])
# ============ Helper Functions ============
def format_order_card_markdown(order_data: dict[str, Any]) -> str:
    """Render order information as a Markdown card.

    Args:
        order_data: Order data. Keys read here: ``status``, ``status_text``,
            ``order_id``, ``created_at``, ``items`` (dicts with ``name``,
            ``quantity``, ``price``, ``image_url``), ``total_amount``,
            ``shipping_fee``, ``logistics`` (``carrier``,
            ``tracking_number``) and ``remark``.

    Returns:
        The card as a single string of newline-joined Markdown lines.

    Example:
        >>> order = {
        ...     "order_id": "123456789",
        ...     "status": "shipped",
        ...     "status_text": "已发货",
        ...     "created_at": "2023-10-27 14:30",
        ...     "items": [...],
        ...     "total_amount": "1058.00",
        ...     "shipping_fee": "0.00",
        ...     "logistics": {...}
        ... }
        >>> markdown = format_order_card_markdown(order)
    """
    # Status code -> emoji prefix for the headline.
    # NOTE(review): several entries are empty strings, which yields a
    # headline starting with a space; possibly glyphs were lost somewhere
    # upstream -- confirm the intended emoji before changing them.
    emoji_by_status = {
        "pending": "",
        "paid": "💰",
        "processing": "⚙️",
        "shipped": "📦",
        "delivered": "",
        "completed": "",
        "cancelled": "",
        "refunded": "💸",
        "failed": "⚠️",
    }

    status_code = order_data.get("status", "unknown")
    # The display text falls back to the raw status code.
    status_label = order_data.get("status_text", status_code)
    # Unknown status codes get the parcel emoji.
    badge = emoji_by_status.get(status_code, "📦")

    card = [
        f"{badge} **订单状态:{status_label}**",
        f"📝 **订单号:** `{order_data.get('order_id', '')}`",
    ]

    # Order time, only when provided.
    created = order_data.get("created_at")
    if created:
        card.append(f"📅 **下单时间:** {created}")

    # Blank separator followed by the items heading.
    card += ["", "**商品详情**"]

    products = order_data.get("items", [])
    if not products:
        card.append("▫️ 无商品信息")
    else:
        for product in products:
            picture_url = product.get("image_url")
            # Optional image link placed right after the bullet.
            picture = f" [图片]({picture_url})" if picture_url else ""
            title = product.get("name", "未知商品")
            count = product.get("quantity", 1)
            unit_price = product.get("price", "0.00")
            card.append(f"▫️{picture} {title} × {count} ¥{unit_price}")

    # Amount line (always present).
    card.append("")
    card.append(
        f"💰 **实付:** ¥{order_data.get('total_amount', '0.00')} (含运费 ¥{order_data.get('shipping_fee', '0.00')})"
    )

    # Logistics section, only when provided.
    shipment = order_data.get("logistics")
    if shipment:
        card += [
            "",
            "🚚 **物流信息**",
            f"承运商:{shipment.get('carrier', '未知')}",
            f"单号:{shipment.get('tracking_number', '未知')}",
            "*点击单号可复制跟踪*",
        ]

    # Remark, only when provided.
    note = order_data.get("remark")
    if note:
        card += ["", f"📋 **备注:** {note}"]

    return "\n".join(card)
def create_action_buttons(actions: list[dict[str, Any]]) -> dict[str, Any]:
    """Wrap button definitions in a single-card ``content_attributes`` dict.

    Args:
        actions: Button definitions. Each button contains:
            - type: "link" or "postback"
            - text: button label
            - uri: link target (when type is "link")
            - payload: data sent back (when type is "postback")
            The list is embedded as-is, not copied.

    Returns:
        ``{"items": [card]}`` where the one card has a fixed title and the
        given ``actions``.

    Example:
        >>> actions = [
        ...     {"type": "link", "text": "查看详情", "uri": "https://example.com"},
        ...     {"type": "postback", "text": "联系客服", "payload": "CONTACT_SUPPORT"}
        ... ]
        >>> buttons = create_action_buttons(actions)
    """
    button_card = {"title": "操作", "actions": actions}
    return {"items": [button_card]}
# Module-wide Chatwoot client; stays None until first requested.
chatwoot_client: Optional[ChatwootClient] = None


def get_chatwoot_client() -> ChatwootClient:
    """Return the module-wide Chatwoot client, creating it on first use.

    The instance is built with ``ChatwootClient()`` (all defaults) and is
    stored in the module-level ``chatwoot_client`` variable, so later calls
    return the same object.
    """
    global chatwoot_client
    if chatwoot_client is not None:
        return chatwoot_client
    chatwoot_client = ChatwootClient()
    return chatwoot_client