## 主要修复 ### 1. JSON 解析错误处理 - 修复所有 Agent 的 LLM 响应解析失败时返回原始内容的问题 - 当 JSON 解析失败时,返回友好的兜底消息而不是原始文本 - 影响文件: customer_service.py, order.py, product.py, aftersale.py ### 2. FAQ 快速路径修复 - 修复 customer_service.py 中变量定义顺序问题 - has_faq_query 在使用前未定义导致 NameError - 添加详细的错误日志记录 ### 3. Chatwoot 集成改进 - 添加响应内容调试日志 - 改进错误处理和日志记录 ### 4. 订单查询优化 - 将订单列表默认返回数量从 10 条改为 5 条 - 统一 MCP 工具层和 Mall Client 层的默认值 ### 5. 代码清理 - 删除所有测试代码和示例文件 - 测试文件包括: test_*.py, test_*.html, test_*.sh - 删除测试目录: tests/, agent/tests/, agent/examples/ Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
1689 lines
54 KiB
Python
1689 lines
54 KiB
Python
"""
|
||
Chatwoot API Client for B2B Shopping AI Assistant
|
||
"""
|
||
import json
|
||
from typing import Any, Optional
|
||
from dataclasses import dataclass
|
||
from enum import Enum
|
||
from contextlib import asynccontextmanager
|
||
|
||
import httpx
|
||
|
||
from config import settings
|
||
from utils.logger import get_logger
|
||
|
||
logger = get_logger(__name__)
|
||
|
||
|
||
# Localized labels for order fields, keyed by language code and then by
# field key. Every language table carries the same set of field keys.
ORDER_FIELD_LABELS = {
    "zh": {  # Chinese
        "title": "订单详情",
        "order_id": "订单号",
        "status": "订单状态",
        "total_amount": "订单金额",
        "payment_method": "支付方式",
        "items_count": "商品总数量",
        "shipping_method": "运输方式",
        "tracking_number": "运单号",
        "shipping_address": "收货地址",
        "billing_address": "发票地址",
        "channel": "渠道",
        "created_at": "下单时间",
        "action": "操作",
    },
    "en": {  # English
        "title": "Order Details",
        "order_id": "Order ID",
        "status": "Order Status",
        "total_amount": "Total Amount",
        "payment_method": "Payment Method",
        "items_count": "Total Items",
        "shipping_method": "Shipping Method",
        "tracking_number": "Tracking Number",
        "shipping_address": "Shipping Address",
        "billing_address": "Billing Address",
        "channel": "Channel",
        "created_at": "Order Date",
        "action": "Action",
    },
    "nl": {  # Dutch
        "title": "Orderdetails",
        "order_id": "Ordernummer",
        "status": "Orderstatus",
        "total_amount": "Totaalbedrag",
        "payment_method": "Betaalmethode",
        "items_count": "Totaal aantal artikelen",
        "shipping_method": "Verzendmethode",
        "tracking_number": "Trackingsnummer",
        "shipping_address": "Verzendadres",
        "billing_address": "Factuuradres",
        "channel": "Kanaal",
        "created_at": "Besteldatum",
        "action": "Actie",
    },
    "de": {  # German
        "title": "Bestelldetails",
        "order_id": "Bestellnummer",
        "status": "Bestellstatus",
        "total_amount": "Gesamtbetrag",
        "payment_method": "Zahlungsmethode",
        "items_count": "Gesamtanzahl Artikel",
        "shipping_method": "Versandart",
        "tracking_number": "Sendungsnummer",
        "shipping_address": "Lieferadresse",
        "billing_address": "Rechnungsadresse",
        "channel": "Kanal",
        "created_at": "Bestelldatum",
        "action": "Aktion",
    },
    "es": {  # Spanish
        "title": "Detalles del pedido",
        "order_id": "Número de pedido",
        "status": "Estado del pedido",
        "total_amount": "Importe total",
        "payment_method": "Método de pago",
        "items_count": "Total de artículos",
        "shipping_method": "Método de envío",
        "tracking_number": "Número de seguimiento",
        "shipping_address": "Dirección de envío",
        "billing_address": "Dirección de facturación",
        "channel": "Canal",
        "created_at": "Fecha del pedido",
        "action": "Acción",
    },
    "fr": {  # French
        "title": "Détails de la commande",
        "order_id": "Numéro de commande",
        "status": "Statut de la commande",
        "total_amount": "Montant total",
        "payment_method": "Méthode de paiement",
        "items_count": "Nombre total d'articles",
        "shipping_method": "Méthode d'expédition",
        "tracking_number": "Numéro de suivi",
        "shipping_address": "Adresse de livraison",
        "billing_address": "Adresse de facturation",
        "channel": "Canal",
        "created_at": "Date de commande",
        "action": "Action",
    },
    "it": {  # Italian
        # Spelling fix: "Dettagli" (was "Detagli").
        "title": "Dettagli dell'ordine",
        "order_id": "Numero ordine",
        "status": "Stato ordine",
        "total_amount": "Importo totale",
        "payment_method": "Metodo di pagamento",
        "items_count": "Totale articoli",
        "shipping_method": "Metodo di spedizione",
        "tracking_number": "Numero di tracciamento",
        "shipping_address": "Indirizzo di spedizione",
        "billing_address": "Indirizzo di fatturazione",
        "channel": "Canale",
        "created_at": "Data ordine",
        "action": "Azione",
    },
    "tr": {  # Turkish
        "title": "Sipariş Detayları",
        "order_id": "Sipariş Numarası",
        "status": "Sipariş Durumu",
        "total_amount": "Toplam Tutar",
        "payment_method": "Ödeme Yöntemi",
        "items_count": "Toplam Ürün",
        "shipping_method": "Kargo Yöntemi",
        "tracking_number": "Takip Numarası",
        "shipping_address": "Teslimat Adresi",
        "billing_address": "Fatura Adresi",
        "channel": "Kanal",
        "created_at": "Sipariş Tarihi",
        "action": "İşlem",
    },
}
|
||
|
||
|
||
def get_field_label(field_key: str, language: str = "en") -> str:
    """Return the localized label for an order field.

    Args:
        field_key: Field key such as "order_id" or "status".
        language: Language code; unknown codes fall back to English.

    Returns:
        The label in the requested language, else the English label,
        else ``field_key`` itself when no table knows the key.
    """
    english_labels = ORDER_FIELD_LABELS["en"]
    localized_labels = ORDER_FIELD_LABELS.get(language, english_labels)
    if field_key in localized_labels:
        return localized_labels[field_key]
    # Not translated for this language: try English, then echo the key.
    return english_labels.get(field_key, field_key)
