Files
assistant/agent/webhooks/chatwoot_webhook.py
wangliang 965b11316e feat: 添加图片搜索功能和 Qwen 模型支持
图片搜索功能(以图搜图):
- Chatwoot webhook 检测图片搜索消息 (content_type="search_image")
- 从 content_attributes.url 提取图片 URL
- 调用 Mall API 图片搜索接口 (/mall/api/spu?searchImageUrl=...)
- 支持嵌套和顶层 URL 位置提取
- Product Agent 添加 fast path 直接调用图片搜索工具
- 防止无限循环(使用后清除 context.image_search_url)

Qwen 模型支持:
- 添加 LLM provider 选择(zhipu/qwen)
- 实现 QwenLLMClient 类(基于 DashScope SDK)
- 添加 dashscope>=1.14.0 依赖
- 修复 API key 设置(直接设置 dashscope.api_key)
- 更新 .env.example 和 docker-compose.yml 配置

其他优化:
- 重构 Chatwoot 集成代码(删除冗余)
- 优化 Product Agent prompt
- 增强 Customer Service Agent 多语言支持

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-27 19:10:06 +08:00

713 lines
25 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Chatwoot Webhook Handler
"""
import hmac
import hashlib
from typing import Any, Optional
from fastapi import APIRouter, Request, HTTPException, BackgroundTasks
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from config import settings
from core.graph import process_message
from integrations.chatwoot import get_chatwoot_client, ConversationStatus
from utils.cache import get_cache_manager
from utils.logger import get_logger
from utils.token_manager import TokenManager
logger = get_logger(__name__)
router = APIRouter(prefix="/webhooks", tags=["webhooks"])
# ============ Webhook Payload Models ============
class WebhookSender(BaseModel):
"""Webhook sender information"""
id: Optional[int] = None
name: Optional[str] = None
email: Optional[str] = None
type: Optional[str] = None # "contact" or "user"
identifier: Optional[str] = None
jwt_token: Optional[str] = None # JWT token at sender root level
mall_token: Optional[str] = None # Mall token at sender root level
custom_attributes: Optional[dict] = None # May also contain tokens
class WebhookMessage(BaseModel):
"""Webhook message content"""
id: int
content: Optional[str] = None
message_type: str # "incoming" or "outgoing"
content_type: Optional[str] = None
private: bool = False
sender: Optional[WebhookSender] = None
class WebhookConversation(BaseModel):
"""Webhook conversation information"""
id: int
inbox_id: int
status: str
account_id: Optional[int] = None # Chatwoot may not always include this
contact_inbox: Optional[dict] = None
messages: Optional[list] = None
additional_attributes: Optional[dict] = None
can_reply: Optional[bool] = None
channel: Optional[str] = None
meta: Optional[dict] = None # Contains sender info including custom_attributes
class WebhookContact(BaseModel):
"""Webhook contact information"""
id: int
name: Optional[str] = None
email: Optional[str] = None
phone_number: Optional[str] = None
custom_attributes: Optional[dict] = None
class ChatwootWebhookPayload(BaseModel):
"""Chatwoot webhook payload structure"""
event: str
id: Optional[int] = None
content: Optional[str] = None
message_type: Optional[str] = None
content_type: Optional[str] = None
private: Optional[bool] = False
content_attributes: Optional[dict] = None # 图片搜索 URL 等额外属性
conversation: Optional[WebhookConversation] = None
sender: Optional[WebhookSender] = None
contact: Optional[WebhookContact] = None
account: Optional[dict] = None
# ============ Signature Verification ============
def verify_webhook_signature(payload: bytes, signature: str) -> bool:
"""Verify Chatwoot webhook signature
Args:
payload: Raw request body
signature: X-Chatwoot-Signature header value
Returns:
True if signature is valid
"""
# TODO: Re-enable signature verification after configuring Chatwoot properly
# For now, skip verification to test webhook functionality
logger.debug("Skipping webhook signature verification for testing")
return True
if not settings.chatwoot_webhook_secret:
logger.warning("Webhook secret not configured, skipping verification")
return True
if not signature:
logger.warning("No signature provided in request")
return True
expected = hmac.new(
settings.chatwoot_webhook_secret.encode(),
payload,
hashlib.sha256
).hexdigest()
return hmac.compare_digest(expected, signature)
# ============ Message Processing ============
async def handle_incoming_message(payload: ChatwootWebhookPayload, cookie_token: str = None) -> None:
"""Process incoming message from Chatwoot
Args:
payload: Webhook payload
cookie_token: User token from request cookies
"""
conversation = payload.conversation
if not conversation:
logger.warning("No conversation in payload")
return
conversation_id = str(conversation.id)
content = payload.content
# 检查是否是图片搜索消息(在过滤空 content 之前)
# 图片搜索消息的 content 通常是空的,但需要处理
is_image_search = (
payload.content_type == "search_image" or
payload.content_type == 17
)
# 只有非图片搜索消息才检查 content 是否为空
if not content and not is_image_search:
logger.debug("Empty message content, skipping")
return
# Get user/contact info
contact = payload.contact or payload.sender
user_id = str(contact.id) if contact else "unknown"
# Log webhook payload structure for debugging
logger.info(
"Webhook payload structure",
payload_event=payload.event,
has_conversation=bool(conversation),
has_contact=bool(contact),
contact_type=type(contact).__name__ if contact else None,
conversation_id=conversation_id
)
# Log full payload keys for debugging
payload_dict = payload.model_dump()
logger.info(
"Full webhook payload keys",
keys=list(payload_dict.keys()),
conversation_keys=list(payload_dict.get('conversation', {}).keys()) if payload_dict.get('conversation') else [],
contact_keys=list(payload_dict.get('contact', {}).keys()) if payload_dict.get('contact') else [],
sender_keys=list(payload_dict.get('sender', {}).keys()) if payload_dict.get('sender') else []
)
# 打印完整的 payload 内容用于调试
import json
logger.info(
"Full webhook payload JSON",
payload_json=json.dumps(payload_dict, indent=2, ensure_ascii=False, default=str)
)
# Get account_id from payload (top-level account object)
# Chatwoot webhook includes account info at the top level
account_obj = payload.account
# 从 webhook 中动态获取 account_id
account_id = str(account_obj.get("id")) if account_obj else "1"
# 优先使用 Cookie 中的 token
user_token = cookie_token
mall_token = None
# 如果 Cookie 中没有,尝试从多个来源提取 token
if not user_token:
# 1. 尝试从 contact/custom_attributes 获取
if contact:
logger.info(
"Checking contact for token",
contact_id=contact.id if contact else None,
contact_type=type(contact).__name__
)
# 只有 WebhookContact 才有 custom_attributes
if hasattr(contact, 'custom_attributes'):
logger.info(
"Checking contact custom_attributes",
has_custom_attributes=bool(contact.custom_attributes)
)
custom_attrs = contact.custom_attributes or {}
logger.info(
"Contact custom_attributes",
keys=list(custom_attrs.keys()) if custom_attrs else [],
has_jwt_token='jwt_token' in custom_attrs if custom_attrs else False,
has_mall_token='mall_token' in custom_attrs if custom_attrs else False
)
# 同时提取 jwt_token 和 mall_token
if custom_attrs.get('jwt_token'):
user_token = custom_attrs.get('jwt_token')
logger.info("JWT token found in contact.custom_attributes", token_prefix=user_token[:20] if user_token else None)
if custom_attrs.get('mall_token'):
mall_token = custom_attrs.get('mall_token')
logger.info("Mall token found in contact.custom_attributes", token_prefix=mall_token[:20] if mall_token else None)
# 如果没有找到 token尝试使用通用字段
if not user_token and not mall_token:
contact_dict = {"custom_attributes": custom_attrs}
user_token = TokenManager.extract_token_from_contact(contact_dict)
logger.debug("Extracted token from contact (generic)", has_token=bool(user_token))
else:
logger.debug("Contact type is WebhookSender, no custom_attributes available")
# 2. 尝试从 conversation.meta.sender 获取Chatwoot SDK setUser 设置的位置)
if (not user_token or not mall_token) and conversation:
logger.debug("Conversation object type", type=str(type(conversation)))
if hasattr(conversation, 'model_dump'):
conv_dict = conversation.model_dump()
logger.debug("Conversation dict keys", keys=list(conv_dict.keys()))
logger.debug("Has meta", has_meta='meta' in conv_dict)
meta_sender = conv_dict.get('meta', {}).get('sender', {})
logger.info(
"Conversation meta.sender",
has_sender=bool(meta_sender),
sender_keys=list(meta_sender.keys()) if meta_sender else [],
has_custom_attributes=bool(meta_sender.get('custom_attributes')) if meta_sender else False
)
# 2.1. 优先从 meta.sender 根级别获取 token
if not user_token and meta_sender.get('jwt_token'):
user_token = meta_sender.get('jwt_token')
logger.info("JWT token found in conversation.meta.sender (root level)", token_prefix=user_token[:20] if user_token else None)
if not mall_token and meta_sender.get('mall_token'):
mall_token = meta_sender.get('mall_token')
logger.info("Mall token found in conversation.meta.sender (root level)", token_prefix=mall_token[:20] if mall_token else None)
# 2.2. 其次从 meta.sender.custom_attributes 获取
if (not user_token or not mall_token) and meta_sender.get('custom_attributes'):
logger.info("Found custom_attributes in meta.sender", keys=list(meta_sender['custom_attributes'].keys()))
custom_attrs = meta_sender['custom_attributes']
if not user_token and custom_attrs.get('jwt_token'):
user_token = custom_attrs.get('jwt_token')
logger.info("JWT token found in conversation.meta.sender.custom_attributes", token_prefix=user_token[:20] if user_token else None)
if not mall_token and custom_attrs.get('mall_token'):
mall_token = custom_attrs.get('mall_token')
logger.info("Mall token found in conversation.meta.sender.custom_attributes", token_prefix=mall_token[:20] if mall_token else None)
# 如果只有 jwt_token将它也用作 mall_token
if user_token and not mall_token:
mall_token = user_token
logger.debug("Using jwt_token as mall_token", token_prefix=mall_token[:20] if mall_token else None)
if user_token:
logger.info(
"JWT token found",
user_id=user_id,
source="cookie" if cookie_token else "contact",
token_prefix=user_token[:20] if user_token else None
)
else:
logger.warning(
"No JWT token found from any source",
user_id=user_id,
cookie_token_exists=bool(cookie_token),
contact_type=type(contact).__name__ if contact else None
)
logger.info(
"Processing incoming message",
conversation_id=conversation_id,
user_id=user_id,
has_token=bool(user_token),
message_length=len(content),
channel=conversation.channel if conversation else None
)
# Load conversation context from cache (需要在图片搜索检测之前)
cache = get_cache_manager()
await cache.connect()
context = await cache.get_context(conversation_id) or {}
history = await cache.get_messages(conversation_id)
# Add token to context if available
if user_token:
context["user_token"] = user_token
if mall_token:
context["mall_token"] = mall_token
# 图片 URL 可能在两个位置:
# 1. payload.content_attributes.url (顶层)
# 2. payload.conversation.messages[0].content_attributes.url (嵌套)
image_url = None
# 首先尝试从顶层获取
if payload.content_attributes:
image_url = payload.content_attributes.get("url")
# 如果顶层没有,尝试从 conversation.messages[0] 获取
if not image_url and conversation and hasattr(conversation, 'model_dump'):
conv_dict = conversation.model_dump()
messages = conv_dict.get('messages', [])
if messages and len(messages) > 0:
first_message = messages[0]
msg_content_attrs = first_message.get('content_attributes', {})
image_url = msg_content_attrs.get('url')
if is_image_search and image_url:
logger.info(
"Image search detected",
conversation_id=conversation_id,
image_url=image_url[:100] + "..." if len(image_url) > 100 else image_url
)
# 创建 Chatwoot client
from integrations.chatwoot import ChatwootClient
chatwoot = ChatwootClient(account_id=int(account_id))
# 开启 typing status
try:
await chatwoot.toggle_typing_status(
conversation_id=conversation.id,
typing_status="on"
)
except Exception as e:
logger.warning(
"Failed to enable typing status for image search",
conversation_id=conversation_id,
error=str(e)
)
# 直接调用图片搜索工具
# 修改 content 为图片搜索指令
search_content = f"[图片搜索] 请根据这张图片搜索相似的商品"
# 使用特殊标记让 Product Agent 知道这是图片搜索
context["image_search_url"] = image_url
final_state = await process_message(
conversation_id=conversation_id,
user_id=user_id,
account_id=account_id,
message=search_content,
history=history,
context=context,
user_token=user_token,
mall_token=mall_token
)
# 获取响应并发送
response = final_state.get("response", "")
if response:
await chatwoot.send_message(
conversation_id=conversation.id,
content=response
)
logger.info(
"Image search response sent",
conversation_id=conversation_id,
response_length=len(response)
)
# 关闭 typing status
await chatwoot.toggle_typing_status(
conversation_id=conversation.id,
typing_status="off"
)
await chatwoot.close()
# Update cache
await cache.add_message(conversation_id, "user", search_content)
await cache.add_message(conversation_id, "assistant", response)
# Save context
new_context = final_state.get("context", {})
new_context["last_intent"] = final_state.get("intent")
await cache.set_context(conversation_id, new_context)
logger.info(
"Image search message processed successfully",
conversation_id=conversation_id,
intent=final_state.get("intent")
)
return JSONResponse(
content={"status": "success"},
status_code=200
)
# 识别消息渠道(邮件、网站等)
message_channel = conversation.channel if conversation else "Channel"
is_email = message_channel == "Email"
# 添加渠道信息到 context让 Agent 知道是邮件还是网站)
context["channel"] = message_channel
context["is_email"] = is_email
# 邮件渠道特殊处理
if is_email:
logger.info(
"Email channel detected",
conversation_id=conversation_id,
sender_email=contact.email if contact else None
)
# 创建 Chatwoot client提前创建以便开启 typing status
from integrations.chatwoot import ChatwootClient
chatwoot = ChatwootClient(account_id=int(account_id))
# 开启 typing status显示"正在输入..."
try:
await chatwoot.toggle_typing_status(
conversation_id=conversation.id,
typing_status="on"
)
logger.debug(
"Typing status enabled",
conversation_id=conversation_id
)
except Exception as e:
logger.warning(
"Failed to enable typing status",
conversation_id=conversation_id,
error=str(e)
)
try:
# Process message through agent workflow
final_state = await process_message(
conversation_id=conversation_id,
user_id=user_id,
account_id=account_id,
message=content,
history=history,
context=context,
user_token=user_token,
mall_token=mall_token
)
# Get response
response = final_state.get("response")
if response is None:
response = "抱歉,我暂时无法处理您的请求。请稍后重试或联系人工客服。"
# Log the response content for debugging
logger.info(
"Preparing to send response to Chatwoot",
conversation_id=conversation_id,
response_length=len(response) if response else 0,
response_preview=response[:200] if response else None,
has_response=bool(response)
)
# Create Chatwoot client已在前面创建这里不需要再次创建
# chatwoot 已在 try 块之前创建
# Send response to Chatwoot (skip if empty - agent may have already sent rich content)
if response:
await chatwoot.send_message(
conversation_id=conversation.id,
content=response
)
logger.info(
"Response sent to Chatwoot successfully",
conversation_id=conversation_id
)
# 关闭 typing status隐藏"正在输入..."
try:
await chatwoot.toggle_typing_status(
conversation_id=conversation.id,
typing_status="off"
)
logger.debug(
"Typing status disabled",
conversation_id=conversation_id
)
except Exception as e:
logger.warning(
"Failed to disable typing status",
conversation_id=conversation_id,
error=str(e)
)
# Handle human handoff
if final_state.get("requires_human"):
await chatwoot.update_conversation_status(
conversation_id=conversation.id,
status=ConversationStatus.OPEN
)
# Add label for routing
await chatwoot.add_labels(
conversation_id=conversation.id,
labels=["needs_human", final_state.get("intent", "unknown")]
)
await chatwoot.close()
# Update cache
await cache.add_message(conversation_id, "user", content)
await cache.add_message(conversation_id, "assistant", response)
# Save context
new_context = final_state.get("context", {})
new_context["last_intent"] = final_state.get("intent")
await cache.set_context(conversation_id, new_context)
logger.info(
"Message processed successfully",
conversation_id=conversation_id,
intent=final_state.get("intent"),
requires_human=final_state.get("requires_human")
)
except Exception as e:
logger.error(
"Message processing failed",
conversation_id=conversation_id,
error=str(e)
)
# 关闭 typing status错误时也要关闭
try:
await chatwoot.toggle_typing_status(
conversation_id=conversation.id,
typing_status="off"
)
except Exception:
pass # 忽略关闭时的错误
# Send error response
await chatwoot.send_message(
conversation_id=conversation.id,
content="抱歉,处理您的消息时遇到了问题。我们的客服团队将尽快为您服务。"
)
# Transfer to human
await chatwoot.update_conversation_status(
conversation_id=conversation.id,
status=ConversationStatus.OPEN
)
async def handle_conversation_created(payload: ChatwootWebhookPayload) -> None:
"""Handle new conversation created
Args:
payload: Webhook payload
"""
conversation = payload.conversation
if not conversation:
return
conversation_id = str(conversation.id)
# Get account_id from payload
account_obj = payload.account
# 从 webhook 中动态获取 account_id
account_id = str(account_obj.get("id")) if account_obj else "1"
logger.info(
"New conversation created",
conversation_id=conversation_id,
account_id=account_id
)
# Initialize conversation context
cache = get_cache_manager()
await cache.connect()
context = {
"created": True,
"inbox_id": conversation.inbox_id
}
# Add contact info to context
contact = payload.contact
if contact:
context["contact_name"] = contact.name
context["contact_email"] = contact.email
if contact.custom_attributes:
context.update(contact.custom_attributes)
await cache.set_context(conversation_id, context)
# 检查是否是邮件渠道
is_email = conversation.channel == "Email" if conversation else False
# 只对非邮件渠道发送欢迎消息
if not is_email:
chatwoot = get_chatwoot_client(account_id=int(account_id))
await chatwoot.send_message(
conversation_id=conversation.id,
content="您好!我是 AI 智能助手,很高兴为您服务。请问有什么可以帮您的?\n\n您可以询问我关于订单、商品、售后等问题,我会尽力为您解答。"
)
else:
logger.info(
"Email channel detected, skipping welcome message",
conversation_id=conversation_id
)
async def handle_conversation_status_changed(payload: ChatwootWebhookPayload) -> None:
"""Handle conversation status change
Args:
payload: Webhook payload
"""
conversation = payload.conversation
if not conversation:
return
conversation_id = str(conversation.id)
new_status = conversation.status
logger.info(
"Conversation status changed",
conversation_id=conversation_id,
status=new_status
)
# If resolved, clean up context
if new_status == "resolved":
cache = get_cache_manager()
await cache.connect()
await cache.delete_context(conversation_id)
await cache.clear_messages(conversation_id)
# ============ Webhook Endpoint ============
@router.post("/chatwoot")
async def chatwoot_webhook(
request: Request,
background_tasks: BackgroundTasks
):
"""Chatwoot webhook endpoint
Receives events from Chatwoot and processes them asynchronously.
"""
# Get raw body for signature verification
body = await request.body()
# 尝试从请求 Cookie 中获取用户 Token
user_token = request.cookies.get("token") # 从 Cookie 读取 token
if user_token:
logger.info("User token found in request cookies", token_prefix=user_token[:20])
else:
logger.debug("No token found in request cookies (this is expected for webhook requests)")
# Verify signature
signature = request.headers.get("X-Chatwoot-Signature", "")
if not verify_webhook_signature(body, signature):
logger.warning("Invalid webhook signature")
raise HTTPException(status_code=401, detail="Invalid signature")
# Parse payload
try:
payload = ChatwootWebhookPayload.model_validate_json(body)
except Exception as e:
logger.error("Failed to parse webhook payload", error=str(e))
raise HTTPException(status_code=400, detail="Invalid payload")
event = payload.event
logger.debug(f"Webhook received: {event}")
# Filter out bot's own messages
if payload.message_type == "outgoing":
return {"status": "ignored", "reason": "outgoing message"}
# Filter private messages
if payload.private:
return {"status": "ignored", "reason": "private message"}
# Route by event type
if event == "message_created":
# Only process incoming messages from contacts
if payload.message_type == "incoming":
background_tasks.add_task(handle_incoming_message, payload, user_token)
elif event == "conversation_created":
background_tasks.add_task(handle_conversation_created, payload)
elif event == "conversation_status_changed":
background_tasks.add_task(handle_conversation_status_changed, payload)
elif event == "conversation_updated":
# Handle other conversation updates if needed
pass
return {"status": "accepted", "event": event}