Files
assistant/agent/integrations/chatwoot.py
wangliang 0b5d0a8086 feat: 重构订单和物流信息展示格式
主要改动:
- 订单列表:使用 order_list 格式,展示 5 个订单(全部状态)
- 订单详情:使用 order_detail 格式,优化价格和时间显示
- 物流信息:使用 logistics 格式,根据 track id 动态生成步骤
- 商品图片:从 orderProduct.imageUrl 字段获取
- 时间格式:统一为 YYYY-MM-DD HH:MM:SS
- 多语言支持:amountLabel、orderTime 支持中英文
- 配置管理:新增 FRONTEND_URL 环境变量
- API 集成:改进 Mall API tracks 数据解析
- 认证优化:account_id 从 webhook 动态获取

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-23 18:49:40 +08:00

1261 lines
40 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Chatwoot API Client for B2B Shopping AI Assistant
"""
import json
from typing import Any, Optional
from dataclasses import dataclass
from enum import Enum
import httpx
from config import settings
from utils.logger import get_logger
logger = get_logger(__name__)
# 订单字段多语言映射
ORDER_FIELD_LABELS = {
"zh": { # 中文
"title": "订单详情",
"order_id": "订单号",
"status": "订单状态",
"total_amount": "订单金额",
"payment_method": "支付方式",
"items_count": "商品总数量",
"shipping_method": "运输方式",
"tracking_number": "运单号",
"shipping_address": "收货地址",
"billing_address": "发票地址",
"channel": "渠道",
"created_at": "下单时间",
"action": "操作",
},
"en": { # English
"title": "Order Details",
"order_id": "Order ID",
"status": "Order Status",
"total_amount": "Total Amount",
"payment_method": "Payment Method",
"items_count": "Total Items",
"shipping_method": "Shipping Method",
"tracking_number": "Tracking Number",
"shipping_address": "Shipping Address",
"billing_address": "Billing Address",
"channel": "Channel",
"created_at": "Order Date",
"action": "Action",
},
"nl": { # Dutch (荷兰语)
"title": "Orderdetails",
"order_id": "Ordernummer",
"status": "Orderstatus",
"total_amount": "Totaalbedrag",
"payment_method": "Betaalmethode",
"items_count": "Totaal aantal artikelen",
"shipping_method": "Verzendmethode",
"tracking_number": "Trackingsnummer",
"shipping_address": "Verzendadres",
"billing_address": "Factuuradres",
"channel": "Kanaal",
"created_at": "Besteldatum",
"action": "Actie",
},
"de": { # German (德语)
"title": "Bestelldetails",
"order_id": "Bestellnummer",
"status": "Bestellstatus",
"total_amount": "Gesamtbetrag",
"payment_method": "Zahlungsmethode",
"items_count": "Gesamtanzahl Artikel",
"shipping_method": "Versandart",
"tracking_number": "Sendungsnummer",
"shipping_address": "Lieferadresse",
"billing_address": "Rechnungsadresse",
"channel": "Kanal",
"created_at": "Bestelldatum",
"action": "Aktion",
},
"es": { # Spanish (西班牙语)
"title": "Detalles del pedido",
"order_id": "Número de pedido",
"status": "Estado del pedido",
"total_amount": "Importe total",
"payment_method": "Método de pago",
"items_count": "Total de artículos",
"shipping_method": "Método de envío",
"tracking_number": "Número de seguimiento",
"shipping_address": "Dirección de envío",
"billing_address": "Dirección de facturación",
"channel": "Canal",
"created_at": "Fecha del pedido",
"action": "Acción",
},
"fr": { # French (法语)
"title": "Détails de la commande",
"order_id": "Numéro de commande",
"status": "Statut de la commande",
"total_amount": "Montant total",
"payment_method": "Méthode de paiement",
"items_count": "Nombre total d'articles",
"shipping_method": "Méthode d'expédition",
"tracking_number": "Numéro de suivi",
"shipping_address": "Adresse de livraison",
"billing_address": "Adresse de facturation",
"channel": "Canal",
"created_at": "Date de commande",
"action": "Action",
},
"it": { # Italian (意大利语)
"title": "Detagli dell'ordine",
"order_id": "Numero ordine",
"status": "Stato ordine",
"total_amount": "Importo totale",
"payment_method": "Metodo di pagamento",
"items_count": "Totale articoli",
"shipping_method": "Metodo di spedizione",
"tracking_number": "Numero di tracciamento",
"shipping_address": "Indirizzo di spedizione",
"billing_address": "Indirizzo di fatturazione",
"channel": "Canale",
"created_at": "Data ordine",
"action": "Azione",
},
"tr": { # Turkish (土耳其语)
"title": "Sipariş Detayları",
"order_id": "Sipariş Numarası",
"status": "Sipariş Durumu",
"total_amount": "Toplam Tutar",
"payment_method": "Ödeme Yöntemi",
"items_count": "Toplam Ürün",
"shipping_method": "Kargo Yöntemi",
"tracking_number": "Takip Numarası",
"shipping_address": "Teslimat Adresi",
"billing_address": "Fatura Adresi",
"channel": "Kanal",
"created_at": "Sipariş Tarihi",
"action": "İşlem",
},
}
def get_field_label(field_key: str, language: str = "en") -> str:
"""获取指定语言的字段标签
Args:
field_key: 字段键名(如 "order_id", "status" 等)
language: 语言代码(默认 "en"
Returns:
对应语言的字段标签
"""
if language not in ORDER_FIELD_LABELS:
language = "en" # 默认使用英文
return ORDER_FIELD_LABELS[language].get(field_key, ORDER_FIELD_LABELS["en"].get(field_key, field_key))
class MessageType(str, Enum):
"""Chatwoot message types"""
INCOMING = "incoming"
OUTGOING = "outgoing"
class ConversationStatus(str, Enum):
"""Chatwoot conversation status"""
OPEN = "open"
RESOLVED = "resolved"
PENDING = "pending"
SNOOZED = "snoozed"
@dataclass
class ChatwootMessage:
"""Chatwoot message structure"""
id: int
content: str
message_type: str
conversation_id: int
sender_type: Optional[str] = None
sender_id: Optional[int] = None
private: bool = False
@dataclass
class ChatwootContact:
"""Chatwoot contact structure"""
id: int
name: Optional[str] = None
email: Optional[str] = None
phone_number: Optional[str] = None
custom_attributes: Optional[dict[str, Any]] = None
class ChatwootClient:
"""Chatwoot API Client"""
def __init__(
self,
api_url: Optional[str] = None,
api_token: Optional[str] = None,
account_id: int = 2
):
"""Initialize Chatwoot client
Args:
api_url: Chatwoot API URL, defaults to settings
api_token: API access token, defaults to settings
account_id: Chatwoot account ID
"""
self.api_url = (api_url or settings.chatwoot_api_url).rstrip("/")
self.api_token = api_token or settings.chatwoot_api_token
self.account_id = account_id
self._client: Optional[httpx.AsyncClient] = None
logger.info("Chatwoot client initialized", api_url=self.api_url)
async def _get_client(self) -> httpx.AsyncClient:
"""Get or create HTTP client"""
if self._client is None:
self._client = httpx.AsyncClient(
base_url=f"{self.api_url}/api/v1/accounts/{self.account_id}",
headers={
"api_access_token": self.api_token,
"Content-Type": "application/json"
},
timeout=30.0
)
return self._client
async def close(self) -> None:
"""Close HTTP client"""
if self._client:
await self._client.aclose()
self._client = None
def _format_datetime(self, datetime_str: str) -> str:
"""格式化日期时间为 YYYY-MM-DD HH:MM:SS 格式
Args:
datetime_str: 输入的日期时间字符串
Returns:
格式化后的日期时间字符串 (YYYY-MM-DD HH:MM:SS)
"""
if not datetime_str:
return ""
# 如果已经是正确的格式YYYY-MM-DD HH:MM:SS直接返回
if len(datetime_str) == 19 and datetime_str[10] == ' ' and datetime_str[13] == ':' and datetime_str[16] == ':':
return datetime_str
# 尝试解析常见的时间格式
from datetime import datetime
# 常见格式列表
formats = [
"%Y-%m-%d %H:%M:%S", # 2025-01-23 14:30:00
"%Y-%m-%dT%H:%M:%S", # 2025-01-23T14:30:00
"%Y-%m-%d %H:%M:%S.%f", # 2025-01-23 14:30:00.123456
"%Y-%m-%dT%H:%M:%S.%f", # 2025-01-23T14:30:00.123456
"%Y-%m-%dT%H:%M:%SZ", # 2025-01-23T14:30:00Z
"%Y-%m-%dT%H:%M:%S.%fZ", # 2025-01-23T14:30:00.123456Z
"%Y/%m/%d %H:%M:%S", # 2025/01/23 14:30:00
"%d-%m-%Y %H:%M:%S", # 23-01-2025 14:30:00
"%m/%d/%Y %H:%M:%S", # 01/23/2025 14:30:00
]
for fmt in formats:
try:
dt = datetime.strptime(datetime_str, fmt)
return dt.strftime("%Y-%m-%d %H:%M:%S")
except ValueError:
continue
# 如果无法解析,返回原始字符串
return datetime_str
# ============ Messages ============
async def send_message(
self,
conversation_id: int,
content: str,
message_type: MessageType = MessageType.OUTGOING,
private: bool = False
) -> dict[str, Any]:
"""Send a message to a conversation
Args:
conversation_id: Conversation ID
content: Message content
message_type: Message type (incoming/outgoing)
private: Whether message is private (internal note)
Returns:
Created message data
"""
client = await self._get_client()
payload = {
"content": content,
"message_type": message_type.value,
"private": private
}
response = await client.post(
f"/conversations/{conversation_id}/messages",
json=payload
)
response.raise_for_status()
data = response.json()
logger.debug(
"Message sent",
conversation_id=conversation_id,
message_id=data.get("id")
)
return data
async def send_rich_message(
self,
conversation_id: int,
content: str,
content_type: str,
content_attributes: dict[str, Any]
) -> dict[str, Any]:
"""Send a rich message (cards, buttons, etc.)
Args:
conversation_id: Conversation ID
content: Fallback text content
content_type: Rich content type (cards, input_select, etc.)
content_attributes: Rich content attributes
Returns:
Created message data
"""
client = await self._get_client()
payload = {
"content": content,
"message_type": MessageType.OUTGOING.value,
"content_type": content_type,
"content_attributes": content_attributes
}
response = await client.post(
f"/conversations/{conversation_id}/messages",
json=payload
)
response.raise_for_status()
return response.json()
async def send_order_card(
self,
conversation_id: int,
order_data: dict[str, Any],
actions: Optional[list[dict[str, Any]]] = None
) -> dict[str, Any]:
"""发送订单卡片消息(使用 Chatwoot cards 格式)
卡片结构:
- 图片:第一件商品的图片
- 标题:订单号 + 状态
- 描述:汇总信息
- 按钮:跳转到订单详情页面
Args:
conversation_id: 会话 ID
order_data: 订单数据,包含:
- order_id: 订单号
- status: 订单状态
- status_text: 状态文本
- items: 商品列表
- total_amount: 总金额
actions: 操作按钮配置列表(可选)
Returns:
发送结果
Example:
>>> order_data = {
... "order_id": "202071324",
... "status_text": "已发货",
... "items": [{"name": "商品A", "quantity": 2}],
... "total_amount": "599.00"
... }
>>> await chatwoot.send_order_card(123, order_data)
"""
order_id = order_data.get("order_id", "")
status_text = order_data.get("status_text", order_data.get("status", ""))
# 获取第一件商品的图片
items = order_data.get("items", [])
media_url = None
if items and len(items) > 0:
first_item = items[0]
media_url = first_item.get("image_url")
# 构建标题
title = f"订单 #{order_id} {status_text}"
# 构建描述
if items and len(items) > 0:
if len(items) == 1:
items_desc = items[0].get("name", "商品")
else:
items_desc = f"{items[0].get('name', '商品A')} 等共计 {len(items)} 件商品"
description = f"包含 {items_desc},实付 ¥{order_data.get('total_amount', '0.00')}"
else:
description = f"实付 ¥{order_data.get('total_amount', '0.00')}"
# 构建操作按钮
card_actions = []
if actions:
card_actions = actions
else:
# 默认按钮:跳转到订单详情页面
card_actions = [
{
"type": "link",
"text": "查看订单详情",
"uri": f"https://www.qa1.gaia888.com/customer/order/detail?orderId={order_id}"
}
]
# 构建单个卡片
card = {
"title": title,
"description": description,
"actions": card_actions
}
# 如果有图片,添加 media_url
if media_url:
card["media_url"] = media_url
# 构建 content_attributes
content_attributes = {
"items": [card]
}
# 记录发送的数据(用于调试)
logger.info(
"Sending order card",
conversation_id=conversation_id,
order_id=order_id,
has_media=bool(media_url),
payload_preview=json.dumps({
"content": "订单详情",
"content_type": "cards",
"content_attributes": content_attributes
}, ensure_ascii=False, indent=2)[:1000]
)
# 发送富媒体消息
return await self.send_rich_message(
conversation_id=conversation_id,
content="订单详情",
content_type="cards",
content_attributes=content_attributes
)
async def send_order_form(
self,
conversation_id: int,
order_data: dict[str, Any],
language: str = "en",
actions: Optional[list[dict[str, Any]]] = None
) -> dict[str, Any]:
"""发送订单详情(使用 content_type=order_detail
Args:
conversation_id: 会话 ID
order_data: 订单数据,包含:
- order_id: 订单号
- status: 订单状态码
- status_text: 状态文本
- created_at: 下单时间
- total_amount: 总金额
- items: 商品列表
language: 语言代码en, nl, de, es, fr, it, tr, zh默认 en
actions: 操作按钮配置列表(可选,暂未使用)
Returns:
发送结果
Example:
>>> order_data = {
... "order_id": "123456789",
... "status": "completed",
... "status_text": "已完成",
... "created_at": "2025-01-23 14:30:00",
... "total_amount": "179.97",
... "items": [{"name": "商品A", "image_url": "...", "quantity": 2, "price": "49.99"}]
... }
>>> await chatwoot.send_order_form(123, order_data, language="zh")
"""
order_id = order_data.get("order_id", "")
status = order_data.get("status", "")
status_text = order_data.get("status_text", "")
# 获取原始时间并格式化为 YYYY-MM-DD HH:MM:SS
raw_created_at = order_data.get("created_at", order_data.get("date_added", ""))
created_at = self._format_datetime(raw_created_at)
total_amount = order_data.get("total_amount", "0")
# 根据状态码映射状态和颜色
status_mapping = {
"0": {"status": "cancelled", "text": "已取消", "color": "text-red-600"},
"1": {"status": "pending", "text": "待支付", "color": "text-yellow-600"},
"2": {"status": "paid", "text": "已支付", "color": "text-blue-600"},
"3": {"status": "shipped", "text": "已发货", "color": "text-purple-600"},
"4": {"status": "signed", "text": "已签收", "color": "text-green-600"},
"15": {"status": "completed", "text": "已完成", "color": "text-green-600"},
"100": {"status": "cancelled", "text": "超时取消", "color": "text-red-600"},
}
status_info = status_mapping.get(str(status), {"status": "unknown", "text": status_text or "未知", "color": "text-gray-600"})
# 构建商品列表
items = order_data.get("items", [])
formatted_items = []
for item in items:
# 获取价格并格式化为字符串(包含货币符号)
price_value = item.get("price", "0")
try:
price_num = float(price_value)
price_text = f"{price_num:.2f}"
except (ValueError, TypeError):
price_text = str(price_value) if price_value else "€0.00"
formatted_items.append({
"name": item.get("name", "未知商品"),
"quantity": int(item.get("quantity", 1)),
"price": price_text,
"image": item.get("image_url", "")
})
# 计算商品总数量
total_quantity = sum(item.get("quantity", 1) for item in items)
# 计算总金额并格式化(多语言)
try:
total_amount_value = float(total_amount)
except (ValueError, TypeError):
total_amount_value = 0.0
# 构建总计标签
if language == "zh":
total_label = f"{total_quantity} 件商品"
amount_text = f"总计: €{total_amount_value:.2f}"
order_time_text = f"下单时间: {created_at}"
else:
total_label = f"Total: {total_quantity} items"
amount_text = f"Total: €{total_amount_value:.2f}"
order_time_text = f"Order time: {created_at}"
# 构建操作按钮
actions_list = []
# 获取前端 URL
frontend_url = settings.frontend_url.rstrip('/')
if language == "zh":
# 中文按钮
actions_list.append({
"text": "查看物流",
"style": "default",
"reply": f"查看订单物流信息:{order_id}"
})
actions_list.append({
"text": "订单详情",
"style": "primary",
"url": f"{frontend_url}/customer/order/detail?orderId={order_id}",
"target": "_self"
})
else:
# 英文按钮
actions_list.append({
"text": "Logistics info",
"style": "default",
"reply": f"查看订单物流信息:{order_id}"
})
actions_list.append({
"text": "Order details",
"style": "primary",
"url": f"{frontend_url}/customer/order/detail?orderId={order_id}",
"target": "_self"
})
# 构建 content_attributesorder_detail 格式)
content_attributes = {
"status": status_info["status"],
"statusText": status_info["text"],
"statusColor": status_info["color"],
"orderId": str(order_id),
"orderTime": order_time_text,
"items": formatted_items,
"showTotal": True,
"totalLabel": total_label,
"amountLabel": amount_text,
"actions": actions_list
}
# 记录发送的数据(用于调试)
logger.info(
"Sending order detail",
conversation_id=conversation_id,
order_id=order_id,
items_count=len(formatted_items),
total_quantity=total_quantity,
language=language
)
# 发送 order_detail 类型消息
client = await self._get_client()
payload = {
"content_type": "order_detail",
"content": "",
"content_attributes": content_attributes
}
response = await client.post(
f"/conversations/{conversation_id}/messages",
json=payload
)
response.raise_for_status()
return response.json()
async def send_logistics_info(
self,
conversation_id: int,
logistics_data: dict[str, Any],
language: str = "en"
) -> dict[str, Any]:
"""发送物流信息(使用 content_type=logistics
Args:
conversation_id: 会话 ID
logistics_data: 物流数据,包含:
- carrier: 物流公司名称
- tracking_number: 运单号
- status: 当前状态
- timeline: 物流轨迹列表
- order_id: 订单号(可选,用于生成链接)
language: 语言代码en, nl, de, es, fr, it, tr, zh默认 en
Returns:
发送结果
Example:
>>> logistics_data = {
... "carrier": "顺丰速运",
... "tracking_number": "SF154228901",
... "status": "派送中",
... "timeline": [...]
... }
>>> await chatwoot.send_logistics_info(123, logistics_data, language="zh")
"""
carrier = logistics_data.get("carrier", "")
tracking_number = logistics_data.get("tracking_number", "")
status = logistics_data.get("status", "")
timeline = logistics_data.get("timeline", [])
order_id = logistics_data.get("order_id", "")
# 获取最新物流信息(从 timeline 中提取 remark
latest_log = ""
latest_time = ""
current_step = 0
# Track ID 到步骤的映射
# id = 1 -> Order Received已接单
# id = 10 -> Picked Up已揽收
# id = 20 -> In Transit运输中
# id = 30 -> Delivered已送达
track_id_to_step = {
"1": 0, # Order Received
"10": 1, # Picked Up
"20": 2, # In Transit
"30": 3 # Delivered
}
if timeline and len(timeline) > 0:
# 获取第一条(最新)物流信息
first_event = timeline[0]
if isinstance(first_event, dict):
# 提取 remark 字段
remark = first_event.get("remark", "")
# 提取时间
time_str = (
first_event.get("time") or
first_event.get("date") or
first_event.get("timestamp") or
""
)
# 提取位置
location = first_event.get("location", "")
# 提取 track id
track_id = str(first_event.get("id", ""))
# 构建最新物流描述:如果 remark 为空,则只显示 location否则显示 location | remark
if location and remark:
latest_log = f"{location} | {remark}"
elif remark:
latest_log = remark
elif location:
latest_log = location
else:
latest_log = ""
# 格式化时间
if time_str:
latest_time = self._format_datetime(str(time_str))
# 根据 track id 判断当前步骤
if track_id in track_id_to_step:
current_step = track_id_to_step[track_id]
else:
# 如果无法识别 id根据时间线长度判断
current_step = len(timeline) if len(timeline) <= 4 else 2
# 构建步骤列表固定4个步骤
if language == "zh":
steps = [
{"label": "已接单"},
{"label": "已揽收"},
{"label": "运输中"},
{"label": "已送达"}
]
else:
steps = [
{"label": "Order Received"},
{"label": "Picked Up"},
{"label": "In Transit"},
{"label": "Delivered"}
]
# 构建操作按钮
actions = []
frontend_url = settings.frontend_url.rstrip('/')
if order_id:
# 如果有订单号,生成物流追踪链接
if language == "zh":
actions.append({
"text": "物流详情",
"style": "primary",
"url": f"{frontend_url}/logistic-tracking/{order_id}"
})
else:
actions.append({
"text": "Tracking Details",
"style": "primary",
"url": f"{frontend_url}/logistic-tracking/{order_id}"
})
# 构建 content_attributeslogistics 格式)
content_attributes = {
"logisticsName": carrier,
"trackingNumber": tracking_number,
"currentStep": current_step,
"isUrgent": False,
"latestLog": latest_log,
"latestTime": latest_time,
"steps": steps,
"actions": actions
}
# 记录发送的数据(用于调试)
logger.info(
"Sending logistics info",
conversation_id=conversation_id,
carrier=carrier,
tracking_number=tracking_number,
current_step=current_step,
timeline_count=len(timeline) if timeline else 0,
language=language
)
# 发送 logistics 类型消息
client = await self._get_client()
payload = {
"content_type": "logistics",
"content": "",
"content_attributes": content_attributes
}
response = await client.post(
f"/conversations/{conversation_id}/messages",
json=payload
)
response.raise_for_status()
return response.json()
async def send_order_list(
self,
conversation_id: int,
orders: list[dict[str, Any]],
language: str = "en"
) -> dict[str, Any]:
"""发送订单列表(使用自定义 content_type=order_list
每个订单包含:
- orderNumber: 订单号
- date: 订单日期
- status: 订单状态
- items: 商品列表(图片和名称)
- actions: 操作按钮
Args:
conversation_id: 会话 ID
orders: 订单列表,每个订单包含:
- order_id: 订单号
- status_text: 订单状态
- created_at: 下单时间
- items: 商品列表(可选,包含 name 和 image_url
- order_type: 订单类型(可选,用于判断渠道)
language: 语言代码en, nl, de, es, fr, it, tr, zh默认 en
Returns:
发送结果
Example:
>>> orders = [
... {
... "order_id": "20250122001",
... "created_at": "Jan 22, 2025",
... "status_text": "Shipped",
... "items": [
... {"image_url": "url1", "name": "Product 1"},
... {"image_url": "url2", "name": "Product 2"}
... ]
... }
... ]
>>> await chatwoot.send_order_list(123, orders, language="en")
"""
# 获取多语言按钮文本
if language == "zh":
details_text = "订单详情"
logistics_text = "物流信息"
details_reply_prefix = "查看订单详情:"
logistics_reply_prefix = "查看订单物流信息:"
else:
details_text = "Order details"
logistics_text = "Logistics info"
details_reply_prefix = "查看订单详情:"
logistics_reply_prefix = "查看订单物流信息:"
# 构建订单列表
order_list_data = []
for order in orders:
order_id = order.get("order_id", "")
status_text = order.get("status_text", "")
# 获取原始时间并格式化为 YYYY-MM-DD HH:MM:SS
raw_created_at = order.get("created_at", order.get("date_added", ""))
created_at = self._format_datetime(raw_created_at)
# 构建商品列表
items = order.get("items", [])
formatted_items = []
logger.info(
f"Processing order {order_id} for items",
items_count=len(items) if isinstance(items, list) else 0,
items_type=type(items).__name__,
first_item_keys=list(items[0].keys()) if items and len(items) > 0 else None
)
if items and isinstance(items, list):
for item in items:
formatted_items.append({
"image": item.get("image_url", ""),
"name": item.get("name", "Product")
})
logger.info(
f"Formatted items for order {order_id}",
formatted_items_count=len(formatted_items),
sample_items=str(formatted_items[:2]) if formatted_items else "[]"
)
# 构建操作按钮
actions = [
{
"text": details_text,
"reply": f"{details_reply_prefix}{order_id}"
},
{
"text": logistics_text,
"reply": f"{logistics_reply_prefix}{order_id}"
}
]
# 构建单个订单
order_data = {
"orderNumber": order_id,
"date": created_at,
"status": status_text,
"items": formatted_items,
"actions": actions
}
order_list_data.append(order_data)
# 构建 content_attributesorder_list 格式)
content_attributes = {
"orders": order_list_data
}
# 记录发送的数据(用于调试)
logger.info(
"Sending order list",
conversation_id=conversation_id,
orders_count=len(orders),
language=language,
payload_preview=str(content_attributes)[:1000]
)
# 发送 order_list 类型消息
client = await self._get_client()
payload = {
"content_type": "order_list",
"content": "",
"content_attributes": content_attributes
}
response = await client.post(
f"/conversations/{conversation_id}/messages",
json=payload
)
response.raise_for_status()
return response.json()
# ============ Conversations ============
async def get_conversation(self, conversation_id: int) -> dict[str, Any]:
"""Get conversation details
Args:
conversation_id: Conversation ID
Returns:
Conversation data
"""
client = await self._get_client()
response = await client.get(f"/conversations/{conversation_id}")
response.raise_for_status()
return response.json()
async def update_conversation_status(
self,
conversation_id: int,
status: ConversationStatus
) -> dict[str, Any]:
"""Update conversation status
Args:
conversation_id: Conversation ID
status: New status
Returns:
Updated conversation data
"""
client = await self._get_client()
response = await client.post(
f"/conversations/{conversation_id}/toggle_status",
json={"status": status.value}
)
response.raise_for_status()
logger.info(
"Conversation status updated",
conversation_id=conversation_id,
status=status.value
)
return response.json()
async def add_labels(
self,
conversation_id: int,
labels: list[str]
) -> dict[str, Any]:
"""Add labels to a conversation
Args:
conversation_id: Conversation ID
labels: List of label names
Returns:
Updated labels
"""
client = await self._get_client()
response = await client.post(
f"/conversations/{conversation_id}/labels",
json={"labels": labels}
)
response.raise_for_status()
return response.json()
async def assign_agent(
self,
conversation_id: int,
agent_id: int
) -> dict[str, Any]:
"""Assign an agent to a conversation
Args:
conversation_id: Conversation ID
agent_id: Agent user ID
Returns:
Assignment result
"""
client = await self._get_client()
response = await client.post(
f"/conversations/{conversation_id}/assignments",
json={"assignee_id": agent_id}
)
response.raise_for_status()
logger.info(
"Agent assigned",
conversation_id=conversation_id,
agent_id=agent_id
)
return response.json()
# ============ Contacts ============
async def get_contact(self, contact_id: int) -> dict[str, Any]:
"""Get contact details
Args:
contact_id: Contact ID
Returns:
Contact data
"""
client = await self._get_client()
response = await client.get(f"/contacts/{contact_id}")
response.raise_for_status()
return response.json()
async def update_contact(
self,
contact_id: int,
attributes: dict[str, Any]
) -> dict[str, Any]:
"""Update contact attributes
Args:
contact_id: Contact ID
attributes: Attributes to update
Returns:
Updated contact data
"""
client = await self._get_client()
response = await client.put(
f"/contacts/{contact_id}",
json=attributes
)
response.raise_for_status()
return response.json()
# ============ Messages History ============
async def get_messages(
self,
conversation_id: int,
before: Optional[int] = None
) -> list[dict[str, Any]]:
"""Get conversation messages
Args:
conversation_id: Conversation ID
before: Get messages before this message ID
Returns:
List of messages
"""
client = await self._get_client()
params = {}
if before:
params["before"] = before
response = await client.get(
f"/conversations/{conversation_id}/messages",
params=params
)
response.raise_for_status()
data = response.json()
return data.get("payload", [])
# ============ Helper Functions ============
def format_order_card_markdown(order_data: dict[str, Any]) -> str:
"""格式化订单信息为 Markdown 卡片
Args:
order_data: 订单数据,包含订单号、状态、商品、金额、物流等信息
Returns:
格式化的 Markdown 字符串
Example:
>>> order = {
... "order_id": "123456789",
... "status": "shipped",
... "status_text": "已发货",
... "created_at": "2023-10-27 14:30",
... "items": [...],
... "total_amount": "1058.00",
... "shipping_fee": "0.00",
... "logistics": {...}
... }
>>> markdown = format_order_card_markdown(order)
"""
# 订单状态 emoji 映射
status_emoji = {
"pending": "",
"paid": "💰",
"processing": "⚙️",
"shipped": "📦",
"delivered": "",
"completed": "",
"cancelled": "",
"refunded": "💸",
"failed": "⚠️",
}
# 获取状态文本和 emoji
status = order_data.get("status", "unknown")
status_text = order_data.get("status_text", status)
emoji = status_emoji.get(status, "📦")
lines = [
f"{emoji} **订单状态:{status_text}**",
f"📝 **订单号:** `{order_data.get('order_id', '')}`",
]
# 添加下单时间(如果有)
if order_data.get("created_at"):
lines.append(f"📅 **下单时间:** {order_data['created_at']}")
lines.append("") # 空行
lines.append("**商品详情**")
# 添加商品列表
items = order_data.get("items", [])
if items:
for item in items:
name = item.get("name", "未知商品")
quantity = item.get("quantity", 1)
price = item.get("price", "0.00")
# 可选:添加图片链接
image_markdown = ""
if item.get("image_url"):
image_markdown = f" [图片]({item['image_url']})"
lines.append(f"▫️{image_markdown} {name} × {quantity} ¥{price}")
else:
lines.append("▫️ 无商品信息")
# 添加金额信息
lines.extend([
"",
f"💰 **实付:** ¥{order_data.get('total_amount', '0.00')} (含运费 ¥{order_data.get('shipping_fee', '0.00')})"
])
# 添加物流信息(如果有)
logistics = order_data.get("logistics")
if logistics:
lines.extend([
"",
"🚚 **物流信息**",
f"承运商:{logistics.get('carrier', '未知')}",
f"单号:{logistics.get('tracking_number', '未知')}",
"*点击单号可复制跟踪*"
])
# 添加备注(如果有)
if order_data.get("remark"):
lines.extend([
"",
f"📋 **备注:** {order_data['remark']}"
])
return "\n".join(lines)
def create_action_buttons(actions: list[dict[str, Any]]) -> dict[str, Any]:
"""创建 Chatwoot 操作按钮卡片
Args:
actions: 按钮配置列表,每个按钮包含:
- type: "link""postback"
- text: 按钮文字
- uri: 链接地址type=link 时)
- payload: 回传数据type=postback 时)
Returns:
符合 Chatwoot content_attributes 格式的字典
Example:
>>> actions = [
... {"type": "link", "text": "查看详情", "uri": "https://example.com"},
... {"type": "postback", "text": "联系客服", "payload": "CONTACT_SUPPORT"}
... ]
>>> buttons = create_action_buttons(actions)
"""
return {
"items": [{
"title": "操作",
"actions": actions
}]
}
# Global Chatwoot client instance
chatwoot_client: Optional[ChatwootClient] = None
def get_chatwoot_client(account_id: Optional[int] = None) -> ChatwootClient:
"""Get or create Chatwoot client instance
Args:
account_id: Chatwoot account ID. If not provided, uses settings default.
"""
global chatwoot_client
if account_id is not None:
# 创建指定 account_id 的客户端实例
return ChatwootClient(account_id=account_id)
if chatwoot_client is None:
chatwoot_client = ChatwootClient()
return chatwoot_client