|
||
|
||
|
||
# Localized labels for order status codes, keyed by language code and then
# by the status code as a string. "unknown" is the catch-all entry.
ORDER_STATUS_LABELS = {
    "zh": {  # Chinese
        "0": "已取消",
        "1": "待支付",
        "2": "已支付",
        "3": "已发货",
        "4": "已签收",
        "15": "已完成",
        "100": "超时取消",
        "unknown": "未知",
    },
    "en": {  # English
        "0": "Cancelled",
        "1": "Pending Payment",
        "2": "Paid",
        "3": "Shipped",
        "4": "Delivered",
        "15": "Completed",
        "100": "Timeout Cancelled",
        "unknown": "Unknown",
    },
    "nl": {  # Dutch
        "0": "Geannuleerd",
        "1": "Wachtend op betaling",
        "2": "Betaald",
        "3": "Verzonden",
        "4": "Geleverd",
        "15": "Voltooid",
        "100": "Time-out geannuleerd",
        "unknown": "Onbekend",
    },
    "de": {  # German
        "0": "Storniert",
        "1": "Zahlung ausstehend",
        "2": "Bezahlt",
        "3": "Versandt",
        "4": "Zugestellt",
        "15": "Abgeschlossen",
        "100": "Zeitüberschreitung storniert",
        "unknown": "Unbekannt",
    },
    "es": {  # Spanish
        "0": "Cancelado",
        "1": "Pago pendiente",
        "2": "Pagado",
        "3": "Enviado",
        "4": "Entregado",
        "15": "Completado",
        "100": "Cancelado por tiempo límite",
        "unknown": "Desconocido",
    },
    "fr": {  # French
        "0": "Annulé",
        "1": "En attente de paiement",
        "2": "Payé",
        "3": "Expédié",
        "4": "Livré",
        "15": "Terminé",
        "100": "Annulé pour expiration",
        "unknown": "Inconnu",
    },
    "it": {  # Italian
        "0": "Annullato",
        "1": "In attesa di pagamento",
        "2": "Pagato",
        "3": "Spedito",
        "4": "Consegnato",
        "15": "Completato",
        "100": "Annullato per timeout",
        "unknown": "Sconosciuto",
    },
    "tr": {  # Turkish
        "0": "İptal edildi",
        "1": "Ödeme bekleniyor",
        "2": "Ödendi",
        "3": "Kargolandı",
        "4": "Teslim edildi",
        "15": "Tamamlandı",
        "100": "Zaman aşımı iptal edildi",
        "unknown": "Bilinmiyor",
    },
}
|
||
|
||
|
||
def get_status_label(status_code: str, language: str = "en") -> str:
    """Return the localized label for an order status code.

    Args:
        status_code: Status code such as "0", "1" or "2"; it is converted
            with ``str()`` before the lookup.
        language: Language code; unknown codes fall back to English.

    Returns:
        The label in the requested language, else the English label,
        else the English "unknown" label.
    """
    code = str(status_code)
    english_labels = ORDER_STATUS_LABELS["en"]
    localized_labels = ORDER_STATUS_LABELS.get(language, english_labels)
    if code in localized_labels:
        return localized_labels[code]
    # Code missing for this language: try English, then the catch-all.
    return english_labels.get(code, english_labels["unknown"])
|
||
|
||
|
||
class MessageType(str, Enum):
    """Direction of a Chatwoot message.

    Members are also ``str`` instances, so they compare equal to their
    raw string values.
    """

    INCOMING = "incoming"
    OUTGOING = "outgoing"
|
||
|
||
|
||
class ConversationStatus(str, Enum):
    """Status values of a Chatwoot conversation.

    Members are also ``str`` instances, so they compare equal to their
    raw string values.
    """

    OPEN = "open"
    RESOLVED = "resolved"
    PENDING = "pending"
    SNOOZED = "snoozed"
|
||
|
||
|
||
@dataclass
class ChatwootMessage:
    """Plain record describing one Chatwoot message."""

    # Required fields.
    id: int
    content: str
    message_type: str
    conversation_id: int

    # Optional sender details; both default to None.
    sender_type: Optional[str] = None
    sender_id: Optional[int] = None

    # Private-message flag; defaults to False.
    private: bool = False
|
||
|
||
|
||
@dataclass
class ChatwootContact:
    """Plain record describing one Chatwoot contact."""

    # Only the ID is required.
    id: int

    # Optional profile fields; all default to None.
    name: Optional[str] = None
    email: Optional[str] = None
    phone_number: Optional[str] = None

    # Free-form attribute mapping; None when not supplied.
    custom_attributes: Optional[dict[str, Any]] = None
|
||
|
||
|
||
class ChatwootClient:
|
||
"""Chatwoot API Client"""
|
||
|
||
def __init__(
    self,
    api_url: Optional[str] = None,
    api_token: Optional[str] = None,
    account_id: int = 2
):
    """Set up connection parameters; no HTTP client is created yet.

    Args:
        api_url: Chatwoot API URL; falls back to ``settings.chatwoot_api_url``.
        api_token: API access token; falls back to
            ``settings.chatwoot_api_token``.
        account_id: Chatwoot account ID.
    """
    resolved_url = api_url or settings.chatwoot_api_url
    resolved_token = api_token or settings.chatwoot_api_token

    # Trailing slashes are removed so paths can be appended with "/...".
    self.api_url = resolved_url.rstrip("/")
    self.api_token = resolved_token
    self.account_id = account_id

    # The HTTP client is built lazily by _get_client().
    self._client: Optional[httpx.AsyncClient] = None

    logger.info("Chatwoot client initialized", api_url=self.api_url)
|
||
|
||
async def _get_client(self) -> httpx.AsyncClient:
    """Return the shared HTTP client, creating it on first use."""
    if self._client is not None:
        return self._client

    # Every request is scoped to this account and carries the API token.
    account_base_url = f"{self.api_url}/api/v1/accounts/{self.account_id}"
    default_headers = {
        "api_access_token": self.api_token,
        "Content-Type": "application/json"
    }
    self._client = httpx.AsyncClient(
        base_url=account_base_url,
        headers=default_headers,
        timeout=30.0
    )
    return self._client
|
||
|
||
async def close(self) -> None:
    """Close the HTTP client if one exists and forget it."""
    if not self._client:
        return
    await self._client.aclose()
    self._client = None
