From 0b5d0a8086aae5eb5504533a84acbc0034fe2a6e Mon Sep 17 00:00:00 2001 From: wangliang Date: Fri, 23 Jan 2026 18:49:40 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E9=87=8D=E6=9E=84=E8=AE=A2=E5=8D=95?= =?UTF-8?q?=E5=92=8C=E7=89=A9=E6=B5=81=E4=BF=A1=E6=81=AF=E5=B1=95=E7=A4=BA?= =?UTF-8?q?=E6=A0=BC=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 主要改动: - 订单列表:使用 order_list 格式,展示 5 个订单(全部状态) - 订单详情:使用 order_detail 格式,优化价格和时间显示 - 物流信息:使用 logistics 格式,根据 track id 动态生成步骤 - 商品图片:从 orderProduct.imageUrl 字段获取 - 时间格式:统一为 YYYY-MM-DD HH:MM:SS - 多语言支持:amountLabel、orderTime 支持中英文 - 配置管理:新增 FRONTEND_URL 环境变量 - API 集成:改进 Mall API tracks 数据解析 - 认证优化:account_id 从 webhook 动态获取 Co-Authored-By: Claude Sonnet 4.5 --- agent/agents/customer_service.py | 97 +++- agent/agents/order.py | 378 ++++++++++---- agent/config.py | 3 + agent/core/graph.py | 7 +- agent/core/state.py | 6 +- agent/integrations/chatwoot.py | 777 ++++++++++++++++++++++++----- agent/utils/token_manager.py | 33 ++ agent/webhooks/chatwoot_webhook.py | 162 ++++-- docker-compose.yml | 117 +---- mcp_servers/order_mcp/server.py | 173 ++++++- mcp_servers/shared/mall_client.py | 134 ++++- 11 files changed, 1493 insertions(+), 394 deletions(-) diff --git a/agent/agents/customer_service.py b/agent/agents/customer_service.py index 122afa1..96d9651 100644 --- a/agent/agents/customer_service.py +++ b/agent/agents/customer_service.py @@ -69,13 +69,98 @@ async def customer_service_agent(state: AgentState) -> AgentState: # Auto-detect category and query FAQ message_lower = state["current_message"].lower() - # 定义分类关键词 + # 定义分类关键词(支持多语言:en, nl, de, es, fr, it, tr, zh) category_keywords = { - "register": ["register", "sign up", "account", "login", "password", "forgot"], - "order": ["order", "place order", "cancel order", "modify order", "change order"], - "payment": ["pay", "payment", "checkout", "voucher", "discount", "promo"], - "shipment": ["ship", "shipping", "delivery", "courier", "transit", "logistics", "tracking"], - "return": ["return", "refund", "exchange", "defective", "damaged"], + "register": [ + # English + "register", "sign up", "account", "login", "password", "forgot", + # Dutch (Nederlands) + "registreren", "account", "inloggen", "wachtwoord", + # German (Deutsch) + "registrieren", "konto", "anmelden", "passwort", + # Spanish (Español) + "registrar", "cuenta", "iniciar", "contraseña", + # French (Français) + "enregistrer", "compte", "connecter", "mot de passe", + # Italian (Italiano) + "registrarsi", "account", "accesso", "password", + # Turkish (Türkçe) + "kayıt", "hesap", "giriş", "şifre", + # Chinese (中文) + "注册", "账号", "登录", "密码", "忘记密码" + ], + "order": [ + # English + "order", "place order", "cancel order", "modify order", "change order", + # Dutch + "bestelling", "bestellen", "annuleren", "wijzigen", + # German + "bestellung", "bestellen", "stornieren", "ändern", + # Spanish + "pedido", "hacer pedido", "cancelar", "modificar", + # French + "commande", "passer commande", "annuler", "modifier", + # Italian + "ordine", "ordinare", "cancellare", "modificare", + # Turkish + "sipariş", "sipariş ver", "iptal", "değiştir", + # Chinese + "订单", "下单", "取消订单", "修改订单", "更改订单" + ], + "payment": [ + # English + "pay", "payment", "checkout", "voucher", "discount", "promo", + # Dutch + "betalen", "betaling", "korting", "voucher", + # German + "bezahlen", "zahlung", "rabatt", "gutschein", + # Spanish + "pagar", "pago", "descuento", "cupón", + # French + "payer", "paiement", "réduction", "bon", + # Italian + "pagare", "pagamento", "sconto", "voucher", + # Turkish + "ödemek", "ödeme", "indirim", "kupon", + # Chinese + "支付", "付款", "结算", "优惠券", "折扣", "促销" + ], + "shipment": [ + # English + "ship", "shipping", "delivery", "courier", "transit", "logistics", "tracking", + # Dutch + "verzenden", "levering", "koerier", "logistiek", "volgen", + # German + "versand", "lieferung", "kurier", "logistik", "verfolgung", + # Spanish + "enviar", "envío", "entrega", "mensajería", "logística", "seguimiento", + # French + "expédier", "livraison", "coursier", "logistique", "suivi", + # Italian + "spedire", "spedizione", "consegna", "corriere", "logistica", "tracciamento", + # Turkish + "gönderi", "teslimat", "kurye", "lojistik", "takip", + # Chinese + "发货", "配送", "快递", "物流", "运输", "配送单" + ], + "return": [ + # English + "return", "refund", "exchange", "defective", "damaged", + # Dutch + "retour", "terugbetaling", "ruilen", "defect", + # German + "rückgabe", "erstattung", "austausch", "defekt", + # Spanish + "devolución", "reembolso", "cambio", "defectuoso", + # French + "retour", "remboursement", "échange", "défectueux", + # Italian + "reso", "rimborso", "cambio", "difettoso", + # Turkish + "iade", "geri ödeme", "değişim", "defekt", + # Chinese + "退货", "退款", "换货", "有缺陷", "损坏" + ], } # 检测分类 diff --git a/agent/agents/order.py b/agent/agents/order.py index bef74a2..4d4cdfd 100644 --- a/agent/agents/order.py +++ b/agent/agents/order.py @@ -12,6 +12,40 @@ from integrations.chatwoot import ChatwootClient logger = get_logger(__name__) +# Order status constants (Mall API) +ORDER_STATUS = { + 0: "已取消", # ORDER_STATUS_CANCEL + 1: "待支付", # ORDER_STATUS_WAIT_PAY + 2: "已支付", # ORDER_STATUS_PAID + 3: "已发货", # ORDER_STATUS_SHIPPED + 4: "已签收", # ORDER_STATUS_SIGNED + 15: "已完成", # ORDER_STATUS_FINISH + 100: "超时取消", # ORDER_STATUS_CANCEL_OVER_TIME + 110: "已废弃", # ORDER_STATUS_PAY_SUCCESS (deprecated) + 10000: "全部" # All +} + + +def get_status_text(status_value: Any) -> str: + """获取订单状态文本 + + Args: + status_value: 状态值(数字或字符串) + + Returns: + 状态文本 + """ + if status_value is None: + return "" + + # 转换为整数 + try: + status_int = int(status_value) + return ORDER_STATUS.get(status_int, str(status_value)) + except (ValueError, TypeError): + return str(status_value) + + ORDER_AGENT_PROMPT = """你是一个专业的 B2B 订单服务助手。 你的职责是帮助用户处理订单相关的问题,包括: - 订单查询 @@ -22,21 +56,21 @@ ORDER_AGENT_PROMPT = """你是一个专业的 B2B 订单服务助手。 ## 可用工具 -1. **get_mall_order** - 从商城 API 查询订单(推荐使用) +1. **get_mall_order** - 从商城 API 查询单个订单详情(推荐使用) - order_id: 订单号(必需) + - user_token: 用户 token(自动注入) - 说明:此工具会自动使用用户的身份 token 查询商城订单详情 -2. **get_logistics** - 从商城 API 查询物流信息 - - order_id: 订单号(必需) - - 说明:查询订单的物流轨迹和配送状态 +2. **get_mall_order_list** - 从商城 API 查询订单列表(推荐使用) + - user_token: 用户 token(自动注入) + - page: 页码(可选,默认 1) + - limit: 每页数量(可选,默认 10) + - 说明:查询用户的所有订单,按时间倒序排列 -3. **query_order** - 查询历史订单 - - user_id: 用户 ID(自动注入) - - account_id: 账户 ID(自动注入) - - order_id: 订单号(可选,不填则查询最近订单) - - date_start: 开始日期(可选) - - date_end: 结束日期(可选) - - status: 订单状态(可选) +3. **get_logistics** - 从商城 API 查询物流信息 + - order_id: 订单号(必需) + - user_token: 用户 token(自动注入) + - 说明:查询订单的物流轨迹和配送状态 4. **modify_order** - 修改订单 - order_id: 订单号 @@ -100,6 +134,16 @@ ORDER_AGENT_PROMPT = """你是一个专业的 B2B 订单服务助手。 } ``` +用户: "查询我最近的订单" 或 "我的订单列表" +回复: +```json +{ + "action": "call_tool", + "tool_name": "get_mall_order_list", + "arguments": {} +} +``` + 用户: "我的订单发货了吗?" 回复: ```json @@ -125,8 +169,10 @@ ORDER_AGENT_PROMPT = """你是一个专业的 B2B 订单服务助手。 - **必须返回完整的 JSON 对象**,不要只返回部分内容 - **不要添加任何 markdown 代码块标记**(如 \`\`\`json) - **不要添加任何解释性文字**,只返回 JSON +- **每次调用工具必须指定 tool_name** - user_id 和 account_id 会自动注入到 arguments 中,无需手动添加 -- 如果用户提供了订单号,优先使用 get_mall_order 工具 +- 如果用户提供了具体订单号,使用 get_mall_order 工具 +- 如果用户想查询订单列表或最近的订单,使用 get_mall_order_list 工具 - 如果用户想查询物流状态,使用 get_logistics 工具 - 对于敏感操作(取消、修改),确保有明确的订单号 """ @@ -216,15 +262,18 @@ async def order_agent(state: AgentState) -> AgentState: arguments["account_id"] = state["account_id"] # Inject user_token if available - if state.get("user_token"): - arguments["user_token"] = state["user_token"] + # 优先使用 mall_token(用于 Mall API),如果没有则使用 user_token + token_to_use = state.get("mall_token") or state.get("user_token") + if token_to_use: + arguments["user_token"] = token_to_use logger.info( - "Injected user_token into tool call", - token_prefix=state["user_token"][:20] if state["user_token"] else None + "Injected token into tool call", + token_type="mall_token" if state.get("mall_token") else "user_token", + token_prefix=token_to_use[:20] if token_to_use else None ) else: logger.warning( - "No user_token available in state, MCP will use default token", + "No token available in state, MCP will use default token", conversation_id=state["conversation_id"] ) @@ -310,21 +359,27 @@ async def order_agent(state: AgentState) -> AgentState: arguments["account_id"] = state["account_id"] # Inject user_token if available (for Mall API calls) - if state.get("user_token"): - arguments["user_token"] = state["user_token"] + # 优先使用 mall_token(用于 Mall API),如果没有则使用 user_token + token_to_use = state.get("mall_token") or state.get("user_token") + if token_to_use: + arguments["user_token"] = token_to_use logger.debug( - "Injected user_token into tool call", - token_prefix=state["user_token"][:20] if state["user_token"] else None + "Injected token into tool call", + token_type="mall_token" if state.get("mall_token") else "user_token", + token_prefix=token_to_use[:20] if token_to_use else None ) else: logger.warning( - "No user_token available in state, MCP will use default token", + "No token available in state, MCP will use default token", conversation_id=state["conversation_id"] ) - # Use entity if available + # Use entity if available (only for single-order queries, not for order list) + tool_name = result["tool_name"] if "order_id" not in arguments and state["entities"].get("order_id"): - arguments["order_id"] = state["entities"]["order_id"] + # 只在查询单个订单的工具中添加 order_id + if tool_name in ["get_mall_order", "get_logistics", "query_order"]: + arguments["order_id"] = state["entities"]["order_id"] state = add_tool_call( state, @@ -363,8 +418,23 @@ async def order_agent(state: AgentState) -> AgentState: async def _generate_order_response(state: AgentState) -> AgentState: """Generate response based on order tool results""" + # 检查是否是邮件渠道 + is_email = state.get("context", {}).get("is_email", False) + + # 邮件渠道:使用纯文本回复(不支持富媒体) + if is_email: + logger.info( + "Email channel detected, using text response instead of rich media", + conversation_id=state.get("conversation_id") + ) + return await _generate_text_response(state) + + # 获取检测到的语言,默认为英文 + detected_language = state.get("detected_language", "en") + # 解析订单数据并尝试使用 form 格式发送 order_data = None + order_list = [] # 订单列表 user_message = "" logistics_data = None @@ -380,8 +450,18 @@ async def _generate_order_response(state: AgentState) -> AgentState: elif data.get("orders") and len(data["orders"]) > 0: state = update_context(state, {"order_id": data["orders"][0].get("order_id")}) - # 处理 get_mall_order 返回的订单数据 - if tool_name == "get_mall_order" and isinstance(data, dict): + # 处理 get_mall_order_list 返回的订单列表 + if tool_name == "get_mall_order_list" and isinstance(data, dict): + mcp_result = data.get("result", {}) + if mcp_result.get("orders") and isinstance(mcp_result["orders"], list): + # 解析订单列表 + for order_item in mcp_result["orders"]: + parsed_order = _parse_mall_order_data(order_item) + if parsed_order.get("order_id"): + order_list.append(parsed_order) + + # 处理 get_mall_order 返回的单个订单数据 + elif tool_name == "get_mall_order" and isinstance(data, dict): # MCP 返回结构: {"success": true, "result": {...}} # result 可能包含: {"success": bool, "order": {...}, "order_id": "...", "error": "..."} mcp_result = data.get("result", {}) @@ -412,9 +492,16 @@ async def _generate_order_response(state: AgentState) -> AgentState: # 处理 query_order 返回的订单数据 elif tool_name == "query_order" and isinstance(data, dict): if data.get("orders") and len(data["orders"]) > 0: - order_data = _parse_order_data(data["orders"][0]) + # 如果有多个订单,添加到列表 if len(data["orders"]) > 1: - user_message = f"找到 {len(data['orders'])} 个订单,显示最新的一个:" + for order_item in data["orders"]: + parsed = _parse_order_data(order_item) + if parsed.get("order_id"): + order_list.append(parsed) + user_message = f"找到 {len(data['orders'])} 个订单" + else: + # 只有一个订单,作为单个订单处理 + order_data = _parse_order_data(data["orders"][0]) # 处理 get_logistics 返回的物流数据 elif tool_name == "get_logistics" and isinstance(data, dict): @@ -422,8 +509,88 @@ async def _generate_order_response(state: AgentState) -> AgentState: # 如果之前有订单数据,添加物流信息 if order_data: order_data["logistics"] = logistics_data + # 如果只有物流数据,单独发送物流信息 + elif logistics_data and logistics_data.get("tracking_number"): + # 添加订单号(如果有) + order_id = state.get("order_id", logistics_data.get("order_id", "")) + if order_id: + logistics_data["order_id"] = order_id - # 尝试使用 Chatwoot cards 格式发送 + try: + chatwoot = ChatwootClient(account_id=int(state.get("account_id", 1))) + conversation_id = state.get("conversation_id") + + if conversation_id: + logger.info( + "Preparing to send logistics info", + conversation_id=conversation_id, + tracking_number=logistics_data.get("tracking_number"), + carrier=logistics_data.get("carrier"), + language=detected_language + ) + + await chatwoot.send_logistics_info( + conversation_id=int(conversation_id), + logistics_data=logistics_data, + language=detected_language + ) + + logger.info( + "Logistics info sent successfully", + conversation_id=conversation_id, + tracking_number=logistics_data.get("tracking_number") + ) + + state = set_response(state, "") + state["state"] = ConversationState.GENERATING.value + return state + + except Exception as e: + logger.error( + "Failed to send logistics info, falling back to text response", + error=str(e), + tracking_number=logistics_data.get("tracking_number") + ) + + # 如果有订单列表(多个订单),使用订单列表格式 + if order_list and len(order_list) > 1: + try: + chatwoot = ChatwootClient(account_id=int(state.get("account_id", 1))) + conversation_id = state.get("conversation_id") + + if conversation_id: + logger.info( + "Preparing to send order list", + conversation_id=conversation_id, + orders_count=len(order_list), + language=detected_language + ) + + await chatwoot.send_order_list( + conversation_id=int(conversation_id), + orders=order_list, + language=detected_language + ) + + logger.info( + "Order list sent successfully", + conversation_id=conversation_id, + orders_count=len(order_list), + language=detected_language + ) + + state = set_response(state, "") + state["state"] = ConversationState.GENERATING.value + return state + + except Exception as e: + logger.error( + "Failed to send order list, falling back to text response", + error=str(e), + orders_count=len(order_list) + ) + + # 尝试使用 Chatwoot form 格式发送单个订单 if order_data: try: # 检查是否有有效的 order_id @@ -434,7 +601,7 @@ async def _generate_order_response(state: AgentState) -> AgentState: ) return await _generate_text_response(state) - chatwoot = ChatwootClient() + chatwoot = ChatwootClient(account_id=int(state.get("account_id", 1))) conversation_id = state.get("conversation_id") if conversation_id: @@ -443,24 +610,24 @@ async def _generate_order_response(state: AgentState) -> AgentState: "Preparing to send order card", conversation_id=conversation_id, order_id=order_data.get("order_id"), - items_count=len(order_data.get("items", [])) + items_count=len(order_data.get("items", [])), + language=detected_language ) - # 发送订单卡片(使用默认的"查看订单详情"按钮) - await chatwoot.send_order_card( - conversation_id=conversation_id, - order_data=order_data + await chatwoot.send_order_form( + conversation_id=int(conversation_id), + order_data=order_data, + language=detected_language ) logger.info( "Order card sent successfully", conversation_id=conversation_id, - order_id=order_data.get("order_id") + order_id=order_data.get("order_id"), + language=detected_language ) - # 设置确认消息 - response_text = user_message or "订单详情如下" - state = set_response(state, response_text) + state = set_response(state, "") state["state"] = ConversationState.GENERATING.value return state @@ -508,12 +675,19 @@ def _parse_mall_order_data(data: dict[str, Any]) -> dict[str, Any]: order_data = { "order_id": actual_order_data.get("orderId", actual_order_data.get("order_id", actual_order_data.get("order_sn", ""))), + "order_type": actual_order_data.get("orderType", actual_order_data.get("order_type", "")), "status": actual_order_data.get("orderStatusId", actual_order_data.get("status", "unknown")), - "status_text": actual_order_data.get("statusText", actual_order_data.get("status_text", actual_order_data.get("status", ""))), + "status_text": actual_order_data.get("statusText", ""), "total_amount": actual_order_data.get("total", actual_order_data.get("total_amount", actual_order_data.get("order_amount", "0.00"))), "shipping_fee": actual_order_data.get("shipping_fee", actual_order_data.get("freight_amount", "0")), + "payment_method": actual_order_data.get("paymentCode", actual_order_data.get("paymentMethod", "")), } + # 如果 statusText 为空,使用 orderStatusId 映射获取状态文本 + if not order_data["status_text"]: + status_id = actual_order_data.get("orderStatusId", actual_order_data.get("status")) + order_data["status_text"] = get_status_text(status_id) + # 下单时间 if actual_order_data.get("created_at"): order_data["created_at"] = actual_order_data["created_at"] @@ -522,65 +696,99 @@ def _parse_mall_order_data(data: dict[str, Any]) -> dict[str, Any]: elif actual_order_data.get("dateAdded"): order_data["created_at"] = actual_order_data["dateAdded"] - # 商品列表 - 尝试多种可能的字段名(优先 orderProduct) - items = ( - actual_order_data.get("orderProduct") or - actual_order_data.get("items") or - actual_order_data.get("order_items") or - actual_order_data.get("products") or - actual_order_data.get("orderGoods") or - actual_order_data.get("goods") or - [] - ) + # 商品列表 - 直接使用 Mall API 返回的 orderProduct 字段 + order_product = actual_order_data.get("orderProduct", []) # 记录商品列表数据结构(用于调试) - if items and len(items) > 0: - first_item = items[0] - logger.info( - "First item structure", - first_item_keys=list(first_item.keys()) if isinstance(first_item, dict) else type(first_item).__name__, - has_image_url=bool(first_item.get("image_url")) if isinstance(first_item, dict) else False, - has_image=bool(first_item.get("image")) if isinstance(first_item, dict) else False, - has_pic=bool(first_item.get("pic")) if isinstance(first_item, dict) else False, - sample_item_data=str(first_item)[:500] if isinstance(first_item, dict) else str(first_item) - ) + logger.info( + "Parsing orderProduct array", + has_order_product=bool(order_product), + order_product_count=len(order_product) if isinstance(order_product, list) else 0, + order_product_type=type(order_product).__name__ + ) - if items: + if order_product and isinstance(order_product, list) and len(order_product) > 0: order_data["items"] = [] - for item in items: + for product in order_product: + if not isinstance(product, dict): + continue + item_data = { - "name": item.get("name", item.get("productName", item.get("product_name", "未知商品"))), - "quantity": item.get("quantity", item.get("num", item.get("productNum", 1))), - "price": item.get("price", item.get("total", item.get("productPrice", item.get("product_price", "0.00")))) + "name": product.get("productName", product.get("name", product.get("product_name", "未知商品"))), + "quantity": product.get("quantity", product.get("num", product.get("productNum", 1))), + "price": product.get("price", product.get("productPrice", product.get("product_price", "0.00"))) } - # 添加商品图片(支持多种可能的字段名) - image_url = ( - item.get("image") or - item.get("image_url") or - item.get("pic") or - item.get("thumb") or - item.get("product_image") or - item.get("pic_url") or - item.get("thumb_url") or - item.get("img") or - item.get("productImg") or - item.get("thumb") - ) + + # 商品图片直接从 product 的 imageUrl 字段获取 + image_url = product.get("imageUrl") if image_url: item_data["image_url"] = image_url - else: - # 记录没有图片的商品(用于调试) - logger.debug( - "No image found for product", - product_name=item_data.get("name"), - available_keys=list(item.keys()) - ) + order_data["items"].append(item_data) # 备注 if actual_order_data.get("remark") or actual_order_data.get("user_remark"): order_data["remark"] = actual_order_data.get("remark", actual_order_data.get("user_remark", "")) + # 物流信息(如果有) + if actual_order_data.get("parcels") and len(actual_order_data.get("parcels", [])) > 0: + # parcels 是一个数组,包含物流信息 + first_parcel = actual_order_data["parcels"][0] if isinstance(actual_order_data["parcels"], list) else actual_order_data["parcels"] + if isinstance(first_parcel, dict): + logistics_info = { + "carrier": first_parcel.get("courier", first_parcel.get("carrier", first_parcel.get("company", ""))), + "tracking_number": first_parcel.get("trackingCode", first_parcel.get("tracking_number", first_parcel.get("trackingNumber", ""))), + } + # 只有在有有效数据时才添加 + if logistics_info["carrier"] or logistics_info["tracking_number"]: + order_data["logistics"] = logistics_info + + # 收货地址 + shipping_firstname = actual_order_data.get("shippingFirstname", "") + shipping_lastname = actual_order_data.get("shippingLastname", "") + shipping_company = actual_order_data.get("shippingCompany", "") + shipping_address_1 = actual_order_data.get("shippingAddress_1", "") + shipping_address_2 = actual_order_data.get("shippingAddress_2", "") + shipping_city = actual_order_data.get("shippingCity", "") + shipping_postcode = actual_order_data.get("shippingPostcode", "") + shipping_country = actual_order_data.get("shippingCountry", "") + shipping_zone = actual_order_data.get("shippingZone", "") + + # 如果有任何地址字段存在,构建收货地址 + if any([shipping_firstname, shipping_lastname, shipping_address_1, shipping_city]): + order_data["shipping_address"] = { + "name": f"{shipping_firstname} {shipping_lastname}".strip() or "", + "line1": shipping_address_1 or "", + "line2": shipping_address_2 or "", + "city": shipping_city or "", + "state": shipping_zone or "", + "postal_code": shipping_postcode or "", + "country": shipping_country or "" + } + + # 发票地址 + billing_firstname = actual_order_data.get("paymentFirstname", "") + billing_lastname = actual_order_data.get("paymentLastname", "") + billing_company = actual_order_data.get("paymentCompany", "") + billing_address_1 = actual_order_data.get("paymentAddress_1", "") + billing_address_2 = actual_order_data.get("paymentAddress_2", "") + billing_city = actual_order_data.get("paymentCity", "") + billing_postcode = actual_order_data.get("paymentPostcode", "") + billing_country = actual_order_data.get("paymentCountry", "") + billing_zone = actual_order_data.get("paymentZone", "") + + # 如果有任何地址字段存在,构建发票地址 + if any([billing_firstname, billing_lastname, billing_address_1, billing_city]): + order_data["billing_address"] = { + "name": f"{billing_firstname} {billing_lastname}".strip() or "", + "line1": billing_address_1 or "", + "line2": billing_address_2 or "", + "city": billing_city or "", + "state": billing_zone or "", + "postal_code": billing_postcode or "", + "country": billing_country or "" + } + return order_data diff --git a/agent/config.py b/agent/config.py index 176db65..87540f4 100644 --- a/agent/config.py +++ b/agent/config.py @@ -33,6 +33,9 @@ class Settings(BaseSettings): # ============ Hyperf API ============ hyperf_api_url: str = Field(..., description="Hyperf API URL") hyperf_api_token: str = Field(..., description="Hyperf API Token") + + # ============ Frontend URLs ============ + frontend_url: str = Field(default="https://www.qa1.gaia888.com", description="Frontend URL for order details") # ============ MCP Servers ============ strapi_mcp_url: str = Field(default="http://localhost:8001", description="Strapi MCP URL") diff --git a/agent/core/graph.py b/agent/core/graph.py index 3a2c6eb..0a5e399 100644 --- a/agent/core/graph.py +++ b/agent/core/graph.py @@ -367,7 +367,8 @@ async def process_message( message: str, history: list[dict] = None, context: dict = None, - user_token: str = None + user_token: str = None, + mall_token: str = None ) -> AgentState: """Process a user message through the agent workflow @@ -379,6 +380,7 @@ async def process_message( history: Previous conversation history context: Existing conversation context user_token: User JWT token for API calls + mall_token: Mall API token (if different from user_token) Returns: Final agent state with response @@ -393,7 +395,8 @@ async def process_message( current_message=message, messages=history, context=context, - user_token=user_token + user_token=user_token, + mall_token=mall_token ) # Get compiled graph diff --git a/agent/core/state.py b/agent/core/state.py index f585dd0..38715b9 100644 --- a/agent/core/state.py +++ b/agent/core/state.py @@ -66,6 +66,7 @@ class AgentState(TypedDict): user_id: str # User identifier account_id: str # B2B account identifier user_token: Optional[str] # User JWT token for API calls + mall_token: Optional[str] # Mall API token (if different from user_token) # ============ Message Content ============ messages: list[dict[str, Any]] # Conversation history [{role, content}] @@ -117,7 +118,8 @@ def create_initial_state( current_message: str, messages: Optional[list[dict[str, Any]]] = None, context: Optional[dict[str, Any]] = None, - user_token: Optional[str] = None + user_token: Optional[str] = None, + mall_token: Optional[str] = None ) -> AgentState: """Create initial agent state for a new message @@ -129,6 +131,7 @@ def create_initial_state( messages: Previous conversation history context: Existing conversation context user_token: User JWT token for API calls + mall_token: Mall API token (if different from user_token) Returns: Initialized AgentState @@ -139,6 +142,7 @@ def create_initial_state( user_id=user_id, account_id=account_id, user_token=user_token, + mall_token=mall_token, # Messages messages=messages or [], diff --git a/agent/integrations/chatwoot.py b/agent/integrations/chatwoot.py index 43c703b..5b3d3ae 100644 --- a/agent/integrations/chatwoot.py +++ b/agent/integrations/chatwoot.py @@ -14,6 +14,146 @@ from utils.logger import get_logger logger = get_logger(__name__) +# 订单字段多语言映射 +ORDER_FIELD_LABELS = { + "zh": { # 中文 + "title": "订单详情", + "order_id": "订单号", + "status": "订单状态", + "total_amount": "订单金额", + "payment_method": "支付方式", + "items_count": "商品总数量", + "shipping_method": "运输方式", + "tracking_number": "运单号", + "shipping_address": "收货地址", + "billing_address": "发票地址", + "channel": "渠道", + "created_at": "下单时间", + "action": "操作", + }, + "en": { # English + "title": "Order Details", + "order_id": "Order ID", + "status": "Order Status", + "total_amount": "Total Amount", + "payment_method": "Payment Method", + "items_count": "Total Items", + "shipping_method": "Shipping Method", + "tracking_number": "Tracking Number", + "shipping_address": "Shipping Address", + "billing_address": "Billing Address", + "channel": "Channel", + "created_at": "Order Date", + "action": "Action", + }, + "nl": { # Dutch (荷兰语) + "title": "Orderdetails", + "order_id": "Ordernummer", + "status": "Orderstatus", + "total_amount": "Totaalbedrag", + "payment_method": "Betaalmethode", + "items_count": "Totaal aantal artikelen", + "shipping_method": "Verzendmethode", + "tracking_number": "Trackingsnummer", + "shipping_address": "Verzendadres", + "billing_address": "Factuuradres", + "channel": "Kanaal", + "created_at": "Besteldatum", + "action": "Actie", + }, + "de": { # German (德语) + "title": "Bestelldetails", + "order_id": "Bestellnummer", + "status": "Bestellstatus", + "total_amount": "Gesamtbetrag", + "payment_method": "Zahlungsmethode", + "items_count": "Gesamtanzahl Artikel", + "shipping_method": "Versandart", + "tracking_number": "Sendungsnummer", + "shipping_address": "Lieferadresse", + "billing_address": "Rechnungsadresse", + "channel": "Kanal", + "created_at": "Bestelldatum", + "action": "Aktion", + }, + "es": { # Spanish (西班牙语) + "title": "Detalles del pedido", + "order_id": "Número de pedido", + "status": "Estado del pedido", + "total_amount": "Importe total", + "payment_method": "Método de pago", + "items_count": "Total de artículos", + "shipping_method": "Método de envío", + "tracking_number": "Número de seguimiento", + "shipping_address": "Dirección de envío", + "billing_address": "Dirección de facturación", + "channel": "Canal", + "created_at": "Fecha del pedido", + "action": "Acción", + }, + "fr": { # French (法语) + "title": "Détails de la commande", + "order_id": "Numéro de commande", + "status": "Statut de la commande", + "total_amount": "Montant total", + "payment_method": "Méthode de paiement", + "items_count": "Nombre total d'articles", + "shipping_method": "Méthode d'expédition", + "tracking_number": "Numéro de suivi", + "shipping_address": "Adresse de livraison", + "billing_address": "Adresse de facturation", + "channel": "Canal", + "created_at": "Date de commande", + "action": "Action", + }, + "it": { # Italian (意大利语) + "title": "Detagli dell'ordine", + "order_id": "Numero ordine", + "status": "Stato ordine", + "total_amount": "Importo totale", + "payment_method": "Metodo di pagamento", + "items_count": "Totale articoli", + "shipping_method": "Metodo di spedizione", + "tracking_number": "Numero di tracciamento", + "shipping_address": "Indirizzo di spedizione", + "billing_address": "Indirizzo di fatturazione", + "channel": "Canale", + "created_at": "Data ordine", + "action": "Azione", + }, + "tr": { # Turkish (土耳其语) + "title": "Sipariş Detayları", + "order_id": "Sipariş Numarası", + "status": "Sipariş Durumu", + "total_amount": "Toplam Tutar", + "payment_method": "Ödeme Yöntemi", + "items_count": "Toplam Ürün", + "shipping_method": "Kargo Yöntemi", + "tracking_number": "Takip Numarası", + "shipping_address": "Teslimat Adresi", + "billing_address": "Fatura Adresi", + "channel": "Kanal", + "created_at": "Sipariş Tarihi", + "action": "İşlem", + }, +} + + +def get_field_label(field_key: str, language: str = "en") -> str: + """获取指定语言的字段标签 + + Args: + field_key: 字段键名(如 "order_id", "status" 等) + language: 语言代码(默认 "en") + + Returns: + 对应语言的字段标签 + """ + if language not in ORDER_FIELD_LABELS: + language = "en" # 默认使用英文 + return ORDER_FIELD_LABELS[language].get(field_key, ORDER_FIELD_LABELS["en"].get(field_key, field_key)) + + class MessageType(str, Enum): """Chatwoot message types""" INCOMING = "incoming" @@ -91,7 +231,49 @@ class ChatwootClient: if self._client: await self._client.aclose() self._client = None - + + def _format_datetime(self, datetime_str: str) -> str: + """格式化日期时间为 YYYY-MM-DD HH:MM:SS 格式 + + Args: + datetime_str: 输入的日期时间字符串 + + Returns: + 格式化后的日期时间字符串 (YYYY-MM-DD HH:MM:SS) + """ + if not datetime_str: + return "" + + # 如果已经是正确的格式(YYYY-MM-DD HH:MM:SS),直接返回 + if len(datetime_str) == 19 and datetime_str[10] == ' ' and datetime_str[13] == ':' and datetime_str[16] == ':': + return datetime_str + + # 尝试解析常见的时间格式 + from datetime import datetime + + # 常见格式列表 + formats = [ + "%Y-%m-%d %H:%M:%S", # 2025-01-23 14:30:00 + "%Y-%m-%dT%H:%M:%S", # 2025-01-23T14:30:00 + "%Y-%m-%d %H:%M:%S.%f", # 2025-01-23 14:30:00.123456 + "%Y-%m-%dT%H:%M:%S.%f", # 2025-01-23T14:30:00.123456 + "%Y-%m-%dT%H:%M:%SZ", # 2025-01-23T14:30:00Z + "%Y-%m-%dT%H:%M:%S.%fZ", # 2025-01-23T14:30:00.123456Z + "%Y/%m/%d %H:%M:%S", # 2025/01/23 14:30:00 + "%d-%m-%Y %H:%M:%S", # 23-01-2025 14:30:00 + "%m/%d/%Y %H:%M:%S", # 01/23/2025 14:30:00 + ] + + for fmt in formats: + try: + dt = datetime.strptime(datetime_str, fmt) + return dt.strftime("%Y-%m-%d %H:%M:%S") + except ValueError: + continue + + # 如果无法解析,返回原始字符串 + return datetime_str + # ============ Messages ============ async def send_message( @@ -283,28 +465,22 @@ class ChatwootClient: self, conversation_id: int, order_data: dict[str, Any], + language: str = "en", actions: Optional[list[dict[str, Any]]] = None ) -> dict[str, Any]: - """发送订单详情表单消息(使用 content_type=form) - - 根据 Chatwoot API 文档实现的 form 格式订单详情展示。 - form 类型支持的字段类型:text, text_area, email, select + """发送订单详情(使用 content_type=order_detail) Args: conversation_id: 会话 ID order_data: 订单数据,包含: - order_id: 订单号 - - status: 订单状态 + - status: 订单状态码 - status_text: 状态文本 - - created_at: 下单时间(可选) - - items: 商品列表(可选) + - created_at: 下单时间 - total_amount: 总金额 - - shipping_fee: 运费(可选) - - logistics: 物流信息(可选) - - remark: 备注(可选) - actions: 操作按钮配置列表(可选),每个按钮包含: - - label: 按钮文字(用于 select 选项的显示) - - value: 按钮值(用于 select 选项的值) + - items: 商品列表 + language: 语言代码(en, nl, de, es, fr, it, tr, zh),默认 en + actions: 操作按钮配置列表(可选,暂未使用) Returns: 发送结果 @@ -312,131 +488,461 @@ class ChatwootClient: Example: >>> order_data = { ... "order_id": "123456789", - ... "status": "shipped", - ... "status_text": "已发货", - ... "created_at": "2023-10-27 14:30", - ... "total_amount": "1058.00", - ... "items": [{"name": "商品A", "quantity": 2, "price": "100.00"}] + ... "status": "completed", + ... "status_text": "已完成", + ... "created_at": "2025-01-23 14:30:00", + ... "total_amount": "179.97", + ... "items": [{"name": "商品A", "image_url": "...", "quantity": 2, "price": "49.99"}] ... } - >>> actions = [ - ... {"label": "查看详情", "value": "VIEW_DETAILS"}, - ... {"label": "联系客服", "value": "CONTACT_SUPPORT"} - ... ] - >>> await chatwoot.send_order_form(123, order_data, actions) + >>> await chatwoot.send_order_form(123, order_data, language="zh") """ - # 构建表单字段 - form_items = [] + order_id = order_data.get("order_id", "") + status = order_data.get("status", "") + status_text = order_data.get("status_text", "") - # 订单号(只读文本) - form_items.append({ - "name": "order_id", - "label": "订单号", - "type": "text", - "placeholder": "订单号", - "default": order_data.get("order_id", "") - }) + # 获取原始时间并格式化为 YYYY-MM-DD HH:MM:SS + raw_created_at = order_data.get("created_at", order_data.get("date_added", "")) + created_at = self._format_datetime(raw_created_at) - # 订单状态(只读文本) - status_text = order_data.get("status_text", order_data.get("status", "unknown")) - form_items.append({ - "name": "status", - "label": "订单状态", - "type": "text", - "placeholder": "订单状态", - "default": status_text - }) + total_amount = order_data.get("total_amount", "0") - # 下单时间(只读文本) - if order_data.get("created_at"): - form_items.append({ - "name": "created_at", - "label": "下单时间", - "type": "text", - "placeholder": "下单时间", - "default": order_data["created_at"] - }) - - # 商品列表(多行文本) - items = order_data.get("items", []) - if items: - items_text = "\n".join([ - f"▫️ {item.get('name', '未知商品')} × {item.get('quantity', 1)} - ¥{item.get('price', '0.00')}" - for item in items - ]) - form_items.append({ - "name": "items", - "label": "商品详情", - "type": "text_area", - "placeholder": "商品列表", - "default": items_text - }) - - # 总金额(只读文本) - form_items.append({ - "name": "total_amount", - "label": "总金额", - "type": "text", - "placeholder": "总金额", - "default": f"¥{order_data.get('total_amount', '0.00')}" - }) - - # 运费(只读文本) - if order_data.get("shipping_fee") is not None: - form_items.append({ - "name": "shipping_fee", - "label": "运费", - "type": "text", - "placeholder": "运费", - "default": f"¥{order_data['shipping_fee']}" - }) - - # 物流信息(多行文本) - logistics = order_data.get("logistics") - if logistics: - logistics_text = ( - f"承运商: {logistics.get('carrier', '未知')}\n" - f"单号: {logistics.get('tracking_number', '未知')}" - ) - form_items.append({ - "name": "logistics", - "label": "物流信息", - "type": "text_area", - "placeholder": "物流信息", - "default": logistics_text - }) - - # 备注(多行文本) - if order_data.get("remark"): - form_items.append({ - "name": "remark", - "label": "备注", - "type": "text_area", - "placeholder": "备注", - "default": order_data["remark"] - }) - - # 操作选项(下拉选择,如果提供了 actions) - if actions: - form_items.append({ - "name": "action_select", - "label": "操作", - "type": "select", - "options": actions - }) - - # 构建 content_attributes - content_attributes = { - "items": form_items + # 根据状态码映射状态和颜色 + status_mapping = { + "0": {"status": "cancelled", "text": "已取消", "color": "text-red-600"}, + "1": {"status": "pending", "text": "待支付", "color": "text-yellow-600"}, + "2": {"status": "paid", "text": "已支付", "color": "text-blue-600"}, + "3": {"status": "shipped", "text": "已发货", "color": "text-purple-600"}, + "4": {"status": "signed", "text": "已签收", "color": "text-green-600"}, + "15": {"status": "completed", "text": "已完成", "color": "text-green-600"}, + "100": {"status": "cancelled", "text": "超时取消", "color": "text-red-600"}, } - # 发送 form 类型消息 - return await self.send_rich_message( + status_info = status_mapping.get(str(status), {"status": "unknown", "text": status_text or "未知", "color": "text-gray-600"}) + + # 构建商品列表 + items = order_data.get("items", []) + formatted_items = [] + for item in items: + # 获取价格并格式化为字符串(包含货币符号) + price_value = item.get("price", "0") + try: + price_num = float(price_value) + price_text = f"€{price_num:.2f}" + except (ValueError, TypeError): + price_text = str(price_value) if price_value else "€0.00" + + formatted_items.append({ + "name": item.get("name", "未知商品"), + "quantity": int(item.get("quantity", 1)), + "price": price_text, + "image": item.get("image_url", "") + }) + + # 计算商品总数量 + total_quantity = sum(item.get("quantity", 1) for item in items) + + # 计算总金额并格式化(多语言) + try: + total_amount_value = float(total_amount) + except (ValueError, TypeError): + total_amount_value = 0.0 + + # 构建总计标签 + if language == "zh": + total_label = f"共 {total_quantity} 件商品" + amount_text = f"总计: €{total_amount_value:.2f}" + order_time_text = f"下单时间: {created_at}" + else: + total_label = f"Total: {total_quantity} items" + amount_text = f"Total: €{total_amount_value:.2f}" + order_time_text = f"Order time: {created_at}" + + # 构建操作按钮 + actions_list = [] + # 获取前端 URL + frontend_url = settings.frontend_url.rstrip('/') + + if language == "zh": + # 中文按钮 + actions_list.append({ + "text": "查看物流", + "style": "default", + "reply": f"查看订单物流信息:{order_id}" + }) + actions_list.append({ + "text": "订单详情", + "style": "primary", + "url": f"{frontend_url}/customer/order/detail?orderId={order_id}", + "target": "_self" + }) + else: + # 英文按钮 + actions_list.append({ + "text": "Logistics info", + "style": "default", + "reply": f"查看订单物流信息:{order_id}" + }) + actions_list.append({ + "text": "Order details", + "style": "primary", + "url": f"{frontend_url}/customer/order/detail?orderId={order_id}", + "target": "_self" + }) + + # 构建 content_attributes(order_detail 格式) + content_attributes = { + "status": status_info["status"], + "statusText": status_info["text"], + "statusColor": status_info["color"], + "orderId": str(order_id), + "orderTime": order_time_text, + "items": formatted_items, + "showTotal": True, + "totalLabel": total_label, + "amountLabel": amount_text, + "actions": actions_list + } + + # 记录发送的数据(用于调试) + logger.info( + "Sending order detail", conversation_id=conversation_id, - content="订单详情", - content_type="form", - content_attributes=content_attributes + order_id=order_id, + items_count=len(formatted_items), + total_quantity=total_quantity, + language=language ) - + + # 发送 order_detail 类型消息 + client = await self._get_client() + + payload = { + "content_type": "order_detail", + "content": "", + "content_attributes": content_attributes + } + + response = await client.post( + f"/conversations/{conversation_id}/messages", + json=payload + ) + response.raise_for_status() + + return response.json() + + async def send_logistics_info( + self, + conversation_id: int, + logistics_data: dict[str, Any], + language: str = "en" + ) -> dict[str, Any]: + """发送物流信息(使用 content_type=logistics) + + Args: + conversation_id: 会话 ID + logistics_data: 物流数据,包含: + - carrier: 物流公司名称 + - tracking_number: 运单号 + - status: 当前状态 + - timeline: 物流轨迹列表 + - order_id: 订单号(可选,用于生成链接) + language: 语言代码(en, nl, de, es, fr, it, tr, zh),默认 en + + Returns: + 发送结果 + + Example: + >>> logistics_data = { + ... "carrier": "顺丰速运", + ... "tracking_number": "SF154228901", + ... "status": "派送中", + ... "timeline": [...] + ... } + >>> await chatwoot.send_logistics_info(123, logistics_data, language="zh") + """ + carrier = logistics_data.get("carrier", "") + tracking_number = logistics_data.get("tracking_number", "") + status = logistics_data.get("status", "") + timeline = logistics_data.get("timeline", []) + order_id = logistics_data.get("order_id", "") + + # 获取最新物流信息(从 timeline 中提取 remark) + latest_log = "" + latest_time = "" + current_step = 0 + + # Track ID 到步骤的映射 + # id = 1 -> Order Received(已接单) + # id = 10 -> Picked Up(已揽收) + # id = 20 -> In Transit(运输中) + # id = 30 -> Delivered(已送达) + track_id_to_step = { + "1": 0, # Order Received + "10": 1, # Picked Up + "20": 2, # In Transit + "30": 3 # Delivered + } + + if timeline and len(timeline) > 0: + # 获取第一条(最新)物流信息 + first_event = timeline[0] + if isinstance(first_event, dict): + # 提取 remark 字段 + remark = first_event.get("remark", "") + + # 提取时间 + time_str = ( + first_event.get("time") or + first_event.get("date") or + first_event.get("timestamp") or + "" + ) + + # 提取位置 + location = first_event.get("location", "") + + # 提取 track id + track_id = str(first_event.get("id", "")) + + # 构建最新物流描述:如果 remark 为空,则只显示 location;否则显示 location | remark + if location and remark: + latest_log = f"{location} | {remark}" + elif remark: + latest_log = remark + elif location: + latest_log = location + else: + latest_log = "" + + # 格式化时间 + if time_str: + latest_time = self._format_datetime(str(time_str)) + + # 根据 track id 判断当前步骤 + if track_id in track_id_to_step: + current_step = track_id_to_step[track_id] + else: + # 如果无法识别 id,根据时间线长度判断 + current_step = len(timeline) if len(timeline) <= 4 else 2 + + # 构建步骤列表(固定4个步骤) + if language == "zh": + steps = [ + {"label": "已接单"}, + {"label": "已揽收"}, + {"label": "运输中"}, + {"label": "已送达"} + ] + else: + steps = [ + {"label": "Order Received"}, + {"label": "Picked Up"}, + {"label": "In Transit"}, + {"label": "Delivered"} + ] + + # 构建操作按钮 + actions = [] + frontend_url = settings.frontend_url.rstrip('/') + + if order_id: + # 如果有订单号,生成物流追踪链接 + if language == "zh": + actions.append({ + "text": "物流详情", + "style": "primary", + "url": f"{frontend_url}/logistic-tracking/{order_id}" + }) + else: + actions.append({ + "text": "Tracking Details", + "style": "primary", + "url": f"{frontend_url}/logistic-tracking/{order_id}" + }) + + # 构建 content_attributes(logistics 格式) + content_attributes = { + "logisticsName": carrier, + "trackingNumber": tracking_number, + "currentStep": current_step, + "isUrgent": False, + "latestLog": latest_log, + "latestTime": latest_time, + "steps": steps, + "actions": actions + } + + # 记录发送的数据(用于调试) + logger.info( + "Sending logistics info", + conversation_id=conversation_id, + carrier=carrier, + tracking_number=tracking_number, + current_step=current_step, + timeline_count=len(timeline) if timeline else 0, + language=language + ) + + # 发送 logistics 类型消息 + client = await self._get_client() + + payload = { + "content_type": "logistics", + "content": "", + "content_attributes": content_attributes + } + + response = await client.post( + f"/conversations/{conversation_id}/messages", + json=payload + ) + response.raise_for_status() + + return response.json() + + async def send_order_list( + self, + conversation_id: int, + orders: list[dict[str, Any]], + language: str = "en" + ) -> dict[str, Any]: + """发送订单列表(使用自定义 content_type=order_list) + + 每个订单包含: + - orderNumber: 订单号 + - date: 订单日期 + - status: 订单状态 + - items: 商品列表(图片和名称) + - actions: 操作按钮 + + Args: + conversation_id: 会话 ID + orders: 订单列表,每个订单包含: + - order_id: 订单号 + - status_text: 订单状态 + - created_at: 下单时间 + - items: 商品列表(可选,包含 name 和 image_url) + - order_type: 订单类型(可选,用于判断渠道) + language: 语言代码(en, nl, de, es, fr, it, tr, zh),默认 en + + Returns: + 发送结果 + + Example: + >>> orders = [ + ... { + ... "order_id": "20250122001", + ... "created_at": "Jan 22, 2025", + ... "status_text": "Shipped", + ... "items": [ + ... {"image_url": "url1", "name": "Product 1"}, + ... {"image_url": "url2", "name": "Product 2"} + ... ] + ... } + ... ] + >>> await chatwoot.send_order_list(123, orders, language="en") + """ + # 获取多语言按钮文本 + if language == "zh": + details_text = "订单详情" + logistics_text = "物流信息" + details_reply_prefix = "查看订单详情:" + logistics_reply_prefix = "查看订单物流信息:" + else: + details_text = "Order details" + logistics_text = "Logistics info" + details_reply_prefix = "查看订单详情:" + logistics_reply_prefix = "查看订单物流信息:" + + # 构建订单列表 + order_list_data = [] + + for order in orders: + order_id = order.get("order_id", "") + status_text = order.get("status_text", "") + + # 获取原始时间并格式化为 YYYY-MM-DD HH:MM:SS + raw_created_at = order.get("created_at", order.get("date_added", "")) + created_at = self._format_datetime(raw_created_at) + + # 构建商品列表 + items = order.get("items", []) + formatted_items = [] + + logger.info( + f"Processing order {order_id} for items", + items_count=len(items) if isinstance(items, list) else 0, + items_type=type(items).__name__, + first_item_keys=list(items[0].keys()) if items and len(items) > 0 else None + ) + + if items and isinstance(items, list): + for item in items: + formatted_items.append({ + "image": item.get("image_url", ""), + "name": item.get("name", "Product") + }) + + logger.info( + f"Formatted items for order {order_id}", + formatted_items_count=len(formatted_items), + sample_items=str(formatted_items[:2]) if formatted_items else "[]" + ) + + # 构建操作按钮 + actions = [ + { + "text": details_text, + "reply": f"{details_reply_prefix}{order_id}" + }, + { + "text": logistics_text, + "reply": f"{logistics_reply_prefix}{order_id}" + } + ] + + # 构建单个订单 + order_data = { + "orderNumber": order_id, + "date": created_at, + "status": status_text, + "items": formatted_items, + "actions": actions + } + + order_list_data.append(order_data) + + # 构建 content_attributes(order_list 格式) + content_attributes = { + "orders": order_list_data + } + + # 记录发送的数据(用于调试) + logger.info( + "Sending order list", + conversation_id=conversation_id, + orders_count=len(orders), + language=language, + payload_preview=str(content_attributes)[:1000] + ) + + # 发送 order_list 类型消息 + client = await self._get_client() + + payload = { + "content_type": "order_list", + "content": "", + "content_attributes": content_attributes + } + + response = await client.post( + f"/conversations/{conversation_id}/messages", + json=payload + ) + response.raise_for_status() + + return response.json() + # ============ Conversations ============ async def get_conversation(self, conversation_id: int) -> dict[str, Any]: @@ -739,9 +1245,16 @@ def create_action_buttons(actions: list[dict[str, Any]]) -> dict[str, Any]: chatwoot_client: Optional[ChatwootClient] = None -def get_chatwoot_client() -> ChatwootClient: - """Get or create global Chatwoot client instance""" +def get_chatwoot_client(account_id: Optional[int] = None) -> ChatwootClient: + """Get or create Chatwoot client instance + + Args: + account_id: Chatwoot account ID. If not provided, uses settings default. + """ global chatwoot_client + if account_id is not None: + # 创建指定 account_id 的客户端实例 + return ChatwootClient(account_id=account_id) if chatwoot_client is None: chatwoot_client = ChatwootClient() return chatwoot_client diff --git a/agent/utils/token_manager.py b/agent/utils/token_manager.py index 1b65db2..12747e8 100644 --- a/agent/utils/token_manager.py +++ b/agent/utils/token_manager.py @@ -12,6 +12,39 @@ logger = get_logger(__name__) class TokenManager: """管理用户 JWT token""" + @staticmethod + def extract_token_from_sender(sender: Optional[dict]) -> Optional[str]: + """从 sender 对象中提取 JWT token + + 支持从以下位置提取 token(按优先级排序): + 1. sender.jwt_token(根级别) + 2. sender.mall_token(根级别) + 3. sender.custom_attributes.jwt_token + 4. sender.custom_attributes.mall_token + 5. sender.custom_attributes.access_token + 6. sender.custom_attributes.auth_token + 7. sender.custom_attributes.token + + Args: + sender: Chatwoot sender 对象(来自 conversation.meta.sender) + + Returns: + JWT token 字符串,如果未找到则返回 None + """ + if not sender: + logger.debug("No sender provided") + return None + + # 1. 优先从根级别获取 token + root_token = sender.get("jwt_token") or sender.get("mall_token") + if root_token: + logger.debug("JWT token found at sender root level") + logger.debug(f"Token prefix: {root_token[:20]}...") + return root_token + + # 2. 从 custom_attributes 中获取 token + return TokenManager.extract_token_from_contact(sender) + @staticmethod def extract_token_from_contact(contact: Optional[dict]) -> Optional[str]: """从 Chatwoot contact 中提取 JWT token diff --git a/agent/webhooks/chatwoot_webhook.py b/agent/webhooks/chatwoot_webhook.py index 4aa7ce6..ed23d90 100644 --- a/agent/webhooks/chatwoot_webhook.py +++ b/agent/webhooks/chatwoot_webhook.py @@ -28,6 +28,10 @@ class WebhookSender(BaseModel): name: Optional[str] = None email: Optional[str] = None type: Optional[str] = None # "contact" or "user" + identifier: Optional[str] = None + jwt_token: Optional[str] = None # JWT token at sender root level + mall_token: Optional[str] = None # Mall token at sender root level + custom_attributes: Optional[dict] = None # May also contain tokens class WebhookMessage(BaseModel): @@ -156,13 +160,22 @@ async def handle_incoming_message(payload: ChatwootWebhookPayload, cookie_token: sender_keys=list(payload_dict.get('sender', {}).keys()) if payload_dict.get('sender') else [] ) + # 打印完整的 payload 内容用于调试 + import json + logger.info( + "Full webhook payload JSON", + payload_json=json.dumps(payload_dict, indent=2, ensure_ascii=False, default=str) + ) + # Get account_id from payload (top-level account object) # Chatwoot webhook includes account info at the top level account_obj = payload.account + # 从 webhook 中动态获取 account_id account_id = str(account_obj.get("id")) if account_obj else "1" # 优先使用 Cookie 中的 token user_token = cookie_token + mall_token = None # 如果 Cookie 中没有,尝试从多个来源提取 token if not user_token: @@ -173,14 +186,14 @@ async def handle_incoming_message(payload: ChatwootWebhookPayload, cookie_token: contact_id=contact.id if contact else None, contact_type=type(contact).__name__ ) - + # 只有 WebhookContact 才有 custom_attributes if hasattr(contact, 'custom_attributes'): logger.info( "Checking contact custom_attributes", has_custom_attributes=bool(contact.custom_attributes) ) - + custom_attrs = contact.custom_attributes or {} logger.info( "Contact custom_attributes", @@ -188,15 +201,25 @@ async def handle_incoming_message(payload: ChatwootWebhookPayload, cookie_token: has_jwt_token='jwt_token' in custom_attrs if custom_attrs else False, has_mall_token='mall_token' in custom_attrs if custom_attrs else False ) - - contact_dict = {"custom_attributes": custom_attrs} - user_token = TokenManager.extract_token_from_contact(contact_dict) - logger.debug("Extracted token from contact", has_token=bool(user_token)) + + # 同时提取 jwt_token 和 mall_token + if custom_attrs.get('jwt_token'): + user_token = custom_attrs.get('jwt_token') + logger.info("JWT token found in contact.custom_attributes", token_prefix=user_token[:20] if user_token else None) + if custom_attrs.get('mall_token'): + mall_token = custom_attrs.get('mall_token') + logger.info("Mall token found in contact.custom_attributes", token_prefix=mall_token[:20] if mall_token else None) + + # 如果没有找到 token,尝试使用通用字段 + if not user_token and not mall_token: + contact_dict = {"custom_attributes": custom_attrs} + user_token = TokenManager.extract_token_from_contact(contact_dict) + logger.debug("Extracted token from contact (generic)", has_token=bool(user_token)) else: logger.debug("Contact type is WebhookSender, no custom_attributes available") - # 2. 尝试从 conversation.meta.sender.custom_attributes 获取(Chatwoot SDK setUser 设置的位置) - if not user_token and conversation: + # 2. 尝试从 conversation.meta.sender 获取(Chatwoot SDK setUser 设置的位置) + if (not user_token or not mall_token) and conversation: logger.debug("Conversation object type", type=str(type(conversation))) if hasattr(conversation, 'model_dump'): conv_dict = conversation.model_dump() @@ -210,11 +233,30 @@ async def handle_incoming_message(payload: ChatwootWebhookPayload, cookie_token: sender_keys=list(meta_sender.keys()) if meta_sender else [], has_custom_attributes=bool(meta_sender.get('custom_attributes')) if meta_sender else False ) - - if meta_sender.get('custom_attributes'): + + # 2.1. 优先从 meta.sender 根级别获取 token + if not user_token and meta_sender.get('jwt_token'): + user_token = meta_sender.get('jwt_token') + logger.info("JWT token found in conversation.meta.sender (root level)", token_prefix=user_token[:20] if user_token else None) + if not mall_token and meta_sender.get('mall_token'): + mall_token = meta_sender.get('mall_token') + logger.info("Mall token found in conversation.meta.sender (root level)", token_prefix=mall_token[:20] if mall_token else None) + + # 2.2. 其次从 meta.sender.custom_attributes 获取 + if (not user_token or not mall_token) and meta_sender.get('custom_attributes'): logger.info("Found custom_attributes in meta.sender", keys=list(meta_sender['custom_attributes'].keys())) - user_token = TokenManager.extract_token_from_contact({'custom_attributes': meta_sender['custom_attributes']}) - logger.info("Token found in conversation.meta.sender.custom_attributes", token_prefix=user_token[:20] if user_token else None) + custom_attrs = meta_sender['custom_attributes'] + if not user_token and custom_attrs.get('jwt_token'): + user_token = custom_attrs.get('jwt_token') + logger.info("JWT token found in conversation.meta.sender.custom_attributes", token_prefix=user_token[:20] if user_token else None) + if not mall_token and custom_attrs.get('mall_token'): + mall_token = custom_attrs.get('mall_token') + logger.info("Mall token found in conversation.meta.sender.custom_attributes", token_prefix=mall_token[:20] if mall_token else None) + + # 如果只有 jwt_token,将它也用作 mall_token + if user_token and not mall_token: + mall_token = user_token + logger.debug("Using jwt_token as mall_token", token_prefix=mall_token[:20] if mall_token else None) if user_token: logger.info( @@ -236,9 +278,22 @@ async def handle_incoming_message(payload: ChatwootWebhookPayload, cookie_token: conversation_id=conversation_id, user_id=user_id, has_token=bool(user_token), - message_length=len(content) + message_length=len(content), + channel=conversation.channel if conversation else None ) + # 识别消息渠道(邮件、网站等) + message_channel = conversation.channel if conversation else "Channel" + is_email = message_channel == "Email" + + # 邮件渠道特殊处理 + if is_email: + logger.info( + "Email channel detected", + conversation_id=conversation_id, + sender_email=contact.email if contact else None + ) + # Load conversation context from cache cache = get_cache_manager() await cache.connect() @@ -249,6 +304,12 @@ async def handle_incoming_message(payload: ChatwootWebhookPayload, cookie_token: # Add token to context if available if user_token: context["user_token"] = user_token + if mall_token: + context["mall_token"] = mall_token + + # 添加渠道信息到 context(让 Agent 知道是邮件还是网站) + context["channel"] = message_channel + context["is_email"] = is_email try: # Process message through agent workflow @@ -259,24 +320,26 @@ async def handle_incoming_message(payload: ChatwootWebhookPayload, cookie_token: message=content, history=history, context=context, - user_token=user_token + user_token=user_token, + mall_token=mall_token ) # Get response response = final_state.get("response") - if not response: + if response is None: response = "抱歉,我暂时无法处理您的请求。请稍后重试或联系人工客服。" - - # Send response to Chatwoot - # Create client with correct account_id from webhook + + # Create Chatwoot client from integrations.chatwoot import ChatwootClient chatwoot = ChatwootClient(account_id=int(account_id)) - await chatwoot.send_message( - conversation_id=conversation.id, - content=response - ) - await chatwoot.close() - + + # Send response to Chatwoot (skip if empty - agent may have already sent rich content) + if response: + await chatwoot.send_message( + conversation_id=conversation.id, + content=response + ) + # Handle human handoff if final_state.get("requires_human"): await chatwoot.update_conversation_status( @@ -288,6 +351,8 @@ async def handle_incoming_message(payload: ChatwootWebhookPayload, cookie_token: conversation_id=conversation.id, labels=["needs_human", final_state.get("intent", "unknown")] ) + + await chatwoot.close() # Update cache await cache.add_message(conversation_id, "user", content) @@ -313,12 +378,12 @@ async def handle_incoming_message(payload: ChatwootWebhookPayload, cookie_token: ) # Send error response - chatwoot = get_chatwoot_client() + chatwoot = get_chatwoot_client(account_id=int(account_id)) await chatwoot.send_message( conversation_id=conversation.id, content="抱歉,处理您的消息时遇到了问题。我们的客服团队将尽快为您服务。" ) - + # Transfer to human await chatwoot.update_conversation_status( conversation_id=conversation.id, @@ -328,30 +393,36 @@ async def handle_incoming_message(payload: ChatwootWebhookPayload, cookie_token: async def handle_conversation_created(payload: ChatwootWebhookPayload) -> None: """Handle new conversation created - + Args: payload: Webhook payload """ conversation = payload.conversation if not conversation: return - + conversation_id = str(conversation.id) - + + # Get account_id from payload + account_obj = payload.account + # 从 webhook 中动态获取 account_id + account_id = str(account_obj.get("id")) if account_obj else "1" + logger.info( "New conversation created", - conversation_id=conversation_id + conversation_id=conversation_id, + account_id=account_id ) - + # Initialize conversation context cache = get_cache_manager() await cache.connect() - + context = { "created": True, "inbox_id": conversation.inbox_id } - + # Add contact info to context contact = payload.contact if contact: @@ -359,15 +430,24 @@ async def handle_conversation_created(payload: ChatwootWebhookPayload) -> None: context["contact_email"] = contact.email if contact.custom_attributes: context.update(contact.custom_attributes) - + await cache.set_context(conversation_id, context) - - # Send welcome message - chatwoot = get_chatwoot_client() - await chatwoot.send_message( - conversation_id=conversation.id, - content="您好!我是 AI 智能助手,很高兴为您服务。请问有什么可以帮您的?\n\n您可以询问我关于订单、商品、售后等问题,我会尽力为您解答。" - ) + + # 检查是否是邮件渠道 + is_email = conversation.channel == "Email" if conversation else False + + # 只对非邮件渠道发送欢迎消息 + if not is_email: + chatwoot = get_chatwoot_client(account_id=int(account_id)) + await chatwoot.send_message( + conversation_id=conversation.id, + content="您好!我是 AI 智能助手,很高兴为您服务。请问有什么可以帮您的?\n\n您可以询问我关于订单、商品、售后等问题,我会尽力为您解答。" + ) + else: + logger.info( + "Email channel detected, skipping welcome message", + conversation_id=conversation_id + ) async def handle_conversation_status_changed(payload: ChatwootWebhookPayload) -> None: diff --git a/docker-compose.yml b/docker-compose.yml index 09ab3cd..e933e0a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -2,29 +2,8 @@ version: '3.8' services: # ============ Infrastructure ============ - - # PostgreSQL (Chatwoot Database) - postgres: - build: - context: . - dockerfile: postgres-with-pgvector.Dockerfile - container_name: ai_postgres - environment: - POSTGRES_DB: ${POSTGRES_DB:-chatwoot} - POSTGRES_USER: ${POSTGRES_USER:-chatwoot} - POSTGRES_PASSWORD: ${POSTGRES_PASSWORD} - volumes: - - postgres_data:/var/lib/postgresql/data - networks: - - ai_network - healthcheck: - test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER:-chatwoot}"] - interval: 10s - timeout: 5s - retries: 5 - restart: unless-stopped - # Redis (Cache & Queue) + # Redis (Cache & Queue) - Agent 和 MCP 服务都需要 redis: image: redis:7-alpine container_name: ai_redis @@ -40,84 +19,8 @@ services: retries: 5 restart: unless-stopped - # Nginx (Static File Server) - nginx: - image: nginx:alpine - container_name: ai_nginx - ports: - - "8080:80" - volumes: - - ./docs:/usr/share/nginx/html/docs:ro - - ./nginx.conf:/etc/nginx/nginx.conf:ro - networks: - - ai_network - restart: unless-stopped - healthcheck: - test: ["CMD", "wget", "--quiet", "--tries=1", "--spider", "http://localhost/test-chat.html"] - interval: 30s - timeout: 10s - retries: 3 - - # ============ Messaging Platform ============ - - # Chatwoot - chatwoot: - image: chatwoot/chatwoot:latest - container_name: ai_chatwoot - # 启动前清理 PID 文件,避免重启循环 - command: sh -c "rm -f /app/tmp/pids/server.pid && bundle exec rails s -p 3000 -b 0.0.0.0" - environment: - RAILS_ENV: production - SECRET_KEY_BASE: ${CHATWOOT_SECRET_KEY_BASE} - FRONTEND_URL: ${CHATWOOT_FRONTEND_URL:-http://localhost:3000} - # 允许 Widget 从多个域名访问(逗号分隔) - ALLOWED_DOMAINS_FOR_WIDGET: ${CHATWOOT_ALLOWED_DOMAINS:-http://localhost:3000,http://localhost:8080,http://127.0.0.1:3000,http://127.0.0.1:8080} - POSTGRES_HOST: postgres - POSTGRES_DATABASE: ${POSTGRES_DB:-chatwoot} - POSTGRES_USERNAME: ${POSTGRES_USER:-chatwoot} - POSTGRES_PASSWORD: ${POSTGRES_PASSWORD} - REDIS_URL: redis://redis:6379 - INSTALLATION_NAME: B2B AI Assistant - ports: - - "3000:3000" - volumes: - - chatwoot_data:/app/storage - depends_on: - postgres: - condition: service_healthy - redis: - condition: service_healthy - networks: - - ai_network - restart: unless-stopped - - # Chatwoot Sidekiq Worker - chatwoot_worker: - image: chatwoot/chatwoot:latest - container_name: ai_chatwoot_worker - command: bundle exec sidekiq -C config/sidekiq.yml - environment: - RAILS_ENV: production - SECRET_KEY_BASE: ${CHATWOOT_SECRET_KEY_BASE} - POSTGRES_HOST: postgres - POSTGRES_DATABASE: ${POSTGRES_DB:-chatwoot} - POSTGRES_USERNAME: ${POSTGRES_USER:-chatwoot} - POSTGRES_PASSWORD: ${POSTGRES_PASSWORD} - REDIS_URL: redis://redis:6379 - INSTALLATION_NAME: B2B AI Assistant - volumes: - - chatwoot_data:/app/storage - depends_on: - postgres: - condition: service_healthy - redis: - condition: service_healthy - networks: - - ai_network - restart: unless-stopped - # ============ AI Agent Layer ============ - + # LangGraph Agent Main Service agent: build: @@ -133,8 +36,8 @@ services: REDIS_PORT: 6379 REDIS_PASSWORD: ${REDIS_PASSWORD} REDIS_DB: 0 - # Chatwoot - CHATWOOT_API_URL: http://chatwoot:3000 + # Chatwoot (远程服务器) + CHATWOOT_API_URL: ${CHATWOOT_API_URL} CHATWOOT_API_TOKEN: ${CHATWOOT_API_TOKEN} CHATWOOT_WEBHOOK_SECRET: ${CHATWOOT_WEBHOOK_SECRET} # External APIs @@ -142,6 +45,14 @@ services: STRAPI_API_TOKEN: ${STRAPI_API_TOKEN} HYPERF_API_URL: ${HYPERF_API_URL} HYPERF_API_TOKEN: ${HYPERF_API_TOKEN} + # Mall API + MALL_API_URL: ${MALL_API_URL} + MALL_TENANT_ID: ${MALL_TENANT_ID:-2} + MALL_CURRENCY_CODE: ${MALL_CURRENCY_CODE:-EUR} + MALL_LANGUAGE_ID: ${MALL_LANGUAGE_ID:-1} + MALL_SOURCE: ${MALL_SOURCE:-us.qa1.gaia888.com} + # Frontend URLs + FRONTEND_URL: ${FRONTEND_URL:-https://www.qa1.gaia888.com} # MCP Servers STRAPI_MCP_URL: http://strapi_mcp:8001 ORDER_MCP_URL: http://order_mcp:8002 @@ -167,7 +78,7 @@ services: restart: unless-stopped # ============ MCP Servers ============ - + # Strapi MCP (FAQ/Knowledge Base) strapi_mcp: build: @@ -256,7 +167,5 @@ networks: driver: bridge volumes: - postgres_data: redis_data: - chatwoot_data: agent_logs: diff --git a/mcp_servers/order_mcp/server.py b/mcp_servers/order_mcp/server.py index 26caec3..b32fa43 100644 --- a/mcp_servers/order_mcp/server.py +++ b/mcp_servers/order_mcp/server.py @@ -420,7 +420,7 @@ async def get_logistics( ) print(f"[get_logistics] SUCCESS: result_keys={list(result.keys()) if isinstance(result, dict) else type(result).__name__}") - print(f"[get_logistics] Sample data: {str(result)[:500]}") + print(f"[get_logistics] Sample data: {str(result)[:1000]}") # Mall API 返回结构:{ "total": 1, "data": [{ "trackingCode": "...", "carrier": "...", ... }] } logistics_list = result.get("data", []) @@ -430,7 +430,21 @@ async def get_logistics( tracking_number = first_logistics.get("trackingCode", "") carrier = first_logistics.get("carrier", "未知") - print(f"[get_logistics] Extracted: tracking_number={tracking_number}, carrier={carrier}") + # 提取 tracks 数组(物流轨迹) + tracks = first_logistics.get("tracks", []) + timeline = [] + + if tracks and isinstance(tracks, list): + for track in tracks: + if isinstance(track, dict): + timeline.append({ + "id": track.get("id", ""), + "remark": track.get("remark", ""), + "time": track.get("time", track.get("date", "")), + "location": track.get("location", "") + }) + + print(f"[get_logistics] Extracted: tracking_number={tracking_number}, carrier={carrier}, tracks_count={len(timeline)}") return { "success": True, @@ -439,7 +453,7 @@ async def get_logistics( "courier": carrier, "tracking_url": first_logistics.get("trackingUrl", ""), "status": first_logistics.get("status", ""), - "timeline": [] # 如果 API 返回轨迹信息,可以在这里添加 + "timeline": timeline } else: print(f"[get_logistics] WARNING: No logistics data found in response") @@ -466,6 +480,159 @@ async def get_logistics( await client.close() +@register_tool("get_mall_order_list") +@mcp.tool() +async def get_mall_order_list( + user_token: str = None, + user_id: str = None, + account_id: str = None, + page: int = 1, + limit: int = 5, + customer_id: int = 0, + order_types: Optional[List[int]] = None, + shipping_status: int = 10000, + date_added: Optional[str] = None, + date_end: Optional[str] = None, + no: Optional[str] = None, + status: Optional[int] = None, + is_drop_shopping: int = 0 +) -> dict: + """Query order list from Mall API with filters + + 从 Mall API 查询订单列表,支持多种筛选条件 + + 规则: + - 默认返回最近 5 个订单 + - 包含全部状态的订单(不限制状态) + + Args: + user_token: 用户 JWT token(必需,用于身份验证) + user_id: 用户 ID(自动注入,此工具不使用) + account_id: 账户 ID(自动注入,此工具不使用) + page: 页码 (default: 1) + limit: 每页数量 (default: 5, max 50) + customer_id: 客户ID (default: 0, 0表示所有客户) + order_types: 订单类型数组,如 [1, 2] (default: None) + shipping_status: 物流状态 (default: 10000, 10000表示全部状态) + date_added: 开始日期,格式 YYYY-MM-DD (default: None) + date_end: 结束日期,格式 YYYY-MM-DD (default: None) + no: 订单号筛选 (default: None) + status: 订单状态筛选 (default: None, None表示全部状态) + is_drop_shopping: 是否代发货 (default: 0) + + Returns: + 订单列表和分页信息 + { + "success": true, + "orders": [...], # 订单列表 + "total": 100, # 总订单数 + "page": 1, # 当前页 + "limit": 5, # 每页数量 + "total_pages": 20 # 总页数 + } + + Example: + 用户问: "我的订单有哪些?" + Agent 调用: get_mall_order_list(page=1, limit=5) + """ + import logging + import json + import base64 + logger = logging.getLogger(__name__) + + logger.info( + f"get_mall_order_list called: page={page}, limit={limit}, " + f"has_user_token={bool(user_token)}, customer_id={customer_id}" + ) + + # 必须提供 user_token + if not user_token: + logger.error("No user_token provided, user must be logged in") + return { + "success": False, + "error": "用户未登录,请先登录账户以查询订单列表", + "require_login": True, + "orders": [] + } + + # 从 JWT token 中提取 userId 作为 customer_id + if customer_id == 0: + try: + # JWT token 格式: header.payload.signature + parts = user_token.split('.') + if len(parts) >= 2: + # 解码 payload + payload = parts[1] + # 添加 padding 如果需要 + payload += '=' * (4 - len(payload) % 4) + decoded = base64.b64decode(payload) + payload_data = json.loads(decoded) + + # 尝试从不同字段获取 userId + customer_id = payload_data.get('userId') or payload_data.get('uid') or payload_data.get('sub') or 0 + + logger.info( + f"Extracted customer_id from token: customer_id={customer_id}, " + f"token_payload_keys={list(payload_data.keys())}" + ) + except Exception as e: + logger.warning(f"Failed to extract customer_id from token: {e}") + customer_id = 0 + + try: + client = MallClient( + api_url=settings.mall_api_url, + api_token=user_token, + tenant_id=settings.mall_tenant_id, + currency_code=settings.mall_currency_code, + language_id=settings.mall_language_id, + source=settings.mall_source + ) + + result = await client.get_order_list( + page=page, + limit=limit, + customer_id=customer_id, + order_types=order_types, + shipping_status=shipping_status, + date_added=date_added, + date_end=date_end, + no=no, + status=status, + is_drop_shopping=is_drop_shopping + ) + + logger.info( + f"Mall API request successful: page={page}, " + f"result_keys={list(result.keys()) if isinstance(result, dict) else None}" + ) + + # Mall API 返回结构: {"data": [...], "total": 100} + orders = result.get("data", []) + total = result.get("total", 0) + + return { + "success": True, + "orders": orders, + "total": total, + "page": page, + "limit": limit, + "total_pages": (total + limit - 1) // limit if limit > 0 else 0 + } + except Exception as e: + logger.error( + f"Mall API request failed: page={page}, error={str(e)}" + ) + return { + "success": False, + "error": str(e), + "orders": [] + } + finally: + if 'client' in dir() and client: + await client.close() + + # Health check endpoint @register_tool("health_check") @mcp.tool() diff --git a/mcp_servers/shared/mall_client.py b/mcp_servers/shared/mall_client.py index c116ddd..14c79b6 100644 --- a/mcp_servers/shared/mall_client.py +++ b/mcp_servers/shared/mall_client.py @@ -47,25 +47,36 @@ class MallClient: self.source = source or settings.mall_source self._client: Optional[httpx.AsyncClient] = None - async def _get_client(self) -> httpx.AsyncClient: - """Get or create HTTP client with default headers""" + async def _get_client(self, extra_headers: Optional[dict[str, str]] = None) -> httpx.AsyncClient: + """Get or create HTTP client with default headers + + Args: + extra_headers: 额外的请求头,某些接口需要特殊的 header(如 Authorization2) + """ if self._client is None: + default_headers = { + "Authorization": f"Bearer {self.api_token}", + "Content-Type": "application/json", + "Accept": "application/json, text/plain, */*", + "Device-Type": "pc", + "tenant-Id": self.tenant_id, + "currency-code": self.currency_code, + "language-id": self.language_id, + "language_id": self.language_id, # 某些接口使用下划线 + "source": self.source, + "Origin": "https://www.qa1.gaia888.com", + "Referer": "https://www.qa1.gaia888.com/", + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36", + "DNT": "1", + } + + # 合并额外的 headers(用于 Authorization2 等) + if extra_headers: + default_headers.update(extra_headers) + self._client = httpx.AsyncClient( base_url=self.api_url, - headers={ - "Authorization": f"Bearer {self.api_token}", - "Content-Type": "application/json", - "Accept": "application/json, text/plain, */*", - "Device-Type": "pc", - "tenant-Id": self.tenant_id, - "currency-code": self.currency_code, - "language-id": self.language_id, - "source": self.source, - "Origin": "https://www.qa1.gaia888.com", - "Referer": "https://www.qa1.gaia888.com/", - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36", - "DNT": "1", - }, + headers=default_headers, timeout=30.0 ) return self._client @@ -82,7 +93,8 @@ class MallClient: endpoint: str, params: Optional[dict[str, Any]] = None, json: Optional[dict[str, Any]] = None, - headers: Optional[dict[str, str]] = None + headers: Optional[dict[str, str]] = None, + use_authorization2: bool = False ) -> dict[str, Any]: """Make API request and handle response @@ -92,11 +104,20 @@ class MallClient: params: Query parameters json: JSON body headers: Additional headers + use_authorization2: 是否使用 Authorization2 header 而不是 Authorization Returns: Response data """ - client = await self._get_client() + # 如果需要 Authorization2,关闭现有 client 并用新的 headers 创建 + if use_authorization2: + if self._client: + await self._client.aclose() + self._client = None + extra_headers = {"Authorization2": f"Bearer {self.api_token}"} + client = await self._get_client(extra_headers) + else: + client = await self._get_client() # Merge additional headers request_headers = {} @@ -126,19 +147,21 @@ class MallClient: self, endpoint: str, params: Optional[dict[str, Any]] = None, + use_authorization2: bool = False, **kwargs: Any ) -> dict[str, Any]: """GET request""" - return await self.request("GET", endpoint, params=params, **kwargs) + return await self.request("GET", endpoint, params=params, use_authorization2=use_authorization2, **kwargs) async def post( self, endpoint: str, json: Optional[dict[str, Any]] = None, + use_authorization2: bool = False, **kwargs: Any ) -> dict[str, Any]: """POST request""" - return await self.request("POST", endpoint, json=json, **kwargs) + return await self.request("POST", endpoint, json=json, use_authorization2=use_authorization2, **kwargs) # ============ Order APIs ============ @@ -171,6 +194,77 @@ class MallClient: except Exception as e: raise Exception(f"查询订单失败 (Query order failed): {str(e)}") + async def get_order_list( + self, + page: int = 1, + limit: int = 10, + customer_id: int = 0, + order_types: Optional[list[int]] = None, + shipping_status: int = 10000, + date_added: Optional[str] = None, + date_end: Optional[str] = None, + no: Optional[str] = None, + status: Optional[int] = None, + is_drop_shopping: int = 0 + ) -> dict[str, Any]: + """Query order list with filters + + 查询订单列表,支持多种筛选条件 + + Args: + page: 页码 (default: 1) + limit: 每页数量 (default: 10) + customer_id: 客户ID (default: 0) + order_types: 订单类型数组,如 [1, 2] (default: None) + shipping_status: 物流状态 (default: 10000) + date_added: 开始日期,格式 YYYY-MM-DD (default: None) + date_end: 结束日期,格式 YYYY-MM-DD (default: None) + no: 订单号 (default: None) + status: 订单状态 (default: None) + is_drop_shopping: 是否代发货 (default: 0) + + Returns: + 订单列表和分页信息 + Order list and pagination info + + Example: + >>> client = MallClient() + >>> result = await client.get_order_list(page=1, limit=10) + >>> print(result["data"]) # 订单列表 + >>> print(result["total"]) # 总数 + """ + try: + params = { + "page": page, + "limit": limit, + "customerId": customer_id, + "shippingStatus": shipping_status, + "isDropShopping": is_drop_shopping + } + + # 可选参数 + if order_types: + # orderTypes 是数组,需要特殊处理 + # API 格式: orderTypes[]=1&orderTypes[]=2 + params["orderTypes"] = order_types + if date_added: + params["dateAdded"] = date_added + if date_end: + params["dateEnd"] = date_end + if no: + params["no"] = no + if status is not None: + params["status"] = status + + result = await self.get( + "/mall/api/order/list", + params=params, + use_authorization2=True # 订单列表接口需要 Authorization2 + ) + return result + except Exception as e: + raise Exception(f"查询订单列表失败 (Query order list failed): {str(e)}") + # Global Mall client instance mall_client: Optional[MallClient] = None