From c4e97cf3129ff1e73b33807230e12a8dd01aae81 Mon Sep 17 00:00:00 2001 From: wangliang Date: Fri, 16 Jan 2026 18:36:17 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=E7=89=A9=E6=B5=81?= =?UTF-8?q?=E6=9F=A5=E8=AF=A2=E5=8A=9F=E8=83=BD=E5=92=8C=E5=AE=8C=E5=96=84?= =?UTF-8?q?=20token=20=E4=BC=A0=E9=80=92?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 添加 get_logistics 工具查询 Mall API /mall/api/order/parcel - 修复 Cookie token 传递到 MCP 的问题 - 增强 LLM 客户端超时处理和日志 - 移除 MALL_API_TOKEN,使用用户登录 token - 更新测试页面使用 setUser 设置用户属性 - 增强 webhook 调试日志 --- .env.example | 3 + agent/agents/order.py | 43 +++++++-- agent/agents/router.py | 15 +++ agent/core/llm.py | 134 +++++++++++---------------- agent/webhooks/chatwoot_webhook.py | 76 +++++++++++++-- docker-compose.yml | 1 - docs/test-conversation-id.html | 28 +++++- mcp_servers/order_mcp/server.py | 143 ++++++++++++++++++++++++++--- mcp_servers/shared/mall_client.py | 2 + 9 files changed, 334 insertions(+), 111 deletions(-) diff --git a/.env.example b/.env.example index 44ba14d..013535b 100644 --- a/.env.example +++ b/.env.example @@ -38,6 +38,9 @@ ORDER_MCP_URL=http://order_mcp:8002 AFTERSALE_MCP_URL=http://aftersale_mcp:8003 PRODUCT_MCP_URL=http://product_mcp:8004 +# ============ Mall API ============ +MALL_API_URL=https://apicn.qa1.gaia888.com + # ============ Agent Config ============ LOG_LEVEL=INFO MAX_CONVERSATION_STEPS=10 diff --git a/agent/agents/order.py b/agent/agents/order.py index f4ebcb9..8d4d045 100644 --- a/agent/agents/order.py +++ b/agent/agents/order.py @@ -25,7 +25,11 @@ ORDER_AGENT_PROMPT = """你是一个专业的 B2B 订单服务助手。 - order_id: 订单号(必需) - 说明:此工具会自动使用用户的身份 token 查询商城订单详情 -2. **query_order** - 查询历史订单 +2. **get_logistics** - 从商城 API 查询物流信息 + - order_id: 订单号(必需) + - 说明:查询订单的物流轨迹和配送状态 + +3. **query_order** - 查询历史订单 - user_id: 用户 ID(自动注入) - account_id: 账户 ID(自动注入) - order_id: 订单号(可选,不填则查询最近订单) @@ -33,10 +37,6 @@ ORDER_AGENT_PROMPT = """你是一个专业的 B2B 订单服务助手。 - date_end: 结束日期(可选) - status: 订单状态(可选) -3. **track_logistics** - 物流跟踪 - - order_id: 订单号 - - tracking_number: 物流单号(可选) - 4. **modify_order** - 修改订单 - order_id: 订单号 - user_id: 用户 ID(自动注入) @@ -108,12 +108,25 @@ ORDER_AGENT_PROMPT = """你是一个专业的 B2B 订单服务助手。 } ``` +用户: "帮我查一下订单 202071324 的物流" +回复: +```json +{ + "action": "call_tool", + "tool_name": "get_logistics", + "arguments": { + "order_id": "202071324" + } +} +``` + ## 重要约束 - **必须返回完整的 JSON 对象**,不要只返回部分内容 - **不要添加任何 markdown 代码块标记**(如 \`\`\`json) - **不要添加任何解释性文字**,只返回 JSON - user_id 和 account_id 会自动注入到 arguments 中,无需手动添加 - 如果用户提供了订单号,优先使用 get_mall_order 工具 +- 如果用户想查询物流状态,使用 get_logistics 工具 - 对于敏感操作(取消、修改),确保有明确的订单号 """ @@ -204,7 +217,15 @@ async def order_agent(state: AgentState) -> AgentState: # Inject user_token if available if state.get("user_token"): arguments["user_token"] = state["user_token"] - logger.info("Injected user_token into tool call") + logger.info( + "Injected user_token into tool call", + token_prefix=state["user_token"][:20] if state["user_token"] else None + ) + else: + logger.warning( + "No user_token available in state, MCP will use default token", + conversation_id=state["conversation_id"] + ) # Use entity if available if "order_id" not in arguments and state["entities"].get("order_id"): @@ -290,7 +311,15 @@ async def order_agent(state: AgentState) -> AgentState: # Inject user_token if available (for Mall API calls) if state.get("user_token"): arguments["user_token"] = state["user_token"] - logger.debug("Injected user_token into tool call") + logger.debug( + "Injected user_token into tool call", + token_prefix=state["user_token"][:20] if state["user_token"] else None + ) + else: + logger.warning( + "No user_token available in state, MCP will use default token", + conversation_id=state["conversation_id"] + ) # Use entity if available if "order_id" not in arguments and state["entities"].get("order_id"): diff --git a/agent/agents/router.py b/agent/agents/router.py index 9bfd380..921bbf0 100644 --- a/agent/agents/router.py +++ b/agent/agents/router.py @@ -81,12 +81,27 @@ async def classify_intent(state: AgentState) -> AgentState: # Parse JSON response content = response.content.strip() + + # Log raw response for debugging + logger.debug( + "LLM response for intent classification", + response_preview=content[:500] if content else "EMPTY", + content_length=len(content) if content else 0 + ) + # Handle markdown code blocks if content.startswith("```"): content = content.split("```")[1] if content.startswith("json"): content = content[4:] + # Check for empty response + if not content: + logger.warning("LLM returned empty response for intent classification") + state["intent"] = Intent.CUSTOMER_SERVICE.value # Default to customer service + state["intent_confidence"] = 0.5 + return state + result = json.loads(content) # Extract intent diff --git a/agent/core/llm.py b/agent/core/llm.py index 77e7076..0681573 100644 --- a/agent/core/llm.py +++ b/agent/core/llm.py @@ -1,7 +1,8 @@ """ ZhipuAI LLM Client for B2B Shopping AI Assistant """ -from typing import Any, AsyncGenerator, Optional +import concurrent.futures +from typing import Any, Optional from dataclasses import dataclass from zhipuai import ZhipuAI @@ -29,23 +30,21 @@ class LLMResponse: class ZhipuLLMClient: """ZhipuAI LLM Client wrapper""" - + + DEFAULT_TIMEOUT = 30 # seconds + def __init__( self, api_key: Optional[str] = None, - model: Optional[str] = None + model: Optional[str] = None, + timeout: Optional[int] = None ): - """Initialize ZhipuAI client - - Args: - api_key: ZhipuAI API key, defaults to settings - model: Model name, defaults to settings - """ self.api_key = api_key or settings.zhipu_api_key self.model = model or settings.zhipu_model + self.timeout = timeout or self.DEFAULT_TIMEOUT self._client = ZhipuAI(api_key=self.api_key) - logger.info("ZhipuAI client initialized", model=self.model) - + logger.info("ZhipuAI client initialized", model=self.model, timeout=self.timeout) + async def chat( self, messages: list[Message], @@ -54,32 +53,21 @@ class ZhipuLLMClient: top_p: float = 0.9, **kwargs: Any ) -> LLMResponse: - """Send chat completion request - - Args: - messages: List of chat messages - temperature: Sampling temperature - max_tokens: Maximum tokens to generate - top_p: Top-p sampling parameter - **kwargs: Additional parameters - - Returns: - LLM response with content and metadata - """ + """Send chat completion request""" formatted_messages = [ {"role": msg.role, "content": msg.content} for msg in messages ] - - logger.debug( + + logger.info( "Sending chat request", model=self.model, message_count=len(messages), temperature=temperature ) - - try: - response = self._client.chat.completions.create( + + def _make_request(): + return self._client.chat.completions.create( model=self.model, messages=formatted_messages, temperature=temperature, @@ -87,10 +75,27 @@ class ZhipuLLMClient: top_p=top_p, **kwargs ) - + + try: + with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: + future = executor.submit(_make_request) + response = future.result(timeout=self.timeout) + choice = response.choices[0] - result = LLMResponse( - content=choice.message.content, + content = choice.message.content + + logger.info( + "Chat response received", + finish_reason=choice.finish_reason, + content_length=len(content) if content else 0, + usage=response.usage.__dict__ if hasattr(response, 'usage') else {} + ) + + if not content: + logger.warning("LLM returned empty content") + + return LLMResponse( + content=content or "", finish_reason=choice.finish_reason, usage={ "prompt_tokens": response.usage.prompt_tokens, @@ -98,48 +103,34 @@ class ZhipuLLMClient: "total_tokens": response.usage.total_tokens } ) - - logger.debug( - "Chat response received", - finish_reason=result.finish_reason, - total_tokens=result.usage["total_tokens"] - ) - - return result - + + except concurrent.futures.TimeoutError: + logger.error("Chat request timed out", timeout=self.timeout) + raise TimeoutError(f"Request timed out after {self.timeout} seconds") + except Exception as e: logger.error("Chat request failed", error=str(e)) raise - + async def chat_with_tools( self, messages: list[Message], tools: list[dict[str, Any]], temperature: float = 0.7, **kwargs: Any - ) -> tuple[LLMResponse, Optional[list[dict[str, Any]]]]: - """Send chat completion request with tool calling - - Args: - messages: List of chat messages - tools: List of tool definitions - temperature: Sampling temperature - **kwargs: Additional parameters - - Returns: - Tuple of (LLM response, tool calls if any) - """ + ) -> tuple[LLMResponse, None]: + """Send chat completion request with tool calling""" formatted_messages = [ {"role": msg.role, "content": msg.content} for msg in messages ] - - logger.debug( + + logger.info( "Sending chat request with tools", model=self.model, tool_count=len(tools) ) - + try: response = self._client.chat.completions.create( model=self.model, @@ -148,42 +139,25 @@ class ZhipuLLMClient: temperature=temperature, **kwargs ) - + choice = response.choices[0] - result = LLMResponse( - content=choice.message.content or "", + content = choice.message.content or "" + + return LLMResponse( + content=content, finish_reason=choice.finish_reason, usage={ "prompt_tokens": response.usage.prompt_tokens, "completion_tokens": response.usage.completion_tokens, "total_tokens": response.usage.total_tokens } - ) - - # Extract tool calls if present - tool_calls = None - if hasattr(choice.message, 'tool_calls') and choice.message.tool_calls: - tool_calls = [ - { - "id": tc.id, - "type": tc.type, - "function": { - "name": tc.function.name, - "arguments": tc.function.arguments - } - } - for tc in choice.message.tool_calls - ] - logger.debug("Tool calls received", tool_count=len(tool_calls)) - - return result, tool_calls - + ), None + except Exception as e: logger.error("Chat with tools request failed", error=str(e)) raise -# Global LLM client instance llm_client: Optional[ZhipuLLMClient] = None diff --git a/agent/webhooks/chatwoot_webhook.py b/agent/webhooks/chatwoot_webhook.py index 2033fa1..4aa7ce6 100644 --- a/agent/webhooks/chatwoot_webhook.py +++ b/agent/webhooks/chatwoot_webhook.py @@ -136,6 +136,26 @@ async def handle_incoming_message(payload: ChatwootWebhookPayload, cookie_token: contact = payload.contact or payload.sender user_id = str(contact.id) if contact else "unknown" + # Log webhook payload structure for debugging + logger.info( + "Webhook payload structure", + payload_event=payload.event, + has_conversation=bool(conversation), + has_contact=bool(contact), + contact_type=type(contact).__name__ if contact else None, + conversation_id=conversation_id + ) + + # Log full payload keys for debugging + payload_dict = payload.model_dump() + logger.info( + "Full webhook payload keys", + keys=list(payload_dict.keys()), + conversation_keys=list(payload_dict.get('conversation', {}).keys()) if payload_dict.get('conversation') else [], + contact_keys=list(payload_dict.get('contact', {}).keys()) if payload_dict.get('contact') else [], + sender_keys=list(payload_dict.get('sender', {}).keys()) if payload_dict.get('sender') else [] + ) + # Get account_id from payload (top-level account object) # Chatwoot webhook includes account info at the top level account_obj = payload.account @@ -148,13 +168,35 @@ async def handle_incoming_message(payload: ChatwootWebhookPayload, cookie_token: if not user_token: # 1. 尝试从 contact/custom_attributes 获取 if contact: - contact_dict = contact.model_dump() if hasattr(contact, 'model_dump') else contact.__dict__ - user_token = TokenManager.extract_token_from_contact(contact_dict) - logger.debug("Extracted token from contact", has_token=bool(user_token)) + logger.info( + "Checking contact for token", + contact_id=contact.id if contact else None, + contact_type=type(contact).__name__ + ) + + # 只有 WebhookContact 才有 custom_attributes + if hasattr(contact, 'custom_attributes'): + logger.info( + "Checking contact custom_attributes", + has_custom_attributes=bool(contact.custom_attributes) + ) + + custom_attrs = contact.custom_attributes or {} + logger.info( + "Contact custom_attributes", + keys=list(custom_attrs.keys()) if custom_attrs else [], + has_jwt_token='jwt_token' in custom_attrs if custom_attrs else False, + has_mall_token='mall_token' in custom_attrs if custom_attrs else False + ) + + contact_dict = {"custom_attributes": custom_attrs} + user_token = TokenManager.extract_token_from_contact(contact_dict) + logger.debug("Extracted token from contact", has_token=bool(user_token)) + else: + logger.debug("Contact type is WebhookSender, no custom_attributes available") # 2. 尝试从 conversation.meta.sender.custom_attributes 获取(Chatwoot SDK setUser 设置的位置) if not user_token and conversation: - # 记录 conversation 的类型和内容用于调试 logger.debug("Conversation object type", type=str(type(conversation))) if hasattr(conversation, 'model_dump'): conv_dict = conversation.model_dump() @@ -162,12 +204,32 @@ async def handle_incoming_message(payload: ChatwootWebhookPayload, cookie_token: logger.debug("Has meta", has_meta='meta' in conv_dict) meta_sender = conv_dict.get('meta', {}).get('sender', {}) + logger.info( + "Conversation meta.sender", + has_sender=bool(meta_sender), + sender_keys=list(meta_sender.keys()) if meta_sender else [], + has_custom_attributes=bool(meta_sender.get('custom_attributes')) if meta_sender else False + ) + if meta_sender.get('custom_attributes'): + logger.info("Found custom_attributes in meta.sender", keys=list(meta_sender['custom_attributes'].keys())) user_token = TokenManager.extract_token_from_contact({'custom_attributes': meta_sender['custom_attributes']}) logger.info("Token found in conversation.meta.sender.custom_attributes", token_prefix=user_token[:20] if user_token else None) if user_token: - logger.info("JWT token found", user_id=user_id, source="cookie" if cookie_token else "contact") + logger.info( + "JWT token found", + user_id=user_id, + source="cookie" if cookie_token else "contact", + token_prefix=user_token[:20] if user_token else None + ) + else: + logger.warning( + "No JWT token found from any source", + user_id=user_id, + cookie_token_exists=bool(cookie_token), + contact_type=type(contact).__name__ if contact else None + ) logger.info( "Processing incoming message", @@ -352,7 +414,9 @@ async def chatwoot_webhook( # 尝试从请求 Cookie 中获取用户 Token user_token = request.cookies.get("token") # 从 Cookie 读取 token if user_token: - logger.info("User token found in request cookies") + logger.info("User token found in request cookies", token_prefix=user_token[:20]) + else: + logger.debug("No token found in request cookies (this is expected for webhook requests)") # Verify signature signature = request.headers.get("X-Chatwoot-Signature", "") diff --git a/docker-compose.yml b/docker-compose.yml index 704e59f..4698adc 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -198,7 +198,6 @@ services: HYPERF_API_URL: ${HYPERF_API_URL} HYPERF_API_TOKEN: ${HYPERF_API_TOKEN} MALL_API_URL: ${MALL_API_URL} - MALL_API_TOKEN: ${MALL_API_TOKEN} MALL_TENANT_ID: ${MALL_TENANT_ID:-2} MALL_CURRENCY_CODE: ${MALL_CURRENCY_CODE:-EUR} MALL_LANGUAGE_ID: ${MALL_LANGUAGE_ID:-1} diff --git a/docs/test-conversation-id.html b/docs/test-conversation-id.html index 4c22657..de094bf 100644 --- a/docs/test-conversation-id.html +++ b/docs/test-conversation-id.html @@ -381,12 +381,36 @@ const token = getCookie('token'); - window.chatwootSDK.run({ + // 初始化配置 + const widgetConfig = { websiteToken: '39PNCMvbMk3NvB7uaDNucc6o', baseUrl: BASE_URL, locale: 'zh_CN', userIdentifier: token || 'web_user_' + Date.now() - }); + }; + + window.chatwootSDK.run(widgetConfig); + + // 等待 widget 加载完成后设置用户属性 + setTimeout(() => { + if (token && window.chatwootSDK.setUser) { + window.chatwootSDK.setUser( + token || 'web_user_' + Date.now(), + { + jwt_token: token, + mall_token: token + } + ); + console.log('✅ 已通过 setUser 设置用户属性'); + } else if (token && window.$chatwoot) { + // 备用方案:使用 $chatwoot.setCustomAttributes + window.$chatwoot.setCustomAttributes({ + jwt_token: token, + mall_token: token + }); + console.log('✅ 已通过 $chatwoot.setCustomAttributes 设置用户属性'); + } + }, 1000); console.log('✅ Chatwoot Widget 已加载'); console.log('Locale: zh_CN'); diff --git a/mcp_servers/order_mcp/server.py b/mcp_servers/order_mcp/server.py index 7e4fff2..4189d7e 100644 --- a/mcp_servers/order_mcp/server.py +++ b/mcp_servers/order_mcp/server.py @@ -313,37 +313,150 @@ async def get_mall_order( 订单详情,包含订单号、状态、商品信息、金额、物流信息等 Order details including order ID, status, items, amount, logistics info, etc. """ + import logging + logger = logging.getLogger(__name__) + + logger.info( + "get_mall_order called", + order_id=order_id, + has_user_token=bool(user_token), + user_token_prefix=user_token[:20] if user_token else None + ) + try: - # 如果提供了 user_token,使用用户自己的 token - if user_token: - client = MallClient( - api_url=settings.mall_api_url, - api_token=user_token, - tenant_id=settings.mall_tenant_id, - currency_code=settings.mall_currency_code, - language_id=settings.mall_language_id, - source=settings.mall_source - ) - else: - # 否则使用默认的 mall 实例 - client = mall + # 必须提供 user_token + if not user_token: + logger.error("No user_token provided, user must be logged in") + return { + "success": False, + "error": "用户未登录,请先登录账户以查询订单信息", + "order_id": order_id, + "require_login": True + } + + logger.info("Using user token for Mall API request") + client = MallClient( + api_url=settings.mall_api_url, + api_token=user_token, + tenant_id=settings.mall_tenant_id, + currency_code=settings.mall_currency_code, + language_id=settings.mall_language_id, + source=settings.mall_source + ) result = await client.get_order_by_id(order_id) + logger.info( + "Mall API request successful", + order_id=order_id, + result_keys=list(result.keys()) if isinstance(result, dict) else None + ) + return { "success": True, "order": result, "order_id": order_id } except Exception as e: + logger.error( + "Mall API request failed", + order_id=order_id, + error=str(e) + ) return { "success": False, "error": str(e), "order_id": order_id } finally: - # 如果创建了临时客户端,关闭它 - if user_token: + # 关闭客户端 + if 'client' in dir() and client: + await client.close() + + +@register_tool("get_logistics") +@mcp.tool() +async def get_logistics( + order_id: str, + user_token: str = None, + user_id: str = None, + account_id: str = None +) -> dict: + """Query logistics tracking information from Mall API + + 从 Mall API 查询订单物流信息 + + Args: + order_id: 订单号 (e.g., "201941967") + user_token: 用户 JWT token(必需,用于身份验证) + user_id: 用户 ID(自动注入,此工具不使用) + account_id: 账户 ID(自动注入,此工具不使用) + + Returns: + 物流信息,包含快递公司、状态、预计送达时间、物流轨迹等 + """ + import logging + logger = logging.getLogger(__name__) + + logger.info( + "get_logistics called", + order_id=order_id, + has_user_token=bool(user_token) + ) + + # 必须提供 user_token + if not user_token: + logger.error("No user_token provided for logistics query") + return { + "success": False, + "error": "用户未登录,请先登录账户以查询物流信息", + "order_id": order_id, + "require_login": True + } + + try: + client = MallClient( + api_url=settings.mall_api_url, + api_token=user_token, + tenant_id=settings.mall_tenant_id, + currency_code=settings.mall_currency_code, + language_id=settings.mall_language_id, + source=settings.mall_source + ) + + result = await client.get( + "/mall/api/order/parcel", + params={"orderId": order_id} + ) + + logger.info( + "Logistics query successful", + order_id=order_id, + has_tracking=bool(result.get("trackingNumber")) + ) + + return { + "success": True, + "order_id": order_id, + "tracking_number": result.get("trackingNumber"), + "courier": result.get("courier"), + "status": result.get("status"), + "estimated_delivery": result.get("estimatedDelivery"), + "timeline": result.get("timeline", []) + } + except Exception as e: + logger.error( + "Logistics query failed", + order_id=order_id, + error=str(e) + ) + return { + "success": False, + "error": str(e), + "order_id": order_id + } + finally: + if 'client' in dir() and client: await client.close() diff --git a/mcp_servers/shared/mall_client.py b/mcp_servers/shared/mall_client.py index 9f3e378..a91541a 100644 --- a/mcp_servers/shared/mall_client.py +++ b/mcp_servers/shared/mall_client.py @@ -61,6 +61,8 @@ class MallClient: "currency-code": self.currency_code, "language-id": self.language_id, "source": self.source, + "origin": "https://www.qa1.gaia888.com", + "referer": "https://www.qa1.gaia888.com/", }, timeout=30.0 )