|
||
|
||
def _format_datetime(self, datetime_str: str) -> str:
    """Normalize a timestamp to ``YYYY-MM-DD HH:MM:SS``.

    Args:
        datetime_str: Input timestamp. Non-string values are converted
            with ``str()`` so raw API values do not raise.

    Returns:
        The timestamp as ``YYYY-MM-DD HH:MM:SS``; an empty string for
        falsy input; the input text unchanged when no known format
        matches. A timezone suffix is dropped without converting the
        wall-clock time.
    """
    if not datetime_str:
        return ""

    text = datetime_str if isinstance(datetime_str, str) else str(datetime_str)

    # Fast path: already in the target layout. The date separators must be
    # checked too, otherwise 19-character strings such as
    # "23-01-2025 14:30:00" or "2025/01/23 14:30:00" would be returned
    # unnormalized and never reach the format list below.
    if (
        len(text) == 19
        and text[4] == '-'
        and text[7] == '-'
        and text[10] == ' '
        and text[13] == ':'
        and text[16] == ':'
    ):
        return text

    from datetime import datetime

    # Known input layouts, tried in order.
    formats = (
        "%Y-%m-%d %H:%M:%S",       # 2025-01-23 14:30:00
        "%Y-%m-%dT%H:%M:%S",       # 2025-01-23T14:30:00
        "%Y-%m-%d %H:%M:%S.%f",    # 2025-01-23 14:30:00.123456
        "%Y-%m-%dT%H:%M:%S.%f",    # 2025-01-23T14:30:00.123456
        "%Y-%m-%dT%H:%M:%SZ",      # 2025-01-23T14:30:00Z
        "%Y-%m-%dT%H:%M:%S.%fZ",   # 2025-01-23T14:30:00.123456Z
        "%Y-%m-%dT%H:%M:%S%z",     # 2025-01-23T14:30:00+01:00
        "%Y-%m-%dT%H:%M:%S.%f%z",  # 2025-01-23T14:30:00.123456+01:00
        "%Y-%m-%d %H:%M:%S%z",     # 2025-01-23 14:30:00+01:00
        "%Y/%m/%d %H:%M:%S",       # 2025/01/23 14:30:00
        "%d-%m-%Y %H:%M:%S",       # 23-01-2025 14:30:00
        "%m/%d/%Y %H:%M:%S",       # 01/23/2025 14:30:00
    )

    for fmt in formats:
        try:
            parsed = datetime.strptime(text, fmt)
        except ValueError:
            continue
        return parsed.strftime("%Y-%m-%d %H:%M:%S")

    # Nothing matched: hand the text back unchanged.
    return text
|
||
|
||
# ============ Messages ============
|
||
|
||
async def send_message(
    self,
    conversation_id: int,
    content: str,
    message_type: MessageType = MessageType.OUTGOING,
    private: bool = False
) -> dict[str, Any]:
    """Post a plain message to a conversation.

    Args:
        conversation_id: Conversation ID.
        content: Message content.
        message_type: Message type (incoming/outgoing).
        private: Whether the message is private (internal note).

    Returns:
        The decoded JSON body of the response.

    Raises:
        Whatever ``response.raise_for_status()`` raises for an error
        status.
    """
    http = await self._get_client()
    endpoint = f"/conversations/{conversation_id}/messages"
    body = {
        "content": content,
        "message_type": message_type.value,
        "private": private
    }

    response = await http.post(endpoint, json=body)
    response.raise_for_status()
    created = response.json()

    logger.debug(
        "Message sent",
        conversation_id=conversation_id,
        message_id=created.get("id")
    )
    return created
|
||
|
||
async def send_rich_message(
    self,
    conversation_id: int,
    content: str,
    content_type: str,
    content_attributes: dict[str, Any]
) -> dict[str, Any]:
    """Post an outgoing rich message (cards, buttons, etc.).

    Args:
        conversation_id: Conversation ID.
        content: Fallback text content.
        content_type: Rich content type (cards, input_select, etc.).
        content_attributes: Rich content attributes.

    Returns:
        The decoded JSON body of the response.

    Raises:
        Whatever ``response.raise_for_status()`` raises for an error
        status.
    """
    http = await self._get_client()
    endpoint = f"/conversations/{conversation_id}/messages"
    body = {
        "content": content,
        "message_type": MessageType.OUTGOING.value,
        "content_type": content_type,
        "content_attributes": content_attributes
    }

    response = await http.post(endpoint, json=body)
    response.raise_for_status()
    return response.json()
|
||
|
||
async def send_order_card(
    self,
    conversation_id: int,
    order_data: dict[str, Any],
    actions: Optional[list[dict[str, Any]]] = None
) -> dict[str, Any]:
    """Send an order as a single card (content_type "cards").

    Card layout:
    - image: image of the first item, when it has one
    - title: order number plus status
    - description: summary of items and amount
    - button: link to the order detail page

    Args:
        conversation_id: Conversation ID.
        order_data: Order data; the keys read here are:
            - order_id: order number
            - status_text: status text (falls back to ``status``)
            - items: list of item dicts (``name``, ``image_url``)
            - total_amount: total amount
        actions: Optional list of card actions replacing the default
            "view order details" link.

    Returns:
        The result of ``send_rich_message``.

    Example:
        >>> order_data = {
        ...     "order_id": "202071324",
        ...     "status_text": "已发货",
        ...     "items": [{"name": "商品A", "quantity": 2}],
        ...     "total_amount": "599.00"
        ... }
        >>> await chatwoot.send_order_card(123, order_data)
    """
    order_id = order_data.get("order_id", "")
    status_text = order_data.get("status_text", order_data.get("status", ""))
    total_amount = order_data.get("total_amount", "0.00")

    # Treat a missing or None item list as empty.
    items = order_data.get("items") or []

    # The card image comes from the first item, if any.
    media_url = items[0].get("image_url") if items else None

    title = f"订单 #{order_id} {status_text}"

    if not items:
        description = f"实付 ¥{total_amount}"
    else:
        if len(items) == 1:
            items_desc = items[0].get("name", "商品")
        else:
            items_desc = f"{items[0].get('name', '商品A')} 等共计 {len(items)} 件商品"
        description = f"包含 {items_desc},实付 ¥{total_amount}"

    if actions:
        card_actions = actions
    else:
        # Default button: open the order detail page. The host comes from
        # settings (as in send_order_form) rather than a hard-coded QA
        # domain, so the link follows the deployed environment.
        frontend_url = settings.frontend_url.rstrip('/')
        card_actions = [
            {
                "type": "link",
                "text": "查看订单详情",
                "uri": f"{frontend_url}/customer/order/detail?orderId={order_id}"
            }
        ]

    card = {
        "title": title,
        "description": description,
        "actions": card_actions
    }
    # media_url is only included when an image is available.
    if media_url:
        card["media_url"] = media_url

    content_attributes = {
        "items": [card]
    }

    # Log a truncated preview of the outgoing payload for debugging.
    logger.info(
        "Sending order card",
        conversation_id=conversation_id,
        order_id=order_id,
        has_media=bool(media_url),
        payload_preview=json.dumps({
            "content": "订单详情",
            "content_type": "cards",
            "content_attributes": content_attributes
        }, ensure_ascii=False, indent=2)[:1000]
    )

    return await self.send_rich_message(
        conversation_id=conversation_id,
        content="订单详情",
        content_type="cards",
        content_attributes=content_attributes
    )
