Compare commits

...

2 Commits

Author SHA1 Message Date
wangliang
2dd46a8626 feat: 添加 typing status 状态指示器
- 在处理消息时自动显示"正在输入..."状态
- 处理完成后自动隐藏状态指示器
- 错误处理时确保状态指示器被关闭
- 提升用户体验,让用户知道 AI 正在处理请求

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-26 13:15:59 +08:00
wangliang
9fe29ff3fe feat: 完善物流信息展示功能
- 修复物流信息中 order_id 字段缺失的问题,确保按钮正常生成
- 添加 tracking_url 支持,新增"官网追踪"按钮
- "官网追踪"按钮在新标签页打开 (target: "_blank")
- 改进日志记录,添加完整的 payload 预览

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-26 13:15:47 +08:00
3 changed files with 218 additions and 21 deletions

View File

@@ -815,8 +815,10 @@ def _parse_logistics_data(data: dict[str, Any]) -> dict[str, Any]:
) )
return { return {
"order_id": mcp_result.get("order_id", ""), # 添加订单号
"carrier": mcp_result.get("courier", mcp_result.get("carrier", mcp_result.get("express_name", "未知"))), "carrier": mcp_result.get("courier", mcp_result.get("carrier", mcp_result.get("express_name", "未知"))),
"tracking_number": mcp_result.get("tracking_number") or "", "tracking_number": mcp_result.get("tracking_number") or "",
"tracking_url": mcp_result.get("tracking_url", mcp_result.get("trackingUrl", "")), # 添加追踪链接
"status": mcp_result.get("status"), "status": mcp_result.get("status"),
"estimated_delivery": mcp_result.get("estimatedDelivery"), "estimated_delivery": mcp_result.get("estimatedDelivery"),
"timeline": mcp_result.get("timeline", []) "timeline": mcp_result.get("timeline", [])

View File

@@ -5,6 +5,7 @@ import json
from typing import Any, Optional from typing import Any, Optional
from dataclasses import dataclass from dataclasses import dataclass
from enum import Enum from enum import Enum
from contextlib import asynccontextmanager
import httpx import httpx
@@ -646,6 +647,7 @@ class ChatwootClient:
- status: 当前状态 - status: 当前状态
- timeline: 物流轨迹列表 - timeline: 物流轨迹列表
- order_id: 订单号(可选,用于生成链接) - order_id: 订单号(可选,用于生成链接)
- tracking_url: 官方追踪链接(可选,跳转到物流公司官网)
language: 语言代码en, nl, de, es, fr, it, tr, zh默认 en language: 语言代码en, nl, de, es, fr, it, tr, zh默认 en
Returns: Returns:
@@ -665,6 +667,7 @@ class ChatwootClient:
status = logistics_data.get("status", "") status = logistics_data.get("status", "")
timeline = logistics_data.get("timeline", []) timeline = logistics_data.get("timeline", [])
order_id = logistics_data.get("order_id", "") order_id = logistics_data.get("order_id", "")
tracking_url = logistics_data.get("tracking_url", "") # 添加追踪链接
# 获取最新物流信息(从 timeline 中提取 remark # 获取最新物流信息(从 timeline 中提取 remark
latest_log = "" latest_log = ""
@@ -760,6 +763,23 @@ class ChatwootClient:
"url": f"{frontend_url}/logistic-tracking/{order_id}" "url": f"{frontend_url}/logistic-tracking/{order_id}"
}) })
# 如果有 tracking_url添加官方追踪按钮在新标签页打开
if tracking_url:
if language == "zh":
actions.append({
"text": "官网追踪",
"style": "default",
"url": tracking_url,
"target": "_blank"
})
else:
actions.append({
"text": "Official Tracking",
"style": "default",
"url": tracking_url,
"target": "_blank"
})
# 构建 content_attributeslogistics 格式) # 构建 content_attributeslogistics 格式)
content_attributes = { content_attributes = {
"logisticsName": carrier, "logisticsName": carrier,
@@ -772,17 +792,6 @@ class ChatwootClient:
"actions": actions "actions": actions
} }
# 记录发送的数据(用于调试)
logger.info(
"Sending logistics info",
conversation_id=conversation_id,
carrier=carrier,
tracking_number=tracking_number,
current_step=current_step,
timeline_count=len(timeline) if timeline else 0,
language=language
)
# 发送 logistics 类型消息 # 发送 logistics 类型消息
client = await self._get_client() client = await self._get_client()
@@ -792,6 +801,18 @@ class ChatwootClient:
"content_attributes": content_attributes "content_attributes": content_attributes
} }
# 记录完整的发送 payload用于调试
logger.info(
"Sending logistics info",
conversation_id=conversation_id,
carrier=carrier,
tracking_number=tracking_number,
current_step=current_step,
timeline_count=len(timeline) if timeline else 0,
language=language,
payload_preview=json.dumps(payload, ensure_ascii=False, indent=2)
)
response = await client.post( response = await client.post(
f"/conversations/{conversation_id}/messages", f"/conversations/{conversation_id}/messages",
json=payload json=payload
@@ -1014,6 +1035,135 @@ class ChatwootClient:
return response.json() return response.json()
async def toggle_typing_status(
self,
conversation_id: int,
typing_status: str # "on" or "off"
) -> dict[str, Any]:
"""Toggle typing status for a conversation
显示/隐藏"正在输入..."状态指示器。Chatwoot 会根据用户设置的语言
自动显示对应文本(如"正在输入...""Typing..."等)。
Args:
conversation_id: Conversation ID
typing_status: Typing status, either "on" or "off"
Returns:
API response
Raises:
ValueError: If typing_status is not "on" or "off"
Example:
>>> # 开启 typing status
>>> await chatwoot.toggle_typing_status(123, "on")
>>> # 处理消息...
>>> # 关闭 typing status
>>> await chatwoot.toggle_typing_status(123, "off")
"""
if typing_status not in ["on", "off"]:
raise ValueError(
f"typing_status must be 'on' or 'off', got '{typing_status}'"
)
client = await self._get_client()
# 尝试使用 admin API 端点
response = await client.post(
f"/conversations/{conversation_id}/toggle_typing_status",
json={"typing_status": typing_status}
)
# 如果 admin API 不可用,返回空响应(兼容性处理)
try:
response.raise_for_status()
except httpx.HTTPStatusError as e:
logger.warning(
"Failed to toggle typing status via admin API",
conversation_id=conversation_id,
status=typing_status,
error=str(e)
)
# 尝试降级到 public API
return await self._toggle_typing_status_public(
conversation_id, typing_status
)
logger.debug(
"Typing status toggled",
conversation_id=conversation_id,
status=typing_status
)
# 尝试解析 JSON 响应,如果失败则返回空字典
try:
return response.json() if response.content else {}
except Exception:
return {}
async def _toggle_typing_status_public(
self,
conversation_id: int,
typing_status: str
) -> dict[str, Any]:
"""使用 public API 端点切换 typing status降级方案
Args:
conversation_id: Conversation ID
typing_status: Typing status, either "on" or "off"
Returns:
API response
"""
# TODO: 实现 public API 调用(需要 inbox_identifier 和 contact_identifier
# 目前先记录日志并返回空响应
logger.info(
"Public API typing status not implemented, skipping",
conversation_id=conversation_id,
status=typing_status
)
return {}
@asynccontextmanager
async def typing_indicator(self, conversation_id: int):
"""Typing indicator 上下文管理器
自动管理 typing status 的开启和关闭。进入上下文时开启,
退出时自动关闭,即使发生异常也会确保关闭。
显示多语言支持Chatwoot 会根据用户语言自动显示对应文本
(如中文"正在输入..."、英文"Typing..."等)。
Args:
conversation_id: Conversation ID
Yields:
None
Example:
>>> async with chatwoot.typing_indicator(conversation_id):
... # 处理消息或发送请求
... await process_message(...)
... await send_message(...)
>>> # 退出上下文时自动关闭 typing status
"""
try:
# 开启 typing status
await self.toggle_typing_status(conversation_id, "on")
yield
finally:
# 确保关闭 typing status即使发生异常
try:
await self.toggle_typing_status(conversation_id, "off")
except Exception as e:
# 记录警告但不抛出异常
logger.debug(
"Failed to disable typing status in context manager",
conversation_id=conversation_id,
error=str(e)
)
async def assign_agent( async def assign_agent(
self, self,
conversation_id: int, conversation_id: int,

View File

@@ -311,6 +311,27 @@ async def handle_incoming_message(payload: ChatwootWebhookPayload, cookie_token:
context["channel"] = message_channel context["channel"] = message_channel
context["is_email"] = is_email context["is_email"] = is_email
# 创建 Chatwoot client提前创建以便开启 typing status
from integrations.chatwoot import ChatwootClient
chatwoot = ChatwootClient(account_id=int(account_id))
# 开启 typing status显示"正在输入..."
try:
await chatwoot.toggle_typing_status(
conversation_id=conversation.id,
typing_status="on"
)
logger.debug(
"Typing status enabled",
conversation_id=conversation_id
)
except Exception as e:
logger.warning(
"Failed to enable typing status",
conversation_id=conversation_id,
error=str(e)
)
try: try:
# Process message through agent workflow # Process message through agent workflow
final_state = await process_message( final_state = await process_message(
@@ -329,9 +350,8 @@ async def handle_incoming_message(payload: ChatwootWebhookPayload, cookie_token:
if response is None: if response is None:
response = "抱歉,我暂时无法处理您的请求。请稍后重试或联系人工客服。" response = "抱歉,我暂时无法处理您的请求。请稍后重试或联系人工客服。"
# Create Chatwoot client # Create Chatwoot client(已在前面创建,这里不需要再次创建)
from integrations.chatwoot import ChatwootClient # chatwoot 已在 try 块之前创建
chatwoot = ChatwootClient(account_id=int(account_id))
# Send response to Chatwoot (skip if empty - agent may have already sent rich content) # Send response to Chatwoot (skip if empty - agent may have already sent rich content)
if response: if response:
@@ -340,6 +360,23 @@ async def handle_incoming_message(payload: ChatwootWebhookPayload, cookie_token:
content=response content=response
) )
# 关闭 typing status隐藏"正在输入..."
try:
await chatwoot.toggle_typing_status(
conversation_id=conversation.id,
typing_status="off"
)
logger.debug(
"Typing status disabled",
conversation_id=conversation_id
)
except Exception as e:
logger.warning(
"Failed to disable typing status",
conversation_id=conversation_id,
error=str(e)
)
# Handle human handoff # Handle human handoff
if final_state.get("requires_human"): if final_state.get("requires_human"):
await chatwoot.update_conversation_status( await chatwoot.update_conversation_status(
@@ -377,8 +414,16 @@ async def handle_incoming_message(payload: ChatwootWebhookPayload, cookie_token:
error=str(e) error=str(e)
) )
# 关闭 typing status错误时也要关闭
try:
await chatwoot.toggle_typing_status(
conversation_id=conversation.id,
typing_status="off"
)
except Exception:
pass # 忽略关闭时的错误
# Send error response # Send error response
chatwoot = get_chatwoot_client(account_id=int(account_id))
await chatwoot.send_message( await chatwoot.send_message(
conversation_id=conversation.id, conversation_id=conversation.id,
content="抱歉,处理您的消息时遇到了问题。我们的客服团队将尽快为您服务。" content="抱歉,处理您的消息时遇到了问题。我们的客服团队将尽快为您服务。"