diff --git a/.env.example b/.env.example index 013535b..d404ba9 100644 --- a/.env.example +++ b/.env.example @@ -1,8 +1,22 @@ # B2B Shopping AI Assistant Platform - Environment Variables # ============ AI Model ============ +# LLM Provider: 'zhipu' (default) or 'qwen' +LLM_PROVIDER=zhipu + +# ZhipuAI (智谱 AI) ZHIPU_API_KEY=your_zhipu_api_key -ZHIPU_MODEL=glm-4 +ZHIPU_MODEL=glm-4-flash + +# Qwen (通义千问) - Alternative provider +QWEN_API_KEY=your_qwen_dashscope_api_key +QWEN_MODEL=qwen-omni-turbo +# Available Qwen models: +# - qwen-omni-turbo (多模态,支持图片/语音) +# - qwen-plus (通用模型) +# - qwen-turbo (高速模型) +# - qwen-long (长文本) +# - qwen-max (最强模型) # ============ Redis ============ REDIS_HOST=redis diff --git a/agent/agents/customer_service.py b/agent/agents/customer_service.py index ce3c6e3..7769575 100644 --- a/agent/agents/customer_service.py +++ b/agent/agents/customer_service.py @@ -322,16 +322,51 @@ async def customer_service_agent(state: AgentState) -> AgentState: return state except json.JSONDecodeError as e: - # JSON parsing failed + # JSON parsing failed - try alternative format: "tool_name\n{args}" logger.error( "Failed to parse LLM response as JSON", error=str(e), raw_content=content[:500], conversation_id=state["conversation_id"] ) - # Don't use raw content as response - use fallback instead - state = set_response(state, "抱歉,我无法理解您的请求。请尝试重新表述或联系人工客服。") - return state + + # Handle non-JSON format: "tool_name\n{args}" + if '\n' in content and not content.startswith('{'): + lines = content.split('\n', 1) + tool_name = lines[0].strip() + args_json = lines[1].strip() if len(lines) > 1 else '{}' + + try: + arguments = json.loads(args_json) if args_json else {} + logger.info( + "Customer service agent calling tool (alternative format)", + tool_name=tool_name, + arguments=arguments, + conversation_id=state["conversation_id"] + ) + + state = add_tool_call( + state, + tool_name=tool_name, + arguments=arguments, + server="strapi" + ) + state["state"] = ConversationState.TOOL_CALLING.value + return state + except json.JSONDecodeError: + # Args parsing also failed + logger.warning( + "Failed to parse tool arguments", + tool_name=tool_name, + args_json=args_json[:200], + conversation_id=state["conversation_id"] + ) + state = set_response(state, "抱歉,我无法理解您的请求。请尝试重新表述或联系人工客服。") + return state + else: + # Not a recognized format + state = set_response(state, "抱歉,我无法理解您的请求。请尝试重新表述或联系人工客服。") + return state except Exception as e: logger.error("Customer service agent failed", error=str(e), exc_info=True) @@ -342,11 +377,59 @@ async def customer_service_agent(state: AgentState) -> AgentState: async def _generate_response_from_results(state: AgentState) -> AgentState: """Generate response based on tool results""" - # Build context from tool results + # Build context from tool results - extract only essential info to reduce prompt size tool_context = [] for result in state["tool_results"]: if result["success"]: - tool_context.append(f"Tool {result['tool_name']} returned:\n{json.dumps(result['data'], ensure_ascii=False, indent=2)}") + tool_name = result['tool_name'] + data = result['data'] + + # Extract only essential information based on tool type + if tool_name == "get_company_info": + # Extract key contact info only + contact = data.get('contact', {}) + emails = contact.get('email', []) + if isinstance(emails, list) and emails: + email_str = ", ".join(emails[:3]) # Max 3 emails + else: + email_str = str(emails) if emails else "N/A" + + phones = contact.get('phone', []) + if isinstance(phones, list) and phones: + phone_str = ", ".join(phones[:2]) # Max 2 phones + else: + phone_str = str(phones) if phones else "N/A" + + address = contact.get('address', {}) + address_str = f"{address.get('city', '')}, {address.get('country', '')}".strip(', ') + + summary = f"Contact Information: Emails: {email_str} | Phones: {phone_str} | Address: {address_str} | Working hours: {contact.get('working_hours', 'N/A')}" + tool_context.append(summary) + + elif tool_name == "query_faq" or tool_name == "search_knowledge_base": + # Extract FAQ items summary + faqs = data.get('faqs', []) if isinstance(data, dict) else [] + if faqs: + faq_summaries = [f"- Q: {faq.get('question', '')[:50]}... A: {faq.get('answer', '')[:50]}..." for faq in faqs[:3]] + summary = f"Found {len(faqs)} FAQ items:\n" + "\n".join(faq_summaries) + tool_context.append(summary) + else: + tool_context.append("No FAQ items found") + + elif tool_name == "get_categories": + # Extract category names only + categories = data.get('categories', []) if isinstance(data, dict) else [] + category_names = [cat.get('name', '') for cat in categories[:5] if cat.get('name')] + summary = f"Available categories: {', '.join(category_names)}" + if len(categories) > 5: + summary += f" (and {len(categories) - 5} more)" + tool_context.append(summary) + + else: + # For other tools, include concise summary (limit to 200 chars) + data_str = json.dumps(data, ensure_ascii=False)[:200] + tool_context.append(f"Tool {tool_name} returned: {data_str}...") + else: tool_context.append(f"Tool {result['tool_name']} failed: {result['error']}") @@ -357,7 +440,8 @@ User question: {state["current_message"]} Tool returned information: {chr(10).join(tool_context)} -Please generate a friendly and professional response. If the tool did not return useful information, honestly inform the user and suggest other ways to get help. +Please generate a friendly and professional response in Chinese. Keep it concise but informative. +If the tool did not return useful information, honestly inform the user and suggest other ways to get help. Return only the response content, do not return JSON.""" messages = [ @@ -367,11 +451,12 @@ Return only the response content, do not return JSON.""" try: llm = get_llm_client() - response = await llm.chat(messages, temperature=0.7) + # Lower temperature for faster response + response = await llm.chat(messages, temperature=0.3) state = set_response(state, response.content) return state except Exception as e: logger.error("Response generation failed", error=str(e)) - state = set_response(state, "Sorry, there was a problem processing your request. Please try again later or contact customer support.") + state = set_response(state, "抱歉,处理您的请求时出现问题。请稍后重试或联系人工客服。") return state diff --git a/agent/agents/product.py b/agent/agents/product.py index d43f7ad..113ede7 100644 --- a/agent/agents/product.py +++ b/agent/agents/product.py @@ -30,9 +30,11 @@ PRODUCT_AGENT_PROMPT = """你是一个专业的 B2B 商品顾问助手。 2. **get_product_detail** - 获取商品详情 - product_id: 商品ID -3. **recommend_products** - 智能推荐 - - context: 推荐上下文(可包含当前查询、浏览历史等) - - limit: 推荐数量 +3. **recommend_products** - 智能推荐(心动清单/猜你喜欢) + - page_size: 推荐数量(默认 6,最大 100) + - page: 页码(默认 1) + - warehouse_id: 仓库ID(默认 2) + - 说明:此工具使用 Mall API /mall/api/loveList 接口,需要用户 token 认证,系统会自动注入用户 token 4. **get_quote** - B2B 询价 - product_id: 商品ID @@ -81,6 +83,18 @@ PRODUCT_AGENT_PROMPT = """你是一个专业的 B2B 商品顾问助手。 } ``` +用户说:"推荐一些商品" +返回: +```json +{ + "action": "call_tool", + "tool_name": "recommend_products", + "arguments": { + "page_size": 6 + } +} +``` + 当需要向用户询问更多信息时: ```json { @@ -104,6 +118,17 @@ PRODUCT_AGENT_PROMPT = """你是一个专业的 B2B 商品顾问助手。 - 报价通常有有效期 ## 商品推荐策略 + +**重要规则:推荐 vs 搜索** +- **泛泛推荐**("推荐一些商品"、"推荐一下"、"有什么好推荐的") → 使用 recommend_products +- **具体商品推荐**("推荐ring相关的商品"、"推荐手机"、"推荐一些珠宝") → 使用 search_products (提取关键词:ring、手机、珠宝) +- **商品搜索**("搜索ring"、"找ring商品") → 使用 search_products + +**说明**: +- 如果用户推荐请求中包含具体的商品关键词(如 ring、手机、珠宝等),使用 search_products 进行精准搜索 +- 只有在泛泛请求推荐时才使用 recommend_products(基于用户行为的个性化推荐) + +**其他推荐依据**: - 根据用户采购历史推荐 - 根据当前查询语义推荐 - 根据企业行业特点推荐 @@ -119,12 +144,12 @@ PRODUCT_AGENT_PROMPT = """你是一个专业的 B2B 商品顾问助手。 async def product_agent(state: AgentState) -> AgentState: """Product agent node - + Handles product search, recommendations, quotes and inventory queries. - + Args: state: Current agent state - + Returns: Updated state with tool calls or response """ @@ -133,11 +158,40 @@ async def product_agent(state: AgentState) -> AgentState: conversation_id=state["conversation_id"], sub_intent=state.get("sub_intent") ) - + state["current_agent"] = "product" state["agent_history"].append("product") state["state"] = ConversationState.PROCESSING.value - + + # ========== FAST PATH: Image Search ========== + # Check if this is an image search request + image_search_url = state.get("context", {}).get("image_search_url") + if image_search_url: + logger.info( + "Image search detected, calling search_products_by_image", + conversation_id=state["conversation_id"], + image_url=image_search_url[:100] + "..." if len(image_search_url) > 100 else image_search_url + ) + + # 直接调用图片搜索工具 + state = add_tool_call( + state, + tool_name="search_products_by_image", + arguments={ + "image_url": image_search_url, + "page_size": 6, + "page": 1 + }, + server="product" + ) + + # 清除 image_search_url 防止无限循环 + state["context"]["image_search_url"] = None + + state["state"] = ConversationState.TOOL_CALLING.value + return state + # ============================================== + # Check if we have tool results to process if state["tool_results"]: return await _generate_product_response(state) @@ -148,7 +202,8 @@ async def product_agent(state: AgentState) -> AgentState: ] # Add conversation history - for msg in state["messages"][-6:]: + # 只保留最近 2 条历史消息以减少 token 数量和响应时间 + for msg in state["messages"][-2:]: messages.append(Message(role=msg["role"], content=msg["content"])) # Build context info @@ -233,7 +288,7 @@ async def product_agent(state: AgentState) -> AgentState: # Set default page_size if not provided if "page_size" not in arguments: - arguments["page_size"] = 5 + arguments["page_size"] = 6 # Set default page if not provided if "page" not in arguments: @@ -249,8 +304,21 @@ async def product_agent(state: AgentState) -> AgentState: # Inject context for recommendation if tool_name == "recommend_products": - arguments["user_id"] = state["user_id"] - arguments["account_id"] = state["account_id"] + arguments["user_token"] = state.get("user_token") + # 如果没有提供 page_size,使用默认值 6 + if "page_size" not in arguments: + arguments["page_size"] = 6 + # 如果没有提供 warehouse_id,使用默认值 2 + if "warehouse_id" not in arguments: + arguments["warehouse_id"] = 2 + + logger.info( + "Product agent recommend_products after injection", + user_token_present="user_token" in arguments, + user_token_preview=arguments.get("user_token", "")[:20] + "..." if arguments.get("user_token") else None, + arguments=arguments, + conversation_id=state["conversation_id"] + ) # Inject context for quote if tool_name == "get_quote": @@ -301,25 +369,55 @@ async def product_agent(state: AgentState) -> AgentState: async def _generate_product_response(state: AgentState) -> AgentState: """Generate response based on product tool results""" - # 特殊处理:如果是 search_products 工具返回,直接发送商品卡片 - has_product_search_result = False + # 特殊处理:如果是 search_products、recommend_products 或 search_products_by_image 工具返回,直接发送商品卡片 + has_product_result = False products = [] + result_source = None # "search", "recommend" 或 "image_search" + + # 添加日志:查看所有工具结果 + import json as json_module + logger.info( + "All tool results", + tool_results_count=len(state.get("tool_results", [])), + tool_results=json_module.dumps(state.get("tool_results", []), ensure_ascii=False, indent=2)[:2000] + ) for result in state["tool_results"]: - if result["success"] and result["tool_name"] == "search_products": + logger.info( + "Processing tool result", + tool_name=result["tool_name"], + success=result["success"], + data_keys=list(result.get("data", {}).keys()) if isinstance(result.get("data"), dict) else "not a dict", + data_preview=json_module.dumps(result.get("data"), ensure_ascii=False)[:500] + ) + + if result["success"] and result["tool_name"] in ["search_products", "recommend_products", "search_products_by_image"]: data = result["data"] if isinstance(data, dict) and data.get("success"): - products = data.get("products", []) - has_product_search_result = True + # MCP 返回的数据结构: {"success": true, "result": {"success": true, "products": [...]}} + # 需要从 result.result 中提取实际数据 + inner_data = data.get("result", data) + products = inner_data.get("products", []) + keyword = inner_data.get("keyword", "") + + has_product_result = True + if result["tool_name"] == "recommend_products": + result_source = "recommend" + elif result["tool_name"] == "search_products_by_image": + result_source = "image_search" + else: + result_source = "search" + logger.info( - "Product search results found", + f"Product {result_source} results found", products_count=len(products), - keyword=data.get("keyword", "") + keyword=keyword, + products_preview=json_module.dumps(products[:2], ensure_ascii=False, indent=2) if products else "[]" ) break - # 如果有商品搜索结果,直接发送商品卡片 - if has_product_search_result and products: + # 如果有商品结果,直接发送商品卡片(product_list 格式) + if has_product_result and products: try: from integrations.chatwoot import ChatwootClient from core.language_detector import detect_language @@ -327,7 +425,7 @@ async def _generate_product_response(state: AgentState) -> AgentState: # 检测语言 detected_language = state.get("detected_language", "en") - # 发送商品卡片 + # 发送商品列表 chatwoot = ChatwootClient(account_id=int(state.get("account_id", 1))) conversation_id = state.get("conversation_id") @@ -339,10 +437,11 @@ async def _generate_product_response(state: AgentState) -> AgentState: ) logger.info( - "Product cards sent successfully", + f"Product {result_source} cards sent successfully", conversation_id=conversation_id, products_count=len(products), - language=detected_language + language=detected_language, + result_source=result_source ) # 清空响应,避免重复发送 @@ -352,17 +451,56 @@ async def _generate_product_response(state: AgentState) -> AgentState: except Exception as e: logger.error( - "Failed to send product cards, falling back to text response", + f"Failed to send product {result_source} cards, falling back to text response", error=str(e), - products_count=len(products) + products_count=len(products), + result_source=result_source ) # 常规处理:生成文本响应 tool_context = [] for result in state["tool_results"]: if result["success"]: - data = result["data"] - tool_context.append(f"工具 {result['tool_name']} 返回:\n{json.dumps(data, ensure_ascii=False, indent=2)}") + tool_name = result['tool_name'] + data = result['data'] + + # Extract only essential information based on tool type + if tool_name == "search_products" or tool_name == "recommend_products": + products = data.get("products", []) if isinstance(data, dict) else [] + if products: + product_summaries = [f"- {p.get('product_name', 'N/A')}: {p.get('price', 'N/A')}" for p in products[:3]] + summary = f"Found {len(products)} products:\n" + "\n".join(product_summaries) + if len(products) > 3: + summary += f"\n(and {len(products) - 3} more)" + tool_context.append(summary) + else: + tool_context.append("No products found") + + elif tool_name == "get_product_detail": + product = data.get("product", {}) if isinstance(data, dict) else {} + name = product.get("product_name", product.get("name", "N/A")) + price = product.get("price", "N/A") + stock = product.get("stock", product.get("stock_status", "N/A")) + summary = f"Product: {name} | Price: {price} | Stock: {stock}" + tool_context.append(summary) + + elif tool_name == "check_inventory": + inventory = data.get("inventory", []) if isinstance(data, dict) else [] + inv_summaries = [f"{inv.get('product_id', 'N/A')}: {inv.get('quantity', 'N/A')} available" for inv in inventory[:3]] + summary = "Inventory status:\n" + "\n".join(inv_summaries) + tool_context.append(summary) + + elif tool_name == "get_pricing": + product_id = data.get("product_id", "N/A") + unit_price = data.get("unit_price", "N/A") + total_price = data.get("total_price", "N/A") + summary = f"Quote for {product_id}: Unit: {unit_price} | Total: {total_price}" + tool_context.append(summary) + + else: + # For other tools, include concise summary (limit to 200 chars) + data_str = json.dumps(data, ensure_ascii=False)[:200] + tool_context.append(f"工具 {tool_name} 返回: {data_str}...") # Extract product context if isinstance(data, dict): @@ -398,7 +536,8 @@ async def _generate_product_response(state: AgentState) -> AgentState: try: llm = get_llm_client() - response = await llm.chat(messages, temperature=0.7) + # Lower temperature for faster response + response = await llm.chat(messages, temperature=0.3) state = set_response(state, response.content) return state diff --git a/agent/config.py b/agent/config.py index 87540f4..2bfdeea 100644 --- a/agent/config.py +++ b/agent/config.py @@ -10,8 +10,17 @@ class Settings(BaseSettings): """Application settings loaded from environment variables""" # ============ AI Model ============ - zhipu_api_key: str = Field(..., description="ZhipuAI API Key") - zhipu_model: str = Field(default="glm-4", description="ZhipuAI Model name") + llm_provider: str = Field(default="zhipu", description="LLM provider: 'zhipu' or 'qwen'") + + # ZhipuAI (智谱 AI) + zhipu_api_key: str = Field(default="", description="ZhipuAI API Key") + zhipu_model: str = Field(default="glm-4-flash", description="ZhipuAI Model name") + + # Qwen (通义千问) + qwen_api_key: str = Field(default="", description="Qwen/DashScope API Key") + qwen_model: str = Field(default="qwen-omni-turbo", description="Qwen Model name") + + # 通用配置 enable_reasoning_mode: bool = Field(default=False, description="Enable AI reasoning/thinking mode (slower but more thoughtful)") reasoning_mode_for_complex: bool = Field(default=True, description="Enable reasoning mode only for complex queries") diff --git a/agent/core/llm.py b/agent/core/llm.py index 43dc06c..3e25c5f 100644 --- a/agent/core/llm.py +++ b/agent/core/llm.py @@ -1,8 +1,9 @@ """ -ZhipuAI LLM Client for B2B Shopping AI Assistant +LLM Client for B2B Shopping AI Assistant +Supports both ZhipuAI and Qwen (DashScope) """ import concurrent.futures -from typing import Any, Optional +from typing import Any, Optional, Union from dataclasses import dataclass from zhipuai import ZhipuAI @@ -276,12 +277,168 @@ class ZhipuLLMClient: raise -llm_client: Optional[ZhipuLLMClient] = None +llm_client: Optional[Union[ZhipuLLMClient, "QwenLLMClient"]] = None -def get_llm_client() -> ZhipuLLMClient: - """Get or create global LLM client instance""" +def get_llm_client() -> Union[ZhipuLLMClient, "QwenLLMClient"]: + """Get or create global LLM client instance based on provider setting""" global llm_client if llm_client is None: - llm_client = ZhipuLLMClient() + provider = settings.llm_provider.lower() + if provider == "qwen": + llm_client = QwenLLMClient() + else: + llm_client = ZhipuLLMClient() return llm_client + + +# ============ Qwen (DashScope) LLM Client ============ + +try: + from dashscope import Generation + DASHSCOPE_AVAILABLE = True +except ImportError: + DASHSCOPE_AVAILABLE = False + logger.warning("DashScope SDK not installed. Qwen models will not be available.") + + +class QwenLLMClient: + """Qwen (DashScope) LLM Client wrapper""" + + DEFAULT_TIMEOUT = 60 # seconds + + def __init__( + self, + api_key: Optional[str] = None, + model: Optional[str] = None, + timeout: Optional[int] = None + ): + if not DASHSCOPE_AVAILABLE: + raise ImportError("DashScope SDK is not installed. Install it with: pip install dashscope") + + self.api_key = api_key or settings.qwen_api_key + self.model = model or settings.qwen_model + self.timeout = timeout or self.DEFAULT_TIMEOUT + + # 设置 API key 到 DashScope SDK + # 必须直接设置 dashscope.api_key,环境变量可能不够 + import dashscope + dashscope.api_key = self.api_key + + logger.info( + "Qwen client initialized", + model=self.model, + timeout=self.timeout, + api_key_prefix=self.api_key[:10] + "..." if len(self.api_key) > 10 else self.api_key + ) + + async def chat( + self, + messages: list[Message], + temperature: float = 0.7, + max_tokens: int = 2048, + top_p: float = 0.9, + use_cache: bool = True, + **kwargs: Any + ) -> LLMResponse: + """Send chat completion request with caching support""" + formatted_messages = [ + {"role": msg.role, "content": msg.content} + for msg in messages + ] + + # Try cache first + if use_cache: + try: + cache = get_response_cache() + cached_response = await cache.get( + model=self.model, + messages=formatted_messages, + temperature=temperature + ) + if cached_response is not None: + logger.info( + "Returning cached response", + model=self.model, + response_length=len(cached_response) + ) + return LLMResponse( + content=cached_response, + finish_reason="cache_hit", + usage={"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0} + ) + except Exception as e: + logger.warning("Cache check failed", error=str(e)) + + logger.info( + "Sending chat request", + model=self.model, + message_count=len(messages), + temperature=temperature + ) + + def _make_request(): + response = Generation.call( + model=self.model, + messages=formatted_messages, + temperature=temperature, + max_tokens=max_tokens, + top_p=top_p, + result_format="message", + **kwargs + ) + return response + + try: + with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: + future = executor.submit(_make_request) + response = future.result(timeout=self.timeout) + + # Qwen API 响应格式 + if response.status_code != 200: + raise Exception(f"Qwen API error: {response.message}") + + content = response.output.choices[0].message.content + finish_reason = response.output.choices[0].finish_reason + usage = response.usage + + logger.info( + "Chat response received", + finish_reason=finish_reason, + content_length=len(content) if content else 0, + usage=usage + ) + + if not content: + logger.warning("LLM returned empty content") + + # Cache the response + if use_cache and content: + try: + cache = get_response_cache() + await cache.set( + model=self.model, + messages=formatted_messages, + response=content, + temperature=temperature + ) + except Exception as e: + logger.warning("Failed to cache response", error=str(e)) + + return LLMResponse( + content=content or "", + finish_reason=finish_reason, + usage={ + "prompt_tokens": usage.input_tokens, + "completion_tokens": usage.output_tokens, + "total_tokens": usage.total_tokens + } + ) + + except concurrent.futures.TimeoutError: + logger.error("Chat request timed out", timeout=self.timeout) + raise TimeoutError(f"Request timed out after {self.timeout} seconds") + + except Exception as e: + logger.error("Chat request failed", error=str(e)) + raise diff --git a/agent/integrations/chatwoot.py b/agent/integrations/chatwoot.py index 9812a43..9b2c588 100644 --- a/agent/integrations/chatwoot.py +++ b/agent/integrations/chatwoot.py @@ -1089,7 +1089,7 @@ class ChatwootClient: products: list[dict[str, Any]], language: str = "en" ) -> dict[str, Any]: - """发送商品搜索结果(使用 cards 格式) + """发送商品搜索结果(使用 product_list 格式) Args: conversation_id: 会话 ID @@ -1100,8 +1100,7 @@ class ChatwootClient: - product_image: 商品图片 URL - price: 价格 - special_price: 特价(可选) - - stock: 库存 - - sales_count: 销量 + - href: 商品链接路径(可选) language: 语言代码(en, nl, de, es, fr, it, tr, zh),默认 en Returns: @@ -1114,124 +1113,89 @@ class ChatwootClient: ... "product_name": "Product A", ... "product_image": "https://...", ... "price": "99.99", - ... "stock": 100 + ... "href": "/product/detail/12345" ... } ... ] >>> await chatwoot.send_product_cards(123, products, language="zh") """ - # 获取前端 URL + client = await self._get_client() + + # 获取前端域名 frontend_url = settings.frontend_url.rstrip('/') - # 构建商品卡片 - cards = [] + # 构建商品列表 + product_list = [] for product in products: - spu_id = product.get("spu_id", "") - spu_sn = product.get("spu_sn", "") product_name = product.get("product_name", "Unknown Product") product_image = product.get("product_image", "") price = product.get("price", "0") special_price = product.get("special_price") - stock = product.get("stock", 0) - sales_count = product.get("sales_count", 0) + href = product.get("href", "") - # 价格显示(如果有特价则显示特价) + # 价格显示(如果有特价则显示特价,否则显示原价) try: - price_num = float(price) if price else 0 - price_text = f"€{price_num:.2f}" + if special_price and float(special_price) > 0: + price_num = float(special_price) + else: + price_num = float(price) if price else 0 + + # 根据语言选择货币符号 + if language == "zh": + price_text = f"¥{price_num:.2f}" + else: + price_text = f"€{price_num:.2f}" except (ValueError, TypeError): - price_text = str(price) if price else "€0.00" + price_text = str(price) if price else ("¥0.00" if language == "zh" else "€0.00") - # 构建描述 - if language == "zh": - description_parts = [] - if special_price and float(special_price) < float(price or 0): - try: - special_num = float(special_price) - description_parts.append(f"特价: €{special_num:.2f}") - except: - pass - if stock is not None: - description_parts.append(f"库存: {stock}") - if sales_count: - description_parts.append(f"已售: {sales_count}") - description = " | ".join(description_parts) if description_parts else "暂无详细信息" - else: - description_parts = [] - if special_price and float(special_price) < float(price or 0): - try: - special_num = float(special_price) - description_parts.append(f"Special: €{special_num:.2f}") - except: - pass - if stock is not None: - description_parts.append(f"Stock: {stock}") - if sales_count: - description_parts.append(f"Sold: {sales_count}") - description = " | ".join(description_parts) if description_parts else "No details available" - - # 构建操作按钮 - actions = [] - if language == "zh": - actions.append({ - "type": "link", - "text": "查看详情", - "uri": f"{frontend_url}/product/detail?spuId={spu_id}" - }) - if stock and stock > 0: - actions.append({ - "type": "link", - "text": "立即购买", - "uri": f"{frontend_url}/product/detail?spuId={spu_id}" - }) - else: - actions.append({ - "type": "link", - "text": "View Details", - "uri": f"{frontend_url}/product/detail?spuId={spu_id}" - }) - if stock and stock > 0: - actions.append({ - "type": "link", - "text": "Buy Now", - "uri": f"{frontend_url}/product/detail?spuId={spu_id}" - }) - - # 构建卡片 - card = { - "title": product_name, - "description": description, - "media_url": product_image, - "actions": actions + # 构建商品对象 + product_obj = { + "image": product_image, + "name": product_name, + "price": price_text, + "target": "_blank" } - cards.append(card) + # 如果有 href,添加完整 URL + if href: + product_obj["url"] = f"{frontend_url}{href}" + else: + # 如果没有 href,使用 spu_id 构建默认链接 + spu_id = product.get("spu_id", "") + if spu_id: + product_obj["url"] = f"{frontend_url}/product/detail?spuId={spu_id}" - # 发送 cards 类型消息 - client = await self._get_client() + product_list.append(product_obj) + # 构建标题 + if language == "zh": + title = "找到以下商品" + else: + title = "Found following products" + + # 构建 content_attributes content_attributes = { - "items": cards + "title": title, + "products": product_list, + "actions": [] } - # 添加标题 - if language == "zh": - content = f"找到 {len(products)} 个商品" - else: - content = f"Found {len(products)} products" - + # 发送 product_list 类型消息 payload = { - "content": content, - "content_type": "cards", + "content": "", + "content_type": "product_list", + "message_type": 1, "content_attributes": content_attributes } + # 输出完整的 payload 用于调试 + import json as json_module logger.info( - "Sending product cards", + "Sending product list to Chatwoot", conversation_id=conversation_id, - products_count=len(products), + products_count=len(product_list), language=language, - payload_preview=str(payload)[:1000] + payload=json_module.dumps(payload, ensure_ascii=False, indent=2) ) response = await client.post( diff --git a/agent/prompts/product/en.yaml b/agent/prompts/product/en.yaml index c016fa4..090fded 100644 --- a/agent/prompts/product/en.yaml +++ b/agent/prompts/product/en.yaml @@ -12,10 +12,16 @@ system_prompt: | ## Available Tools - **search_products** - Search for products - - query: Search keywords - - category: Product category (optional) - - filters: {attribute: value} (optional) + **search_products** - Search for products by keyword + - keyword: Search keywords (required) + - page_size: Number of results (optional, default: 6) + - page: Page number (optional, default: 1) + + **recommend_products** - Get personalized product recommendations (Love List/心动清单) + - page_size: Number of recommendations (optional, default: 6) + - page: Page number (optional, default: 1) + - warehouse_id: Warehouse ID (optional, default: 2) + - Note: Requires user authentication token, uses Mall API /mall/api/loveList **get_product_details** - Get detailed product information - product_id: Product ID or SKU @@ -28,18 +34,22 @@ system_prompt: | - product_id: Product ID - quantity: Quantity for pricing (optional, for tiered pricing) - **recommend_products** - Get product recommendations - - category: Product category - - limit: Number of recommendations - ## Important Rules 1. **Product Recognition**: - - Product search/产品搜索/找产品/商品 → Use search_products + - 泛泛推荐(推荐/推荐商品/猜你喜欢/心动清单,无具体关键词) → Use recommend_products + - 具体商品推荐(推荐ring/推荐手机,有具体关键词) → Use search_products + - Product search/产品搜索/找商品/搜索商品 + 具体关键词 → Use search_products - Price/价格/报价/多少钱 → Use get_pricing - Stock/库存/有没有货/现货 → Use check_stock - Product details/产品详情/产品信息/产品规格 → Use get_product_details - - Recommendation/推荐/推荐产品 → Use recommend_products + + 2. **Recommendation vs Search**: + - "推荐一些商品"、"推荐一下"、"有什么好推荐的"(无具体关键词) → Use recommend_products + - "推荐ring相关的商品"、"推荐手机"、"推荐一些珠宝"(有具体关键词) → Use search_products (keyword: "ring"/"手机"/"珠宝") + - "搜索ring"、"找ring商品" → Use search_products (keyword: "ring") + + **规则**: 如果推荐请求中包含具体的商品关键词(如 ring、手机、珠宝等),使用 search_products 进行精准搜索。只有在泛泛请求推荐时才使用 recommend_products。 2. For B2B customers, prioritize wholesale/bulk pricing information 3. Always check stock availability before suggesting purchases @@ -51,17 +61,24 @@ system_prompt: | - For Chinese inquiries, respond in Chinese - For English inquiries, respond in English + ## ⚠️ CRITICAL: Response Format + + **You MUST ALWAYS respond with valid JSON. Never respond with plain text.** + ## Tool Call Format + When you need to use a tool, respond with EXACTLY this JSON format: ```json { "action": "call_tool", - "tool_name": "tool_name", - "arguments": {"parameter": "value"} + "tool_name": "search_products", + "arguments": { + "query": "product name" + } } ``` - Or to respond directly: + When you can answer directly: ```json { "action": "respond", @@ -69,6 +86,14 @@ system_prompt: | } ``` + **IMPORTANT:** + - Your entire response must be a valid JSON object + - Do NOT include any text outside the JSON + - Do NOT use markdown code blocks + - Always include "action" field + - "action" must be either "call_tool" or "respond" + - For "call_tool", you must include "tool_name" and "arguments" + tool_descriptions: search_products: "Search for products by keywords or category" get_product_details: "Get detailed product information" diff --git a/agent/requirements.txt b/agent/requirements.txt index a202b4e..d201156 100644 --- a/agent/requirements.txt +++ b/agent/requirements.txt @@ -9,6 +9,7 @@ langchain-core>=0.1.0 # AI Model SDK zhipuai>=2.0.0 +dashscope>=1.14.0 # Async utilities sniffio>=1.3.0 anyio>=4.0.0 diff --git a/agent/webhooks/chatwoot_webhook.py b/agent/webhooks/chatwoot_webhook.py index e0de0f9..b22f3f0 100644 --- a/agent/webhooks/chatwoot_webhook.py +++ b/agent/webhooks/chatwoot_webhook.py @@ -6,6 +6,7 @@ import hashlib from typing import Any, Optional from fastapi import APIRouter, Request, HTTPException, BackgroundTasks +from fastapi.responses import JSONResponse from pydantic import BaseModel from config import settings @@ -75,6 +76,7 @@ class ChatwootWebhookPayload(BaseModel): message_type: Optional[str] = None content_type: Optional[str] = None private: Optional[bool] = False + content_attributes: Optional[dict] = None # 图片搜索 URL 等额外属性 conversation: Optional[WebhookConversation] = None sender: Optional[WebhookSender] = None contact: Optional[WebhookContact] = None @@ -132,7 +134,15 @@ async def handle_incoming_message(payload: ChatwootWebhookPayload, cookie_token: conversation_id = str(conversation.id) content = payload.content - if not content: + # 检查是否是图片搜索消息(在过滤空 content 之前) + # 图片搜索消息的 content 通常是空的,但需要处理 + is_image_search = ( + payload.content_type == "search_image" or + payload.content_type == 17 + ) + + # 只有非图片搜索消息才检查 content 是否为空 + if not content and not is_image_search: logger.debug("Empty message content, skipping") return @@ -282,19 +292,7 @@ async def handle_incoming_message(payload: ChatwootWebhookPayload, cookie_token: channel=conversation.channel if conversation else None ) - # 识别消息渠道(邮件、网站等) - message_channel = conversation.channel if conversation else "Channel" - is_email = message_channel == "Email" - - # 邮件渠道特殊处理 - if is_email: - logger.info( - "Email channel detected", - conversation_id=conversation_id, - sender_email=contact.email if contact else None - ) - - # Load conversation context from cache + # Load conversation context from cache (需要在图片搜索检测之前) cache = get_cache_manager() await cache.connect() @@ -307,10 +305,125 @@ async def handle_incoming_message(payload: ChatwootWebhookPayload, cookie_token: if mall_token: context["mall_token"] = mall_token + # 图片 URL 可能在两个位置: + # 1. payload.content_attributes.url (顶层) + # 2. payload.conversation.messages[0].content_attributes.url (嵌套) + image_url = None + + # 首先尝试从顶层获取 + if payload.content_attributes: + image_url = payload.content_attributes.get("url") + + # 如果顶层没有,尝试从 conversation.messages[0] 获取 + if not image_url and conversation and hasattr(conversation, 'model_dump'): + conv_dict = conversation.model_dump() + messages = conv_dict.get('messages', []) + if messages and len(messages) > 0: + first_message = messages[0] + msg_content_attrs = first_message.get('content_attributes', {}) + image_url = msg_content_attrs.get('url') + + if is_image_search and image_url: + logger.info( + "Image search detected", + conversation_id=conversation_id, + image_url=image_url[:100] + "..." if len(image_url) > 100 else image_url + ) + + # 创建 Chatwoot client + from integrations.chatwoot import ChatwootClient + chatwoot = ChatwootClient(account_id=int(account_id)) + + # 开启 typing status + try: + await chatwoot.toggle_typing_status( + conversation_id=conversation.id, + typing_status="on" + ) + except Exception as e: + logger.warning( + "Failed to enable typing status for image search", + conversation_id=conversation_id, + error=str(e) + ) + + # 直接调用图片搜索工具 + # 修改 content 为图片搜索指令 + search_content = f"[图片搜索] 请根据这张图片搜索相似的商品" + + # 使用特殊标记让 Product Agent 知道这是图片搜索 + context["image_search_url"] = image_url + + final_state = await process_message( + conversation_id=conversation_id, + user_id=user_id, + account_id=account_id, + message=search_content, + history=history, + context=context, + user_token=user_token, + mall_token=mall_token + ) + + # 获取响应并发送 + response = final_state.get("response", "") + + if response: + await chatwoot.send_message( + conversation_id=conversation.id, + content=response + ) + + logger.info( + "Image search response sent", + conversation_id=conversation_id, + response_length=len(response) + ) + + # 关闭 typing status + await chatwoot.toggle_typing_status( + conversation_id=conversation.id, + typing_status="off" + ) + + await chatwoot.close() + + # Update cache + await cache.add_message(conversation_id, "user", search_content) + await cache.add_message(conversation_id, "assistant", response) + + # Save context + new_context = final_state.get("context", {}) + new_context["last_intent"] = final_state.get("intent") + await cache.set_context(conversation_id, new_context) + + logger.info( + "Image search message processed successfully", + conversation_id=conversation_id, + intent=final_state.get("intent") + ) + + return JSONResponse( + content={"status": "success"}, + status_code=200 + ) + + # 识别消息渠道(邮件、网站等) + message_channel = conversation.channel if conversation else "Channel" + is_email = message_channel == "Email" + # 添加渠道信息到 context(让 Agent 知道是邮件还是网站) context["channel"] = message_channel context["is_email"] = is_email + # 邮件渠道特殊处理 + if is_email: + logger.info( + "Email channel detected", + conversation_id=conversation_id, + sender_email=contact.email if contact else None + ) + # 创建 Chatwoot client(提前创建以便开启 typing status) from integrations.chatwoot import ChatwootClient chatwoot = ChatwootClient(account_id=int(account_id)) diff --git a/docker-compose.yml b/docker-compose.yml index 22413c6..aa8ee58 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -29,8 +29,11 @@ services: container_name: ai_agent environment: # AI Model + LLM_PROVIDER: ${LLM_PROVIDER:-zhipu} ZHIPU_API_KEY: ${ZHIPU_API_KEY} - ZHIPU_MODEL: ${ZHIPU_MODEL:-glm-4} + ZHIPU_MODEL: ${ZHIPU_MODEL:-glm-4-flash} + QWEN_API_KEY: ${QWEN_API_KEY} + QWEN_MODEL: ${QWEN_MODEL:-qwen-omni-turbo} # Redis REDIS_HOST: redis REDIS_PORT: 6379 diff --git a/mcp_servers/product_mcp/server.py b/mcp_servers/product_mcp/server.py index a3a5478..1cc4808 100644 --- a/mcp_servers/product_mcp/server.py +++ b/mcp_servers/product_mcp/server.py @@ -82,50 +82,118 @@ async def get_product_detail( @register_tool("recommend_products") @mcp.tool() async def recommend_products( - user_id: str, - account_id: str, - context: Optional[dict] = None, - strategy: str = "hybrid", - limit: int = 10 + user_token: str, + page: int = 1, + page_size: int = 6, + warehouse_id: int = 2 ) -> dict: - """Get personalized product recommendations - + """Get recommended products from Mall API Love List + + 从 Mall API 获取推荐商品列表(心动清单/猜你喜欢) + Args: - user_id: User identifier - account_id: B2B account identifier - context: Optional context for recommendations: - - current_query: Current search query - - recent_views: List of recently viewed product IDs - - cart_items: Items in cart - strategy: Recommendation strategy (collaborative, content_based, hybrid) - limit: Maximum recommendations to return (default: 10) - + user_token: User JWT token for authentication + page: Page number (default: 1) + page_size: Number of products per page (default: 6, max 100) + warehouse_id: Warehouse ID (default: 2) + Returns: - List of recommended products with reasons + List of recommended products with product details """ - payload = { - "user_id": user_id, - "account_id": account_id, - "strategy": strategy, - "limit": limit - } - - if context: - payload["context"] = context - try: - result = await hyperf.post("/products/recommend", json=payload) - + from shared.mall_client import MallClient + import logging + logger = logging.getLogger(__name__) + + logger.info(f"recommend_products called: page={page}, page_size={page_size}, warehouse_id={warehouse_id}") + + # 创建 Mall 客户端(使用 user_token) + mall = MallClient( + api_url=settings.mall_api_url, + api_token=user_token, # 使用用户的 token + tenant_id=settings.mall_tenant_id, + currency_code=settings.mall_currency_code, + language_id=settings.mall_language_id, + source=settings.mall_source + ) + + result = await mall.get_love_list( + page=page, + page_size=page_size, + warehouse_id=warehouse_id + ) + + logger.info( + f"Mall API love list returned: result_type={type(result).__name__}, " + f"result_keys={list(result.keys()) if isinstance(result, dict) else 'not a dict'}, " + f"total={result.get('total', 'N/A') if isinstance(result, dict) else 'N/A'}" + ) + + # 解析返回结果 + # Mall API 实际返回结构: {"total": X, "data": {"data": [...]}} + if isinstance(result, dict) and "data" in result: + data = result.get("data", {}) + if isinstance(data, dict) and "data" in data: + products_list = data.get("data", []) + else: + products_list = [] + total = result.get("total", 0) + elif isinstance(result, dict) and "list" in result: + # 兼容可能的 list 结构 + products_list = result.get("list", []) + total = result.get("total", 0) + else: + products_list = [] + total = 0 + + logger.info(f"Extracted {len(products_list)} products from love list, total={total}") + + # 格式化商品数据(与 search_products 格式一致) + formatted_products = [] + for product in products_list: + formatted_products.append({ + "spu_id": product.get("spuId"), + "spu_sn": product.get("spuSn"), + "product_name": product.get("spuName"), + "product_image": product.get("masterImage"), + "price": product.get("price"), + "special_price": product.get("specialPrice"), + "stock": product.get("stockDescribe"), + "sales_count": product.get("salesCount", 0), + # 额外有用字段 + "href": product.get("href"), + "spu_type": product.get("spuType"), + "spu_type_name": product.get("spuTypeName"), + "min_price": product.get("minPrice"), + "max_price": product.get("maxPrice"), + "price_with_currency": product.get("priceWithCurrency"), + "mark_price": product.get("markPrice"), + "skus_count": len(product.get("skus", [])) + }) + + logger.info(f"Formatted {len(formatted_products)} products for recommendation") + return { "success": True, - "recommendations": result.get("recommendations", []) + "products": formatted_products, + "total": total, + "keyword": "recommendation" } + except Exception as e: + import logging + logger = logging.getLogger(__name__) + logger.error(f"recommend_products failed: {str(e)}", exc_info=True) return { "success": False, "error": str(e), - "recommendations": [] + "products": [], + "total": 0 } + finally: + # 关闭客户端 + if 'mall' in dir(): + await mall.close() @register_tool("get_quote") @@ -249,7 +317,7 @@ async def get_categories() -> dict: @mcp.tool() async def search_products( keyword: str, - page_size: int = 5, + page_size: int = 6, page: int = 1 ) -> dict: """Search products from Mall API @@ -258,7 +326,7 @@ async def search_products( Args: keyword: 搜索关键词(商品名称、编号等) - page_size: 每页数量 (default: 5, max 100) + page_size: 每页数量 (default: 6, max 100) page: 页码 (default: 1) Returns: @@ -299,6 +367,18 @@ async def search_products( ) print(f"[DEBUG] Mall API returned: total={result.get('total', 'N/A')}, data_keys={list(result.get('data', {}).keys()) if isinstance(result.get('data'), dict) else 'N/A'}") + # 详细输出商品数据 + if "data" in result and isinstance(result["data"], dict): + products = result["data"].get("data", []) + print(f"[DEBUG] Found {len(products)} products in response") + for i, p in enumerate(products[:3]): # 只打印前3个 + print(f"[DEBUG] Product {i+1}: spuId={p.get('spuId')}, spuName={p.get('spuName')}, price={p.get('price')}") + else: + products = result.get("list", []) + print(f"[DEBUG] Found {len(products)} products in list") + total = result.get("total", 0) + print(f"[DEBUG] Total products: {total}") + # 解析返回结果 # Mall API 返回结构: {"total": X, "data": {"data": [...], ...}} if "data" in result and isinstance(result["data"], dict): @@ -349,6 +429,116 @@ async def search_products( await mall.close() +@register_tool("search_products_by_image") +@mcp.tool() +async def search_products_by_image( + image_url: str, + page_size: int = 6, + page: int = 1 +) -> dict: + """Search products by image from Mall API + + 从 Mall API 根据图片搜索商品 SPU(以图搜图) + + Args: + image_url: 图片 URL(需要 URL 编码) + page_size: 每页数量 (default: 6, max 100) + page: 页码 (default: 1) + + Returns: + 商品列表,包含 SPU 信息、商品图片、价格等 + Product list including SPU ID, name, image, price, etc. + """ + try: + from shared.mall_client import MallClient + import logging + logger = logging.getLogger(__name__) + + logger.info(f"search_products_by_image called: image_url={image_url}") + + # 创建 Mall 客户端(不需要 token) + mall = MallClient( + api_url=settings.mall_api_url, + api_token=None, # 图片搜索不需要 token + tenant_id=settings.mall_tenant_id, + currency_code=settings.mall_currency_code, + language_id=settings.mall_language_id, + source=settings.mall_source + ) + + # 调用 Mall API 图片搜索接口 + # 注意:httpx 会自动对 params 进行 URL 编码,不需要手动编码 + result = await mall.get( + "/mall/api/spu", + params={ + "pageSize": min(page_size, 100), + "page": page, + "searchImageUrl": image_url + } + ) + + logger.info( + f"Mall API image search returned: result_type={type(result).__name__}, " + f"result_keys={list(result.keys()) if isinstance(result, dict) else 'not a dict'}, " + f"total={result.get('total', 'N/A') if isinstance(result, dict) else 'N/A'}" + ) + + # 解析返回结果 + # Mall API 返回结构: {"total": X, "data": {"data": [...]}} + if "data" in result and isinstance(result["data"], dict): + products = result["data"].get("data", []) + else: + products = result.get("list", []) + total = result.get("total", 0) + + # 格式化商品数据(与 search_products 格式一致) + formatted_products = [] + for product in products: + formatted_products.append({ + "spu_id": product.get("spuId"), + "spu_sn": product.get("spuSn"), + "product_name": product.get("spuName"), + "product_image": product.get("masterImage"), + "price": product.get("price"), + "special_price": product.get("specialPrice"), + "stock": product.get("stockDescribe"), + "sales_count": product.get("salesCount", 0), + # 额外有用字段 + "href": product.get("href"), + "spu_type": product.get("spuType"), + "spu_type_name": product.get("spuTypeName"), + "min_price": product.get("minPrice"), + "max_price": product.get("maxPrice"), + "price_with_currency": product.get("priceWithCurrency"), + "mark_price": product.get("markPrice"), + "skus_count": len(product.get("skus", [])) + }) + + logger.info(f"Formatted {len(formatted_products)} products from image search") + + return { + "success": True, + "products": formatted_products, + "total": total, + "keyword": "image_search", + "image_url": image_url + } + except Exception as e: + import logging + logger = logging.getLogger(__name__) + logger.error(f"search_products_by_image failed: {str(e)}", exc_info=True) + return { + "success": False, + "error": str(e), + "products": [], + "total": 0 + } + finally: + # 关闭客户端 + if 'mall' in dir(): + await mall.close() + + # Health check endpoint @register_tool("health_check") @mcp.tool() diff --git a/mcp_servers/shared/mall_client.py b/mcp_servers/shared/mall_client.py index 1c519a1..944d1c8 100644 --- a/mcp_servers/shared/mall_client.py +++ b/mcp_servers/shared/mall_client.py @@ -316,6 +316,44 @@ class MallClient: except Exception as e: raise Exception(f"搜索商品失败 (Search SPU products failed): {str(e)}") + async def get_love_list( + self, + page: int = 1, + page_size: int = 6, + warehouse_id: int = 2 + ) -> dict[str, Any]: + """Get recommended products (Love List) from Mall API + + 获取推荐商品列表 + + Args: + page: Page number (default: 1) + page_size: Number of products per page (default: 6) + warehouse_id: Warehouse ID (default: 2) + + Returns: + Dictionary containing product list and metadata + + Example: + >>> client = MallClient() + >>> result = await client.get_love_list(page=1, page_size=6, warehouse_id=2) + >>> print(f"找到 {len(result.get('list', []))} 个推荐商品") + """ + try: + params = { + "page": page, + "pageSize": page_size, + "warehouseId": warehouse_id + } + + result = await self.get( + "/mall/api/loveList", + params=params + ) + return result + except Exception as e: + raise Exception(f"获取推荐商品失败 (Get love list failed): {str(e)}") + # Global Mall client instance mall_client: Optional[MallClient] = None