|
||
|
||
async def send_order_form(
    self,
    conversation_id: int,
    order_data: dict[str, Any],
    language: str = "en",
    actions: Optional[list[dict[str, Any]]] = None
) -> dict[str, Any]:
    """Send order details as a content_type "order_detail" message.

    Args:
        conversation_id: Conversation ID.
        order_data: Order data; the keys read here are:
            - order_id: order number
            - status: order status code
            - created_at: order time (falls back to ``date_added``)
            - total_amount: total amount
            - items: list of item dicts (``name``, ``image_url``,
              ``quantity``, ``price``)
        language: Language code (en, nl, de, es, fr, it, tr, zh),
            default en. The status label uses the full language table;
            the other texts are Chinese for "zh" and English otherwise.
        actions: Optional list of action button configs (currently
            unused).

    Returns:
        The decoded JSON body of the response.

    Raises:
        Whatever ``response.raise_for_status()`` raises for an error
        status.

    Example:
        >>> order_data = {
        ...     "order_id": "123456789",
        ...     "status": "completed",
        ...     "status_text": "已完成",
        ...     "created_at": "2025-01-23 14:30:00",
        ...     "total_amount": "179.97",
        ...     "items": [{"name": "商品A", "image_url": "...", "quantity": 2, "price": "49.99"}]
        ... }
        >>> await chatwoot.send_order_form(123, order_data, language="zh")
    """
    order_id = order_data.get("order_id", "")
    status = order_data.get("status", "")

    # Normalize the order time to YYYY-MM-DD HH:MM:SS.
    raw_created_at = order_data.get("created_at", order_data.get("date_added", ""))
    created_at = self._format_datetime(raw_created_at)

    total_amount = order_data.get("total_amount", "0")

    # Status code -> key and color class for the order_detail payload.
    status_code_to_key = {
        "0": {"key": "cancelled", "color": "text-red-600"},
        "1": {"key": "pending", "color": "text-yellow-600"},
        "2": {"key": "paid", "color": "text-blue-600"},
        "3": {"key": "shipped", "color": "text-purple-600"},
        "4": {"key": "signed", "color": "text-green-600"},
        "15": {"key": "completed", "color": "text-green-600"},
        "100": {"key": "cancelled", "color": "text-red-600"},
    }
    status_key_info = status_code_to_key.get(
        str(status), {"key": "unknown", "color": "text-gray-600"}
    )
    status_label = get_status_label(str(status), language)

    # Build the item rows and the total quantity in one pass so both use
    # the same integer quantity (summing raw values raised TypeError when
    # a quantity arrived as a string).
    formatted_items = []
    total_quantity = 0
    for item in order_data.get("items") or []:
        price_value = item.get("price", "0")
        try:
            price_text = f"€{float(price_value):.2f}"
        except (ValueError, TypeError):
            price_text = str(price_value) if price_value else "€0.00"

        try:
            quantity = int(item.get("quantity", 1))
        except (ValueError, TypeError):
            # Unusable quantity: fall back to the default of 1.
            quantity = 1
        total_quantity += quantity

        formatted_items.append({
            "name": item.get("name", "未知商品"),
            "quantity": quantity,
            "price": price_text,
            "image": item.get("image_url", "")
        })

    try:
        total_amount_value = float(total_amount)
    except (ValueError, TypeError):
        total_amount_value = 0.0

    is_chinese = language == "zh"
    if is_chinese:
        total_label = f"共 {total_quantity} 件商品"
        amount_text = f"总计: €{total_amount_value:.2f}"
        order_time_text = f"下单时间: {created_at}"
        logistics_button_text = "查看物流"
        details_button_text = "订单详情"
    else:
        total_label = f"Total: {total_quantity} items"
        amount_text = f"Total: €{total_amount_value:.2f}"
        order_time_text = f"Order time: {created_at}"
        logistics_button_text = "Logistics info"
        details_button_text = "Order details"

    frontend_url = settings.frontend_url.rstrip('/')

    # The reply text stays Chinese for every language, as in the original.
    actions_list = [
        {
            "text": logistics_button_text,
            "style": "default",
            "reply": f"查看订单物流信息:{order_id}"
        },
        {
            "text": details_button_text,
            "style": "primary",
            "url": f"{frontend_url}/customer/order/detail?orderId={order_id}",
            "target": "_self"
        },
    ]

    content_attributes = {
        "status": status_key_info["key"],
        "statusText": status_label,
        "statusColor": status_key_info["color"],
        "orderId": str(order_id),
        "orderTime": order_time_text,
        "items": formatted_items,
        "showTotal": True,
        "totalLabel": total_label,
        "amountLabel": amount_text,
        "actions": actions_list
    }

    logger.info(
        "Sending order detail",
        conversation_id=conversation_id,
        order_id=order_id,
        items_count=len(formatted_items),
        total_quantity=total_quantity,
        language=language
    )

    client = await self._get_client()
    payload = {
        "content_type": "order_detail",
        "content": "",
        "content_attributes": content_attributes
    }
    response = await client.post(
        f"/conversations/{conversation_id}/messages",
        json=payload
    )
    response.raise_for_status()

    return response.json()
|
||
|
||
async def send_logistics_info(
    self,
    conversation_id: int,
    logistics_data: dict[str, Any],
    language: str = "en"
) -> dict[str, Any]:
    """Send tracking information as a content_type "logistics" message.

    Args:
        conversation_id: Conversation ID.
        logistics_data: Logistics data; the keys read here are:
            - carrier: carrier name
            - tracking_number: tracking number
            - timeline: list of tracking events; only the first entry is
              used and is treated as the latest event
            - order_id: order number (optional, used to build a link)
            - tracking_url: carrier tracking link (optional)
        language: Language code (en, nl, de, es, fr, it, tr, zh),
            default en. Texts are Chinese for "zh" and English otherwise.

    Returns:
        The decoded JSON body of the response.

    Raises:
        Whatever ``response.raise_for_status()`` raises for an error
        status.

    Example:
        >>> logistics_data = {
        ...     "carrier": "顺丰速运",
        ...     "tracking_number": "SF154228901",
        ...     "status": "派送中",
        ...     "timeline": [...]
        ... }
        >>> await chatwoot.send_logistics_info(123, logistics_data, language="zh")
    """
    carrier = logistics_data.get("carrier", "")
    tracking_number = logistics_data.get("tracking_number", "")
    timeline = logistics_data.get("timeline", [])
    order_id = logistics_data.get("order_id", "")
    tracking_url = logistics_data.get("tracking_url", "")

    latest_log = ""
    latest_time = ""
    current_step = 0

    # Track ID -> index into the four fixed steps built below.
    track_id_to_step = {
        "1": 0,   # Order Received
        "10": 1,  # Picked Up
        "20": 2,  # In Transit
        "30": 3   # Delivered
    }
    last_step = max(track_id_to_step.values())

    first_event = timeline[0] if timeline else None
    if isinstance(first_event, dict):
        remark = first_event.get("remark", "")
        time_str = (
            first_event.get("time") or
            first_event.get("date") or
            first_event.get("timestamp") or
            ""
        )
        location = first_event.get("location", "")
        track_id = str(first_event.get("id", ""))

        # Latest log: "location | remark" when both exist, else whichever
        # one is present.
        if location and remark:
            latest_log = f"{location} | {remark}"
        elif remark:
            latest_log = remark
        elif location:
            latest_log = location

        if time_str:
            latest_time = self._format_datetime(str(time_str))

        if track_id in track_id_to_step:
            current_step = track_id_to_step[track_id]
        else:
            # Unknown track ID: estimate from the number of events, capped
            # at the last step index (a 4-event timeline used to give 4,
            # one past "Delivered").
            # NOTE(review): assumes currentStep is a 0-based index into
            # `steps`, as the track-ID mapping implies; confirm with the
            # widget.
            event_count = len(timeline)
            current_step = min(event_count, last_step) if event_count <= 4 else 2

    is_chinese = language == "zh"

    # Four fixed progress steps.
    if is_chinese:
        step_labels = ("已接单", "已揽收", "运输中", "已送达")
        details_text = "物流详情"
        official_text = "官网追踪"
    else:
        step_labels = ("Order Received", "Picked Up", "In Transit", "Delivered")
        details_text = "Tracking Details"
        official_text = "Official Tracking"
    steps = [{"label": label} for label in step_labels]

    actions = []
    frontend_url = settings.frontend_url.rstrip('/')

    if order_id:
        # With an order number, link to the storefront tracking page.
        actions.append({
            "text": details_text,
            "style": "primary",
            "url": f"{frontend_url}/logistic-tracking/{order_id}"
        })

    if tracking_url:
        # Carrier tracking link, opened with target "_blank".
        actions.append({
            "text": official_text,
            "style": "default",
            "url": tracking_url,
            "target": "_blank"
        })

    content_attributes = {
        "logisticsName": carrier,
        "trackingNumber": tracking_number,
        "currentStep": current_step,
        "isUrgent": False,
        "latestLog": latest_log,
        "latestTime": latest_time,
        "steps": steps,
        "actions": actions
    }

    client = await self._get_client()

    payload = {
        "content_type": "logistics",
        "content": "",
        "content_attributes": content_attributes
    }

    # Log the full outgoing payload for debugging.
    logger.info(
        "Sending logistics info",
        conversation_id=conversation_id,
        carrier=carrier,
        tracking_number=tracking_number,
        current_step=current_step,
        timeline_count=len(timeline) if timeline else 0,
        language=language,
        payload_preview=json.dumps(payload, ensure_ascii=False, indent=2)
    )

    response = await client.post(
        f"/conversations/{conversation_id}/messages",
        json=payload
    )
    response.raise_for_status()

    return response.json()
|
||
|
||
async def send_order_list(
    self,
    conversation_id: int,
    orders: list[dict[str, Any]],
    language: str = "en"
) -> dict[str, Any]:
    """Send a list of orders as a content_type "order_list" message.

    Each entry in the payload contains:
    - orderNumber: order number
    - date: order date
    - status: order status text
    - items: item list (image and name)
    - actions: action buttons

    Args:
        conversation_id: Conversation ID.
        orders: Order dicts; the keys read here are:
            - order_id: order number
            - status_text: order status text
            - created_at: order time (falls back to ``date_added``)
            - items: optional list of item dicts (``name``, ``image_url``)
            - has_parcels: when truthy, a logistics button is added
        language: Language code (en, nl, de, es, fr, it, tr, zh),
            default en. Button texts are Chinese for "zh" and English
            otherwise.

    Returns:
        The decoded JSON body of the response.

    Raises:
        Whatever ``response.raise_for_status()`` raises for an error
        status.

    Example:
        >>> orders = [
        ...     {
        ...         "order_id": "20250122001",
        ...         "created_at": "Jan 22, 2025",
        ...         "status_text": "Shipped",
        ...         "items": [
        ...             {"image_url": "url1", "name": "Product 1"},
        ...             {"image_url": "url2", "name": "Product 2"}
        ...         ]
        ...     }
        ... ]
        >>> await chatwoot.send_order_list(123, orders, language="en")
    """
    if language == "zh":
        details_text = "订单详情"
        logistics_text = "物流信息"
    else:
        details_text = "Order details"
        logistics_text = "Logistics info"

    # The reply text is the same Chinese phrase for every language.
    details_reply_prefix = "查看订单详情:"
    logistics_reply_prefix = "查看订单物流信息:"

    order_list_data = []

    for order in orders:
        order_id = order.get("order_id", "")
        status_text = order.get("status_text", "")

        # Normalize the order time to YYYY-MM-DD HH:MM:SS.
        raw_created_at = order.get("created_at", order.get("date_added", ""))
        created_at = self._format_datetime(raw_created_at)

        # Only a real list is iterated; anything else counts as no items.
        raw_items = order.get("items", [])
        items = raw_items if isinstance(raw_items, list) else []
        first_item = items[0] if items else None

        # Debug logging must not crash on unexpected item shapes, so the
        # first item's keys are read only when it is a dict.
        logger.info(
            f"Processing order {order_id} for items",
            items_count=len(items),
            items_type=type(raw_items).__name__,
            first_item_keys=list(first_item.keys()) if isinstance(first_item, dict) else None
        )

        formatted_items = [
            {
                "image": item.get("image_url", ""),
                "name": item.get("name", "Product")
            }
            for item in items
            if isinstance(item, dict)
        ]

        logger.info(
            f"Formatted items for order {order_id}",
            formatted_items_count=len(formatted_items),
            sample_items=str(formatted_items[:2]) if formatted_items else "[]"
        )

        has_parcels = order.get("has_parcels", False)

        actions = [
            {
                "text": details_text,
                "reply": f"{details_reply_prefix}{order_id}"
            }
        ]
        # The logistics button is shown only for orders with parcels.
        if has_parcels:
            actions.append({
                "text": logistics_text,
                "reply": f"{logistics_reply_prefix}{order_id}"
            })

        logger.debug(
            f"Built {len(actions)} actions for order {order_id}",
            has_parcels=has_parcels,
            actions_count=len(actions)
        )

        order_list_data.append({
            "orderNumber": order_id,
            "date": created_at,
            "status": status_text,
            "items": formatted_items,
            "actions": actions
        })

    content_attributes = {
        "orders": order_list_data
    }

    # Log a truncated preview of the outgoing payload for debugging.
    logger.info(
        "Sending order list",
        conversation_id=conversation_id,
        orders_count=len(orders),
        language=language,
        payload_preview=str(content_attributes)[:1000]
    )

    client = await self._get_client()

    payload = {
        "content_type": "order_list",
        "content": "",
        "content_attributes": content_attributes
    }

    response = await client.post(
        f"/conversations/{conversation_id}/messages",
        json=payload
    )
    response.raise_for_status()

    return response.json()
|
||
|
||
async def send_product_cards(
    self,
    conversation_id: int,
    products: list[dict[str, Any]],
    language: str = "en"
) -> dict[str, Any]:
    """Send product search results to a conversation as a ``cards`` message.

    One card is built per product (title, one-line description, image and
    link buttons to the storefront product page) and all cards are posted
    in a single message.

    Args:
        conversation_id: Conversation ID.
        products: Product dicts. Keys read here:
            - spu_id: SPU ID, used in the product detail link
            - product_name: card title
            - product_image: image URL
            - price: regular price, used to validate ``special_price``
            - special_price: discounted price (optional)
            - stock: stock quantity
            - sales_count: units sold
        language: Language code. ``"zh"`` selects Chinese labels; any other
            value falls back to English.

    Returns:
        Decoded JSON body of the response.

    Raises:
        httpx.HTTPStatusError: If the API answers with an error status.

    Example:
        >>> products = [
        ...     {
        ...         "spu_id": "12345",
        ...         "product_name": "Product A",
        ...         "product_image": "https://...",
        ...         "price": "99.99",
        ...         "stock": 100
        ...     }
        ... ]
        >>> await chatwoot.send_product_cards(123, products, language="zh")
    """
    def _to_number(value: Any) -> Optional[float]:
        """Return ``value`` as a float, or None when it is not numeric."""
        try:
            return float(value)
        except (TypeError, ValueError):
            return None

    # All user-facing strings in one place so both languages share the
    # same card-building code path.
    if language == "zh":
        labels = {
            "special": "特价",
            "stock": "库存",
            "sold": "已售",
            "no_details": "暂无详细信息",
            "view": "查看详情",
            "buy": "立即购买",
            "summary": "找到 {count} 个商品",
        }
    else:
        labels = {
            "special": "Special",
            "stock": "Stock",
            "sold": "Sold",
            "no_details": "No details available",
            "view": "View Details",
            "buy": "Buy Now",
            "summary": "Found {count} products",
        }

    frontend_url = settings.frontend_url.rstrip('/')

    cards = []
    for product in products:
        spu_id = product.get("spu_id", "")
        stock = product.get("stock", 0)
        sales_count = product.get("sales_count", 0)
        special_price = product.get("special_price")

        # Parse both prices defensively: a malformed price must not abort
        # the whole message, it only suppresses the special-price line.
        # NOTE(review): the regular price itself is not shown on the card
        # (the original formatted it but never used the result) - confirm
        # whether it should be part of the description.
        regular = _to_number(product.get("price", "0") or 0)
        special = _to_number(special_price) if special_price else None

        description_parts = []
        if special is not None and regular is not None and special < regular:
            description_parts.append(f"{labels['special']}: €{special:.2f}")
        if stock is not None:
            description_parts.append(f"{labels['stock']}: {stock}")
        if sales_count:
            description_parts.append(f"{labels['sold']}: {sales_count}")
        description = " | ".join(description_parts) or labels["no_details"]

        # Both buttons point at the product detail page; the buy button is
        # only offered while the product is in stock.
        detail_uri = f"{frontend_url}/product/detail?spuId={spu_id}"
        actions = [{"type": "link", "text": labels["view"], "uri": detail_uri}]
        stock_qty = _to_number(stock) if stock else None
        if stock_qty is not None and stock_qty > 0:
            actions.append({"type": "link", "text": labels["buy"], "uri": detail_uri})

        cards.append({
            "title": product.get("product_name", "Unknown Product"),
            "description": description,
            "media_url": product.get("product_image", ""),
            "actions": actions,
        })

    client = await self._get_client()

    payload = {
        "content": labels["summary"].format(count=len(products)),
        "content_type": "cards",
        "content_attributes": {"items": cards},
    }

    logger.info(
        "Sending product cards",
        conversation_id=conversation_id,
        products_count=len(products),
        language=language,
        payload_preview=str(payload)[:1000]
    )

    response = await client.post(
        f"/conversations/{conversation_id}/messages",
        json=payload
    )
    response.raise_for_status()

    return response.json()
|
||
|
||
# ============ Conversations ============
|
||
|
||
async def get_conversation(self, conversation_id: int) -> dict[str, Any]:
    """Fetch a single conversation.

    Args:
        conversation_id: Conversation ID

    Returns:
        Decoded JSON body of the response

    Raises:
        httpx.HTTPStatusError: If the API answers with an error status
    """
    http = await self._get_client()
    endpoint = f"/conversations/{conversation_id}"

    reply = await http.get(endpoint)
    reply.raise_for_status()
    return reply.json()
|
||
|
||
async def update_conversation_status(
    self,
    conversation_id: int,
    status: ConversationStatus
) -> dict[str, Any]:
    """Switch a conversation to a new status.

    Args:
        conversation_id: Conversation ID
        status: Target status; its ``value`` is sent to the API

    Returns:
        Decoded JSON body of the response

    Raises:
        httpx.HTTPStatusError: If the API answers with an error status
    """
    http = await self._get_client()

    body = {"status": status.value}
    reply = await http.post(
        f"/conversations/{conversation_id}/toggle_status",
        json=body
    )
    reply.raise_for_status()

    # Only log once the API has accepted the change.
    logger.info(
        "Conversation status updated",
        conversation_id=conversation_id,
        status=status.value
    )
    return reply.json()
|
||
|
||
async def add_labels(
    self,
    conversation_id: int,
    labels: list[str]
) -> dict[str, Any]:
    """Post a list of labels to a conversation.

    Args:
        conversation_id: Conversation ID
        labels: Label names, sent as the ``labels`` field of the request

    Returns:
        Decoded JSON body of the response

    Raises:
        httpx.HTTPStatusError: If the API answers with an error status
    """
    http = await self._get_client()
    endpoint = f"/conversations/{conversation_id}/labels"

    reply = await http.post(endpoint, json={"labels": labels})
    reply.raise_for_status()
    return reply.json()
|
||
|
||
async def toggle_typing_status(
    self,
    conversation_id: int,
    typing_status: str  # "on" or "off"
) -> dict[str, Any]:
    """Show or hide the typing indicator of a conversation.

    The request goes to the ``toggle_typing_status`` endpoint first; when
    that answers with an error status the call is handed over to
    ``_toggle_typing_status_public`` instead of raising.

    Args:
        conversation_id: Conversation ID
        typing_status: Either "on" or "off"

    Returns:
        Decoded JSON body of the response, or an empty dict when the body
        is empty or cannot be decoded

    Raises:
        ValueError: If typing_status is neither "on" nor "off"

    Example:
        >>> await chatwoot.toggle_typing_status(123, "on")
        >>> # ... handle the message ...
        >>> await chatwoot.toggle_typing_status(123, "off")
    """
    if typing_status not in ("on", "off"):
        raise ValueError(
            f"typing_status must be 'on' or 'off', got '{typing_status}'"
        )

    http = await self._get_client()
    reply = await http.post(
        f"/conversations/{conversation_id}/toggle_typing_status",
        json={"typing_status": typing_status}
    )

    try:
        reply.raise_for_status()
    except httpx.HTTPStatusError as exc:
        # Primary endpoint rejected the request: log it and fall back.
        logger.warning(
            "Failed to toggle typing status via admin API",
            conversation_id=conversation_id,
            status=typing_status,
            error=str(exc)
        )
        return await self._toggle_typing_status_public(
            conversation_id, typing_status
        )
    else:
        logger.debug(
            "Typing status toggled",
            conversation_id=conversation_id,
            status=typing_status
        )

    # The body may be empty or not JSON; callers always get a dict back.
    try:
        decoded = reply.json() if reply.content else {}
    except Exception:
        decoded = {}
    return decoded
|
||
|
||
async def _toggle_typing_status_public(
    self,
    conversation_id: int,
    typing_status: str
) -> dict[str, Any]:
    """Fallback for toggling the typing status through the public API.

    Currently a placeholder: it logs that the call was skipped and sends
    no request.

    Args:
        conversation_id: Conversation ID
        typing_status: Either "on" or "off"

    Returns:
        Always an empty dict
    """
    # TODO: implement the public API call (needs inbox_identifier and
    # contact_identifier); until then only record that it was skipped.
    skipped: dict[str, Any] = {}
    logger.info(
        "Public API typing status not implemented, skipping",
        conversation_id=conversation_id,
        status=typing_status
    )
    return skipped
|
||
|
||
@asynccontextmanager
async def typing_indicator(self, conversation_id: int):
    """Async context manager that shows the typing indicator while active.

    The indicator is switched on when the context is entered and switched
    off when it is left, including when the body raises. Both switches are
    best-effort: a failure to toggle the indicator is logged and never
    interrupts or replaces the work done inside the ``async with`` body.

    Args:
        conversation_id: Conversation ID

    Yields:
        None

    Example:
        >>> async with chatwoot.typing_indicator(conversation_id):
        ...     await process_message(...)
        ...     await send_message(...)
        >>> # the typing status is switched off here
    """
    # The indicator is purely cosmetic, so failing to enable it must not
    # prevent the caller's body from running.
    try:
        await self.toggle_typing_status(conversation_id, "on")
    except Exception as e:
        logger.warning(
            "Failed to enable typing status in context manager",
            conversation_id=conversation_id,
            error=str(e)
        )

    try:
        yield
    finally:
        # Always try to switch the indicator off, even if the body raised;
        # a failure here is logged and not propagated.
        try:
            await self.toggle_typing_status(conversation_id, "off")
        except Exception as e:
            logger.debug(
                "Failed to disable typing status in context manager",
                conversation_id=conversation_id,
                error=str(e)
            )
|
||
|
||
async def assign_agent(
    self,
    conversation_id: int,
    agent_id: int
) -> dict[str, Any]:
    """Assign a conversation to an agent.

    Args:
        conversation_id: Conversation ID
        agent_id: Agent user ID, sent as ``assignee_id``

    Returns:
        Decoded JSON body of the response

    Raises:
        httpx.HTTPStatusError: If the API answers with an error status
    """
    http = await self._get_client()

    body = {"assignee_id": agent_id}
    reply = await http.post(
        f"/conversations/{conversation_id}/assignments",
        json=body
    )
    reply.raise_for_status()

    # Only log once the API has accepted the assignment.
    logger.info(
        "Agent assigned",
        conversation_id=conversation_id,
        agent_id=agent_id
    )
    return reply.json()
|
||
|
||
# ============ Contacts ============
|
||
|
||
async def get_contact(self, contact_id: int) -> dict[str, Any]:
    """Fetch a single contact.

    Args:
        contact_id: Contact ID

    Returns:
        Decoded JSON body of the response

    Raises:
        httpx.HTTPStatusError: If the API answers with an error status
    """
    http = await self._get_client()
    endpoint = f"/contacts/{contact_id}"

    reply = await http.get(endpoint)
    reply.raise_for_status()
    return reply.json()
|
||
|
||
async def update_contact(
    self,
    contact_id: int,
    attributes: dict[str, Any]
) -> dict[str, Any]:
    """Update a contact with the given attributes.

    Args:
        contact_id: Contact ID
        attributes: Fields to change; sent unmodified as the request body

    Returns:
        Decoded JSON body of the response

    Raises:
        httpx.HTTPStatusError: If the API answers with an error status
    """
    http = await self._get_client()
    endpoint = f"/contacts/{contact_id}"

    reply = await http.put(endpoint, json=attributes)
    reply.raise_for_status()
    return reply.json()
|
||
|
||
# ============ Messages History ============
|
||
|
||
async def get_messages(
    self,
    conversation_id: int,
    before: Optional[int] = None
) -> list[dict[str, Any]]:
    """Fetch the messages of a conversation.

    Args:
        conversation_id: Conversation ID
        before: Message ID sent as the ``before`` query parameter; the
            parameter is omitted when this is falsy

    Returns:
        The ``payload`` entry of the response body, or an empty list when
        the body has no such entry

    Raises:
        httpx.HTTPStatusError: If the API answers with an error status
    """
    http = await self._get_client()

    query = {"before": before} if before else {}
    reply = await http.get(
        f"/conversations/{conversation_id}/messages",
        params=query
    )
    reply.raise_for_status()

    return reply.json().get("payload", [])
|
||
|
||
|
||
# ============ Helper Functions ============
|
||
|
||
def format_order_card_markdown(order_data: dict[str, Any]) -> str:
    """Render an order as a Markdown card with Chinese labels.

    Missing keys and explicit ``None`` values (e.g. JSON ``null``) are both
    replaced by a placeholder, so the literal text "None" never shows up
    in the card.

    Args:
        order_data: Order dict. Keys read here: ``order_id``, ``status``,
            ``status_text``, ``created_at``, ``items`` (each item:
            ``name``, ``quantity``, ``price``, ``image_url``),
            ``total_amount``, ``shipping_fee``, ``logistics`` (``carrier``,
            ``tracking_number``) and ``remark``.

    Returns:
        The card as a Markdown string, one entry per line.

    Example:
        >>> order = {
        ...     "order_id": "123456789",
        ...     "status": "shipped",
        ...     "status_text": "已发货",
        ...     "created_at": "2023-10-27 14:30",
        ...     "items": [...],
        ...     "total_amount": "1058.00",
        ...     "shipping_fee": "0.00",
        ...     "logistics": {...}
        ... }
        >>> markdown = format_order_card_markdown(order)
    """
    def _value(source: dict[str, Any], key: str, default: Any) -> Any:
        """Like ``dict.get`` but also applies the default to ``None`` values."""
        found = source.get(key)
        return default if found is None else found

    # Emoji shown in front of the status line; unknown statuses get 📦.
    status_emoji = {
        "pending": "⏳",
        "paid": "💰",
        "processing": "⚙️",
        "shipped": "📦",
        "delivered": "✅",
        "completed": "✅",
        "cancelled": "❌",
        "refunded": "💸",
        "failed": "⚠️",
    }

    status = order_data.get("status") or "unknown"
    # Fall back to the raw status code when no display text is provided.
    status_text = order_data.get("status_text") or status
    emoji = status_emoji.get(status, "📦")

    lines = [
        f"{emoji} **订单状态:{status_text}**",
        f"📝 **订单号:** `{_value(order_data, 'order_id', '')}`",
    ]

    if order_data.get("created_at"):
        lines.append(f"📅 **下单时间:** {order_data['created_at']}")

    lines.append("")  # blank separator line
    lines.append("**商品详情**")

    items = order_data.get("items") or []
    if items:
        for item in items:
            name = _value(item, "name", "未知商品")
            quantity = _value(item, "quantity", 1)
            price = _value(item, "price", "0.00")
            # Optional link to the product image.
            image_markdown = ""
            if item.get("image_url"):
                image_markdown = f" [图片]({item['image_url']})"
            lines.append(f"▫️{image_markdown} {name} × {quantity} ¥{price}")
    else:
        lines.append("▫️ 无商品信息")

    total_amount = _value(order_data, "total_amount", "0.00")
    shipping_fee = _value(order_data, "shipping_fee", "0.00")
    lines.extend([
        "",
        f"💰 **实付:** ¥{total_amount} (含运费 ¥{shipping_fee})"
    ])

    logistics = order_data.get("logistics")
    if logistics:
        lines.extend([
            "",
            "🚚 **物流信息**",
            f"承运商:{_value(logistics, 'carrier', '未知')}",
            f"单号:{_value(logistics, 'tracking_number', '未知')}",
            "*点击单号可复制跟踪*"
        ])

    if order_data.get("remark"):
        lines.extend([
            "",
            f"📋 **备注:** {order_data['remark']}"
        ])

    return "\n".join(lines)
|
||
|
||
|
||
def create_action_buttons(
    actions: list[dict[str, Any]],
    title: str = "操作"
) -> dict[str, Any]:
    """Wrap a list of buttons into a single-card ``content_attributes`` dict.

    Args:
        actions: Button definitions. Each button holds:
            - type: "link" or "postback"
            - text: button caption
            - uri: target URL (when type is "link")
            - payload: data sent back (when type is "postback")
        title: Title of the card holding the buttons. Defaults to the
            Chinese label "操作"; pass a localized title for other
            languages.

    Returns:
        A dict with one entry in ``items``: a card carrying ``title`` and
        the given ``actions`` list (the list is referenced, not copied).

    Example:
        >>> actions = [
        ...     {"type": "link", "text": "View", "uri": "https://example.com"},
        ...     {"type": "postback", "text": "Support", "payload": "CONTACT_SUPPORT"}
        ... ]
        >>> buttons = create_action_buttons(actions, title="Actions")
    """
    return {
        "items": [{
            "title": title,
            "actions": actions
        }]
    }
|
||
|
||
|
||
# Shared client for the default account, created on first use
chatwoot_client: Optional[ChatwootClient] = None


def get_chatwoot_client(account_id: Optional[int] = None) -> ChatwootClient:
    """Return a Chatwoot client.

    Args:
        account_id: Chatwoot account ID. When given, a new client for that
            account is created on every call. When omitted, the shared
            module-level client is returned and created on first use.
    """
    global chatwoot_client

    if account_id is None:
        # Default account: reuse the shared instance.
        if chatwoot_client is None:
            chatwoot_client = ChatwootClient()
        return chatwoot_client

    # Explicit account: a dedicated instance that is not cached.
    return ChatwootClient(account_id=account_id)
|