Compare commits
2 Commits
754804219f
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
432a789198 | ||
|
|
965b11316e |
16
.env.example
16
.env.example
@@ -1,8 +1,22 @@
|
||||
# B2B Shopping AI Assistant Platform - Environment Variables
|
||||
|
||||
# ============ AI Model ============
|
||||
# LLM Provider: 'zhipu' (default) or 'qwen'
|
||||
LLM_PROVIDER=zhipu
|
||||
|
||||
# ZhipuAI (智谱 AI)
|
||||
ZHIPU_API_KEY=your_zhipu_api_key
|
||||
ZHIPU_MODEL=glm-4
|
||||
ZHIPU_MODEL=glm-4-flash
|
||||
|
||||
# Qwen (通义千问) - Alternative provider
|
||||
QWEN_API_KEY=your_qwen_dashscope_api_key
|
||||
QWEN_MODEL=qwen-omni-turbo
|
||||
# Available Qwen models:
|
||||
# - qwen-omni-turbo (多模态,支持图片/语音)
|
||||
# - qwen-plus (通用模型)
|
||||
# - qwen-turbo (高速模型)
|
||||
# - qwen-long (长文本)
|
||||
# - qwen-max (最强模型)
|
||||
|
||||
# ============ Redis ============
|
||||
REDIS_HOST=redis
|
||||
|
||||
@@ -322,16 +322,51 @@ async def customer_service_agent(state: AgentState) -> AgentState:
|
||||
return state
|
||||
|
||||
except json.JSONDecodeError as e:
|
||||
# JSON parsing failed
|
||||
# JSON parsing failed - try alternative format: "tool_name\n{args}"
|
||||
logger.error(
|
||||
"Failed to parse LLM response as JSON",
|
||||
error=str(e),
|
||||
raw_content=content[:500],
|
||||
conversation_id=state["conversation_id"]
|
||||
)
|
||||
# Don't use raw content as response - use fallback instead
|
||||
state = set_response(state, "抱歉,我无法理解您的请求。请尝试重新表述或联系人工客服。")
|
||||
return state
|
||||
|
||||
# Handle non-JSON format: "tool_name\n{args}"
|
||||
if '\n' in content and not content.startswith('{'):
|
||||
lines = content.split('\n', 1)
|
||||
tool_name = lines[0].strip()
|
||||
args_json = lines[1].strip() if len(lines) > 1 else '{}'
|
||||
|
||||
try:
|
||||
arguments = json.loads(args_json) if args_json else {}
|
||||
logger.info(
|
||||
"Customer service agent calling tool (alternative format)",
|
||||
tool_name=tool_name,
|
||||
arguments=arguments,
|
||||
conversation_id=state["conversation_id"]
|
||||
)
|
||||
|
||||
state = add_tool_call(
|
||||
state,
|
||||
tool_name=tool_name,
|
||||
arguments=arguments,
|
||||
server="strapi"
|
||||
)
|
||||
state["state"] = ConversationState.TOOL_CALLING.value
|
||||
return state
|
||||
except json.JSONDecodeError:
|
||||
# Args parsing also failed
|
||||
logger.warning(
|
||||
"Failed to parse tool arguments",
|
||||
tool_name=tool_name,
|
||||
args_json=args_json[:200],
|
||||
conversation_id=state["conversation_id"]
|
||||
)
|
||||
state = set_response(state, "抱歉,我无法理解您的请求。请尝试重新表述或联系人工客服。")
|
||||
return state
|
||||
else:
|
||||
# Not a recognized format
|
||||
state = set_response(state, "抱歉,我无法理解您的请求。请尝试重新表述或联系人工客服。")
|
||||
return state
|
||||
|
||||
except Exception as e:
|
||||
logger.error("Customer service agent failed", error=str(e), exc_info=True)
|
||||
@@ -342,11 +377,59 @@ async def customer_service_agent(state: AgentState) -> AgentState:
|
||||
async def _generate_response_from_results(state: AgentState) -> AgentState:
|
||||
"""Generate response based on tool results"""
|
||||
|
||||
# Build context from tool results
|
||||
# Build context from tool results - extract only essential info to reduce prompt size
|
||||
tool_context = []
|
||||
for result in state["tool_results"]:
|
||||
if result["success"]:
|
||||
tool_context.append(f"Tool {result['tool_name']} returned:\n{json.dumps(result['data'], ensure_ascii=False, indent=2)}")
|
||||
tool_name = result['tool_name']
|
||||
data = result['data']
|
||||
|
||||
# Extract only essential information based on tool type
|
||||
if tool_name == "get_company_info":
|
||||
# Extract key contact info only
|
||||
contact = data.get('contact', {})
|
||||
emails = contact.get('email', [])
|
||||
if isinstance(emails, list) and emails:
|
||||
email_str = ", ".join(emails[:3]) # Max 3 emails
|
||||
else:
|
||||
email_str = str(emails) if emails else "N/A"
|
||||
|
||||
phones = contact.get('phone', [])
|
||||
if isinstance(phones, list) and phones:
|
||||
phone_str = ", ".join(phones[:2]) # Max 2 phones
|
||||
else:
|
||||
phone_str = str(phones) if phones else "N/A"
|
||||
|
||||
address = contact.get('address', {})
|
||||
address_str = f"{address.get('city', '')}, {address.get('country', '')}".strip(', ')
|
||||
|
||||
summary = f"Contact Information: Emails: {email_str} | Phones: {phone_str} | Address: {address_str} | Working hours: {contact.get('working_hours', 'N/A')}"
|
||||
tool_context.append(summary)
|
||||
|
||||
elif tool_name == "query_faq" or tool_name == "search_knowledge_base":
|
||||
# Extract FAQ items summary
|
||||
faqs = data.get('faqs', []) if isinstance(data, dict) else []
|
||||
if faqs:
|
||||
faq_summaries = [f"- Q: {faq.get('question', '')[:50]}... A: {faq.get('answer', '')[:50]}..." for faq in faqs[:3]]
|
||||
summary = f"Found {len(faqs)} FAQ items:\n" + "\n".join(faq_summaries)
|
||||
tool_context.append(summary)
|
||||
else:
|
||||
tool_context.append("No FAQ items found")
|
||||
|
||||
elif tool_name == "get_categories":
|
||||
# Extract category names only
|
||||
categories = data.get('categories', []) if isinstance(data, dict) else []
|
||||
category_names = [cat.get('name', '') for cat in categories[:5] if cat.get('name')]
|
||||
summary = f"Available categories: {', '.join(category_names)}"
|
||||
if len(categories) > 5:
|
||||
summary += f" (and {len(categories) - 5} more)"
|
||||
tool_context.append(summary)
|
||||
|
||||
else:
|
||||
# For other tools, include concise summary (limit to 200 chars)
|
||||
data_str = json.dumps(data, ensure_ascii=False)[:200]
|
||||
tool_context.append(f"Tool {tool_name} returned: {data_str}...")
|
||||
|
||||
else:
|
||||
tool_context.append(f"Tool {result['tool_name']} failed: {result['error']}")
|
||||
|
||||
@@ -357,7 +440,8 @@ User question: {state["current_message"]}
|
||||
Tool returned information:
|
||||
{chr(10).join(tool_context)}
|
||||
|
||||
Please generate a friendly and professional response. If the tool did not return useful information, honestly inform the user and suggest other ways to get help.
|
||||
Please generate a friendly and professional response in Chinese. Keep it concise but informative.
|
||||
If the tool did not return useful information, honestly inform the user and suggest other ways to get help.
|
||||
Return only the response content, do not return JSON."""
|
||||
|
||||
messages = [
|
||||
@@ -367,11 +451,12 @@ Return only the response content, do not return JSON."""
|
||||
|
||||
try:
|
||||
llm = get_llm_client()
|
||||
response = await llm.chat(messages, temperature=0.7)
|
||||
# Lower temperature for faster response
|
||||
response = await llm.chat(messages, temperature=0.3)
|
||||
state = set_response(state, response.content)
|
||||
return state
|
||||
|
||||
except Exception as e:
|
||||
logger.error("Response generation failed", error=str(e))
|
||||
state = set_response(state, "Sorry, there was a problem processing your request. Please try again later or contact customer support.")
|
||||
state = set_response(state, "抱歉,处理您的请求时出现问题。请稍后重试或联系人工客服。")
|
||||
return state
|
||||
|
||||
@@ -30,9 +30,11 @@ PRODUCT_AGENT_PROMPT = """你是一个专业的 B2B 商品顾问助手。
|
||||
2. **get_product_detail** - 获取商品详情
|
||||
- product_id: 商品ID
|
||||
|
||||
3. **recommend_products** - 智能推荐
|
||||
- context: 推荐上下文(可包含当前查询、浏览历史等)
|
||||
- limit: 推荐数量
|
||||
3. **recommend_products** - 智能推荐(心动清单/猜你喜欢)
|
||||
- page_size: 推荐数量(默认 6,最大 100)
|
||||
- page: 页码(默认 1)
|
||||
- warehouse_id: 仓库ID(默认 2)
|
||||
- 说明:此工具使用 Mall API /mall/api/loveList 接口,需要用户 token 认证,系统会自动注入用户 token
|
||||
|
||||
4. **get_quote** - B2B 询价
|
||||
- product_id: 商品ID
|
||||
@@ -81,6 +83,18 @@ PRODUCT_AGENT_PROMPT = """你是一个专业的 B2B 商品顾问助手。
|
||||
}
|
||||
```
|
||||
|
||||
用户说:"推荐一些商品"
|
||||
返回:
|
||||
```json
|
||||
{
|
||||
"action": "call_tool",
|
||||
"tool_name": "recommend_products",
|
||||
"arguments": {
|
||||
"page_size": 6
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
当需要向用户询问更多信息时:
|
||||
```json
|
||||
{
|
||||
@@ -104,6 +118,17 @@ PRODUCT_AGENT_PROMPT = """你是一个专业的 B2B 商品顾问助手。
|
||||
- 报价通常有有效期
|
||||
|
||||
## 商品推荐策略
|
||||
|
||||
**重要规则:推荐 vs 搜索**
|
||||
- **泛泛推荐**("推荐一些商品"、"推荐一下"、"有什么好推荐的") → 使用 recommend_products
|
||||
- **具体商品推荐**("推荐ring相关的商品"、"推荐手机"、"推荐一些珠宝") → 使用 search_products (提取关键词:ring、手机、珠宝)
|
||||
- **商品搜索**("搜索ring"、"找ring商品") → 使用 search_products
|
||||
|
||||
**说明**:
|
||||
- 如果用户推荐请求中包含具体的商品关键词(如 ring、手机、珠宝等),使用 search_products 进行精准搜索
|
||||
- 只有在泛泛请求推荐时才使用 recommend_products(基于用户行为的个性化推荐)
|
||||
|
||||
**其他推荐依据**:
|
||||
- 根据用户采购历史推荐
|
||||
- 根据当前查询语义推荐
|
||||
- 根据企业行业特点推荐
|
||||
@@ -138,6 +163,35 @@ async def product_agent(state: AgentState) -> AgentState:
|
||||
state["agent_history"].append("product")
|
||||
state["state"] = ConversationState.PROCESSING.value
|
||||
|
||||
# ========== FAST PATH: Image Search ==========
|
||||
# Check if this is an image search request
|
||||
image_search_url = state.get("context", {}).get("image_search_url")
|
||||
if image_search_url:
|
||||
logger.info(
|
||||
"Image search detected, calling search_products_by_image",
|
||||
conversation_id=state["conversation_id"],
|
||||
image_url=image_search_url[:100] + "..." if len(image_search_url) > 100 else image_search_url
|
||||
)
|
||||
|
||||
# 直接调用图片搜索工具
|
||||
state = add_tool_call(
|
||||
state,
|
||||
tool_name="search_products_by_image",
|
||||
arguments={
|
||||
"image_url": image_search_url,
|
||||
"page_size": 6,
|
||||
"page": 1
|
||||
},
|
||||
server="product"
|
||||
)
|
||||
|
||||
# 清除 image_search_url 防止无限循环
|
||||
state["context"]["image_search_url"] = None
|
||||
|
||||
state["state"] = ConversationState.TOOL_CALLING.value
|
||||
return state
|
||||
# ==============================================
|
||||
|
||||
# Check if we have tool results to process
|
||||
if state["tool_results"]:
|
||||
return await _generate_product_response(state)
|
||||
@@ -148,7 +202,8 @@ async def product_agent(state: AgentState) -> AgentState:
|
||||
]
|
||||
|
||||
# Add conversation history
|
||||
for msg in state["messages"][-6:]:
|
||||
# 只保留最近 2 条历史消息以减少 token 数量和响应时间
|
||||
for msg in state["messages"][-2:]:
|
||||
messages.append(Message(role=msg["role"], content=msg["content"]))
|
||||
|
||||
# Build context info
|
||||
@@ -233,7 +288,7 @@ async def product_agent(state: AgentState) -> AgentState:
|
||||
|
||||
# Set default page_size if not provided
|
||||
if "page_size" not in arguments:
|
||||
arguments["page_size"] = 5
|
||||
arguments["page_size"] = 6
|
||||
|
||||
# Set default page if not provided
|
||||
if "page" not in arguments:
|
||||
@@ -249,8 +304,21 @@ async def product_agent(state: AgentState) -> AgentState:
|
||||
|
||||
# Inject context for recommendation
|
||||
if tool_name == "recommend_products":
|
||||
arguments["user_id"] = state["user_id"]
|
||||
arguments["account_id"] = state["account_id"]
|
||||
arguments["user_token"] = state.get("user_token")
|
||||
# 如果没有提供 page_size,使用默认值 6
|
||||
if "page_size" not in arguments:
|
||||
arguments["page_size"] = 6
|
||||
# 如果没有提供 warehouse_id,使用默认值 2
|
||||
if "warehouse_id" not in arguments:
|
||||
arguments["warehouse_id"] = 2
|
||||
|
||||
logger.info(
|
||||
"Product agent recommend_products after injection",
|
||||
user_token_present="user_token" in arguments,
|
||||
user_token_preview=arguments.get("user_token", "")[:20] + "..." if arguments.get("user_token") else None,
|
||||
arguments=arguments,
|
||||
conversation_id=state["conversation_id"]
|
||||
)
|
||||
|
||||
# Inject context for quote
|
||||
if tool_name == "get_quote":
|
||||
@@ -301,25 +369,55 @@ async def product_agent(state: AgentState) -> AgentState:
|
||||
async def _generate_product_response(state: AgentState) -> AgentState:
|
||||
"""Generate response based on product tool results"""
|
||||
|
||||
# 特殊处理:如果是 search_products 工具返回,直接发送商品卡片
|
||||
has_product_search_result = False
|
||||
# 特殊处理:如果是 search_products、recommend_products 或 search_products_by_image 工具返回,直接发送商品卡片
|
||||
has_product_result = False
|
||||
products = []
|
||||
result_source = None # "search", "recommend" 或 "image_search"
|
||||
|
||||
# 添加日志:查看所有工具结果
|
||||
import json as json_module
|
||||
logger.info(
|
||||
"All tool results",
|
||||
tool_results_count=len(state.get("tool_results", [])),
|
||||
tool_results=json_module.dumps(state.get("tool_results", []), ensure_ascii=False, indent=2)[:2000]
|
||||
)
|
||||
|
||||
for result in state["tool_results"]:
|
||||
if result["success"] and result["tool_name"] == "search_products":
|
||||
logger.info(
|
||||
"Processing tool result",
|
||||
tool_name=result["tool_name"],
|
||||
success=result["success"],
|
||||
data_keys=list(result.get("data", {}).keys()) if isinstance(result.get("data"), dict) else "not a dict",
|
||||
data_preview=json_module.dumps(result.get("data"), ensure_ascii=False)[:500]
|
||||
)
|
||||
|
||||
if result["success"] and result["tool_name"] in ["search_products", "recommend_products", "search_products_by_image"]:
|
||||
data = result["data"]
|
||||
if isinstance(data, dict) and data.get("success"):
|
||||
products = data.get("products", [])
|
||||
has_product_search_result = True
|
||||
# MCP 返回的数据结构: {"success": true, "result": {"success": true, "products": [...]}}
|
||||
# 需要从 result.result 中提取实际数据
|
||||
inner_data = data.get("result", data)
|
||||
products = inner_data.get("products", [])
|
||||
keyword = inner_data.get("keyword", "")
|
||||
|
||||
has_product_result = True
|
||||
if result["tool_name"] == "recommend_products":
|
||||
result_source = "recommend"
|
||||
elif result["tool_name"] == "search_products_by_image":
|
||||
result_source = "image_search"
|
||||
else:
|
||||
result_source = "search"
|
||||
|
||||
logger.info(
|
||||
"Product search results found",
|
||||
f"Product {result_source} results found",
|
||||
products_count=len(products),
|
||||
keyword=data.get("keyword", "")
|
||||
keyword=keyword,
|
||||
products_preview=json_module.dumps(products[:2], ensure_ascii=False, indent=2) if products else "[]"
|
||||
)
|
||||
break
|
||||
|
||||
# 如果有商品搜索结果,直接发送商品卡片
|
||||
if has_product_search_result and products:
|
||||
# 如果有商品结果,直接发送商品卡片(product_list 格式)
|
||||
if has_product_result and products:
|
||||
try:
|
||||
from integrations.chatwoot import ChatwootClient
|
||||
from core.language_detector import detect_language
|
||||
@@ -327,7 +425,7 @@ async def _generate_product_response(state: AgentState) -> AgentState:
|
||||
# 检测语言
|
||||
detected_language = state.get("detected_language", "en")
|
||||
|
||||
# 发送商品卡片
|
||||
# 发送商品列表
|
||||
chatwoot = ChatwootClient(account_id=int(state.get("account_id", 1)))
|
||||
conversation_id = state.get("conversation_id")
|
||||
|
||||
@@ -339,10 +437,11 @@ async def _generate_product_response(state: AgentState) -> AgentState:
|
||||
)
|
||||
|
||||
logger.info(
|
||||
"Product cards sent successfully",
|
||||
f"Product {result_source} cards sent successfully",
|
||||
conversation_id=conversation_id,
|
||||
products_count=len(products),
|
||||
language=detected_language
|
||||
language=detected_language,
|
||||
result_source=result_source
|
||||
)
|
||||
|
||||
# 清空响应,避免重复发送
|
||||
@@ -352,17 +451,78 @@ async def _generate_product_response(state: AgentState) -> AgentState:
|
||||
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
"Failed to send product cards, falling back to text response",
|
||||
f"Failed to send product {result_source} cards, falling back to text response",
|
||||
error=str(e),
|
||||
products_count=len(products)
|
||||
products_count=len(products),
|
||||
result_source=result_source
|
||||
)
|
||||
|
||||
# 常规处理:生成文本响应
|
||||
tool_context = []
|
||||
for result in state["tool_results"]:
|
||||
if result["success"]:
|
||||
data = result["data"]
|
||||
tool_context.append(f"工具 {result['tool_name']} 返回:\n{json.dumps(data, ensure_ascii=False, indent=2)}")
|
||||
tool_name = result['tool_name']
|
||||
data = result['data']
|
||||
|
||||
# Extract only essential information based on tool type
|
||||
if tool_name == "search_products" or tool_name == "recommend_products":
|
||||
products = data.get("products", []) if isinstance(data, dict) else []
|
||||
if products:
|
||||
# Keep top 5 products with more details
|
||||
product_items = []
|
||||
for p in products[:5]: # Increased from 3 to 5
|
||||
name = p.get('product_name', 'N/A')
|
||||
price = p.get('price', 'N/A')
|
||||
special_price = p.get('special_price')
|
||||
spu_id = p.get('spu_id', '')
|
||||
|
||||
# Show special price if available
|
||||
if special_price and float(special_price) > 0:
|
||||
price_str = f"原价: {price}, 特价: {special_price}"
|
||||
else:
|
||||
price_str = str(price)
|
||||
|
||||
# Format: [ID] Name - Price
|
||||
if spu_id:
|
||||
product_items.append(f"- [{spu_id}] {name} - {price_str}")
|
||||
else:
|
||||
product_items.append(f"- {name} - {price_str}")
|
||||
|
||||
summary = f"Found {len(products)} products:\n" + "\n".join(product_items)
|
||||
|
||||
# Add note if there are more products
|
||||
if len(products) > 5:
|
||||
summary += f"\n(and {len(products) - 5} more products, visit website for full selection)"
|
||||
|
||||
tool_context.append(summary)
|
||||
else:
|
||||
tool_context.append("No products found")
|
||||
|
||||
elif tool_name == "get_product_detail":
|
||||
product = data.get("product", {}) if isinstance(data, dict) else {}
|
||||
name = product.get("product_name", product.get("name", "N/A"))
|
||||
price = product.get("price", "N/A")
|
||||
stock = product.get("stock", product.get("stock_status", "N/A"))
|
||||
summary = f"Product: {name} | Price: {price} | Stock: {stock}"
|
||||
tool_context.append(summary)
|
||||
|
||||
elif tool_name == "check_inventory":
|
||||
inventory = data.get("inventory", []) if isinstance(data, dict) else []
|
||||
inv_summaries = [f"{inv.get('product_id', 'N/A')}: {inv.get('quantity', 'N/A')} available" for inv in inventory[:3]]
|
||||
summary = "Inventory status:\n" + "\n".join(inv_summaries)
|
||||
tool_context.append(summary)
|
||||
|
||||
elif tool_name == "get_pricing":
|
||||
product_id = data.get("product_id", "N/A")
|
||||
unit_price = data.get("unit_price", "N/A")
|
||||
total_price = data.get("total_price", "N/A")
|
||||
summary = f"Quote for {product_id}: Unit: {unit_price} | Total: {total_price}"
|
||||
tool_context.append(summary)
|
||||
|
||||
else:
|
||||
# For other tools, include concise summary (limit to 200 chars)
|
||||
data_str = json.dumps(data, ensure_ascii=False)[:200]
|
||||
tool_context.append(f"工具 {tool_name} 返回: {data_str}...")
|
||||
|
||||
# Extract product context
|
||||
if isinstance(data, dict):
|
||||
@@ -398,7 +558,8 @@ async def _generate_product_response(state: AgentState) -> AgentState:
|
||||
|
||||
try:
|
||||
llm = get_llm_client()
|
||||
response = await llm.chat(messages, temperature=0.7)
|
||||
# Lower temperature for faster response
|
||||
response = await llm.chat(messages, temperature=0.3)
|
||||
state = set_response(state, response.content)
|
||||
return state
|
||||
|
||||
|
||||
@@ -10,8 +10,17 @@ class Settings(BaseSettings):
|
||||
"""Application settings loaded from environment variables"""
|
||||
|
||||
# ============ AI Model ============
|
||||
zhipu_api_key: str = Field(..., description="ZhipuAI API Key")
|
||||
zhipu_model: str = Field(default="glm-4", description="ZhipuAI Model name")
|
||||
llm_provider: str = Field(default="zhipu", description="LLM provider: 'zhipu' or 'qwen'")
|
||||
|
||||
# ZhipuAI (智谱 AI)
|
||||
zhipu_api_key: str = Field(default="", description="ZhipuAI API Key")
|
||||
zhipu_model: str = Field(default="glm-4-flash", description="ZhipuAI Model name")
|
||||
|
||||
# Qwen (通义千问)
|
||||
qwen_api_key: str = Field(default="", description="Qwen/DashScope API Key")
|
||||
qwen_model: str = Field(default="qwen-omni-turbo", description="Qwen Model name")
|
||||
|
||||
# 通用配置
|
||||
enable_reasoning_mode: bool = Field(default=False, description="Enable AI reasoning/thinking mode (slower but more thoughtful)")
|
||||
reasoning_mode_for_complex: bool = Field(default=True, description="Enable reasoning mode only for complex queries")
|
||||
|
||||
|
||||
@@ -1,8 +1,9 @@
|
||||
"""
|
||||
ZhipuAI LLM Client for B2B Shopping AI Assistant
|
||||
LLM Client for B2B Shopping AI Assistant
|
||||
Supports both ZhipuAI and Qwen (DashScope)
|
||||
"""
|
||||
import concurrent.futures
|
||||
from typing import Any, Optional
|
||||
from typing import Any, Optional, Union
|
||||
from dataclasses import dataclass
|
||||
|
||||
from zhipuai import ZhipuAI
|
||||
@@ -276,12 +277,168 @@ class ZhipuLLMClient:
|
||||
raise
|
||||
|
||||
|
||||
llm_client: Optional[ZhipuLLMClient] = None
|
||||
llm_client: Optional[Union[ZhipuLLMClient, "QwenLLMClient"]] = None
|
||||
|
||||
|
||||
def get_llm_client() -> ZhipuLLMClient:
|
||||
"""Get or create global LLM client instance"""
|
||||
def get_llm_client() -> Union[ZhipuLLMClient, "QwenLLMClient"]:
|
||||
"""Get or create global LLM client instance based on provider setting"""
|
||||
global llm_client
|
||||
if llm_client is None:
|
||||
llm_client = ZhipuLLMClient()
|
||||
provider = settings.llm_provider.lower()
|
||||
if provider == "qwen":
|
||||
llm_client = QwenLLMClient()
|
||||
else:
|
||||
llm_client = ZhipuLLMClient()
|
||||
return llm_client
|
||||
|
||||
|
||||
# ============ Qwen (DashScope) LLM Client ============
|
||||
|
||||
try:
|
||||
from dashscope import Generation
|
||||
DASHSCOPE_AVAILABLE = True
|
||||
except ImportError:
|
||||
DASHSCOPE_AVAILABLE = False
|
||||
logger.warning("DashScope SDK not installed. Qwen models will not be available.")
|
||||
|
||||
|
||||
class QwenLLMClient:
|
||||
"""Qwen (DashScope) LLM Client wrapper"""
|
||||
|
||||
DEFAULT_TIMEOUT = 60 # seconds
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
api_key: Optional[str] = None,
|
||||
model: Optional[str] = None,
|
||||
timeout: Optional[int] = None
|
||||
):
|
||||
if not DASHSCOPE_AVAILABLE:
|
||||
raise ImportError("DashScope SDK is not installed. Install it with: pip install dashscope")
|
||||
|
||||
self.api_key = api_key or settings.qwen_api_key
|
||||
self.model = model or settings.qwen_model
|
||||
self.timeout = timeout or self.DEFAULT_TIMEOUT
|
||||
|
||||
# 设置 API key 到 DashScope SDK
|
||||
# 必须直接设置 dashscope.api_key,环境变量可能不够
|
||||
import dashscope
|
||||
dashscope.api_key = self.api_key
|
||||
|
||||
logger.info(
|
||||
"Qwen client initialized",
|
||||
model=self.model,
|
||||
timeout=self.timeout,
|
||||
api_key_prefix=self.api_key[:10] + "..." if len(self.api_key) > 10 else self.api_key
|
||||
)
|
||||
|
||||
async def chat(
|
||||
self,
|
||||
messages: list[Message],
|
||||
temperature: float = 0.7,
|
||||
max_tokens: int = 2048,
|
||||
top_p: float = 0.9,
|
||||
use_cache: bool = True,
|
||||
**kwargs: Any
|
||||
) -> LLMResponse:
|
||||
"""Send chat completion request with caching support"""
|
||||
formatted_messages = [
|
||||
{"role": msg.role, "content": msg.content}
|
||||
for msg in messages
|
||||
]
|
||||
|
||||
# Try cache first
|
||||
if use_cache:
|
||||
try:
|
||||
cache = get_response_cache()
|
||||
cached_response = await cache.get(
|
||||
model=self.model,
|
||||
messages=formatted_messages,
|
||||
temperature=temperature
|
||||
)
|
||||
if cached_response is not None:
|
||||
logger.info(
|
||||
"Returning cached response",
|
||||
model=self.model,
|
||||
response_length=len(cached_response)
|
||||
)
|
||||
return LLMResponse(
|
||||
content=cached_response,
|
||||
finish_reason="cache_hit",
|
||||
usage={"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning("Cache check failed", error=str(e))
|
||||
|
||||
logger.info(
|
||||
"Sending chat request",
|
||||
model=self.model,
|
||||
message_count=len(messages),
|
||||
temperature=temperature
|
||||
)
|
||||
|
||||
def _make_request():
|
||||
response = Generation.call(
|
||||
model=self.model,
|
||||
messages=formatted_messages,
|
||||
temperature=temperature,
|
||||
max_tokens=max_tokens,
|
||||
top_p=top_p,
|
||||
result_format="message",
|
||||
**kwargs
|
||||
)
|
||||
return response
|
||||
|
||||
try:
|
||||
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
|
||||
future = executor.submit(_make_request)
|
||||
response = future.result(timeout=self.timeout)
|
||||
|
||||
# Qwen API 响应格式
|
||||
if response.status_code != 200:
|
||||
raise Exception(f"Qwen API error: {response.message}")
|
||||
|
||||
content = response.output.choices[0].message.content
|
||||
finish_reason = response.output.choices[0].finish_reason
|
||||
usage = response.usage
|
||||
|
||||
logger.info(
|
||||
"Chat response received",
|
||||
finish_reason=finish_reason,
|
||||
content_length=len(content) if content else 0,
|
||||
usage=usage
|
||||
)
|
||||
|
||||
if not content:
|
||||
logger.warning("LLM returned empty content")
|
||||
|
||||
# Cache the response
|
||||
if use_cache and content:
|
||||
try:
|
||||
cache = get_response_cache()
|
||||
await cache.set(
|
||||
model=self.model,
|
||||
messages=formatted_messages,
|
||||
response=content,
|
||||
temperature=temperature
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning("Failed to cache response", error=str(e))
|
||||
|
||||
return LLMResponse(
|
||||
content=content or "",
|
||||
finish_reason=finish_reason,
|
||||
usage={
|
||||
"prompt_tokens": usage.input_tokens,
|
||||
"completion_tokens": usage.output_tokens,
|
||||
"total_tokens": usage.total_tokens
|
||||
}
|
||||
)
|
||||
|
||||
except concurrent.futures.TimeoutError:
|
||||
logger.error("Chat request timed out", timeout=self.timeout)
|
||||
raise TimeoutError(f"Request timed out after {self.timeout} seconds")
|
||||
|
||||
except Exception as e:
|
||||
logger.error("Chat request failed", error=str(e))
|
||||
raise
|
||||
|
||||
@@ -1089,7 +1089,7 @@ class ChatwootClient:
|
||||
products: list[dict[str, Any]],
|
||||
language: str = "en"
|
||||
) -> dict[str, Any]:
|
||||
"""发送商品搜索结果(使用 cards 格式)
|
||||
"""发送商品搜索结果(使用 product_list 格式)
|
||||
|
||||
Args:
|
||||
conversation_id: 会话 ID
|
||||
@@ -1100,8 +1100,7 @@ class ChatwootClient:
|
||||
- product_image: 商品图片 URL
|
||||
- price: 价格
|
||||
- special_price: 特价(可选)
|
||||
- stock: 库存
|
||||
- sales_count: 销量
|
||||
- href: 商品链接路径(可选)
|
||||
language: 语言代码(en, nl, de, es, fr, it, tr, zh),默认 en
|
||||
|
||||
Returns:
|
||||
@@ -1114,124 +1113,89 @@ class ChatwootClient:
|
||||
... "product_name": "Product A",
|
||||
... "product_image": "https://...",
|
||||
... "price": "99.99",
|
||||
... "stock": 100
|
||||
... "href": "/product/detail/12345"
|
||||
... }
|
||||
... ]
|
||||
>>> await chatwoot.send_product_cards(123, products, language="zh")
|
||||
"""
|
||||
# 获取前端 URL
|
||||
client = await self._get_client()
|
||||
|
||||
# 获取前端域名
|
||||
frontend_url = settings.frontend_url.rstrip('/')
|
||||
|
||||
# 构建商品卡片
|
||||
cards = []
|
||||
# 构建商品列表
|
||||
product_list = []
|
||||
|
||||
for product in products:
|
||||
spu_id = product.get("spu_id", "")
|
||||
spu_sn = product.get("spu_sn", "")
|
||||
product_name = product.get("product_name", "Unknown Product")
|
||||
product_image = product.get("product_image", "")
|
||||
price = product.get("price", "0")
|
||||
special_price = product.get("special_price")
|
||||
stock = product.get("stock", 0)
|
||||
sales_count = product.get("sales_count", 0)
|
||||
href = product.get("href", "")
|
||||
|
||||
# 价格显示(如果有特价则显示特价)
|
||||
# 价格显示(如果有特价则显示特价,否则显示原价)
|
||||
try:
|
||||
price_num = float(price) if price else 0
|
||||
price_text = f"€{price_num:.2f}"
|
||||
if special_price and float(special_price) > 0:
|
||||
price_num = float(special_price)
|
||||
else:
|
||||
price_num = float(price) if price else 0
|
||||
|
||||
# 根据语言选择货币符号
|
||||
if language == "zh":
|
||||
price_text = f"¥{price_num:.2f}"
|
||||
else:
|
||||
price_text = f"€{price_num:.2f}"
|
||||
except (ValueError, TypeError):
|
||||
price_text = str(price) if price else "€0.00"
|
||||
price_text = str(price) if price else ("¥0.00" if language == "zh" else "€0.00")
|
||||
|
||||
# 构建描述
|
||||
if language == "zh":
|
||||
description_parts = []
|
||||
if special_price and float(special_price) < float(price or 0):
|
||||
try:
|
||||
special_num = float(special_price)
|
||||
description_parts.append(f"特价: €{special_num:.2f}")
|
||||
except:
|
||||
pass
|
||||
if stock is not None:
|
||||
description_parts.append(f"库存: {stock}")
|
||||
if sales_count:
|
||||
description_parts.append(f"已售: {sales_count}")
|
||||
description = " | ".join(description_parts) if description_parts else "暂无详细信息"
|
||||
else:
|
||||
description_parts = []
|
||||
if special_price and float(special_price) < float(price or 0):
|
||||
try:
|
||||
special_num = float(special_price)
|
||||
description_parts.append(f"Special: €{special_num:.2f}")
|
||||
except:
|
||||
pass
|
||||
if stock is not None:
|
||||
description_parts.append(f"Stock: {stock}")
|
||||
if sales_count:
|
||||
description_parts.append(f"Sold: {sales_count}")
|
||||
description = " | ".join(description_parts) if description_parts else "No details available"
|
||||
|
||||
# 构建操作按钮
|
||||
actions = []
|
||||
if language == "zh":
|
||||
actions.append({
|
||||
"type": "link",
|
||||
"text": "查看详情",
|
||||
"uri": f"{frontend_url}/product/detail?spuId={spu_id}"
|
||||
})
|
||||
if stock and stock > 0:
|
||||
actions.append({
|
||||
"type": "link",
|
||||
"text": "立即购买",
|
||||
"uri": f"{frontend_url}/product/detail?spuId={spu_id}"
|
||||
})
|
||||
else:
|
||||
actions.append({
|
||||
"type": "link",
|
||||
"text": "View Details",
|
||||
"uri": f"{frontend_url}/product/detail?spuId={spu_id}"
|
||||
})
|
||||
if stock and stock > 0:
|
||||
actions.append({
|
||||
"type": "link",
|
||||
"text": "Buy Now",
|
||||
"uri": f"{frontend_url}/product/detail?spuId={spu_id}"
|
||||
})
|
||||
|
||||
# 构建卡片
|
||||
card = {
|
||||
"title": product_name,
|
||||
"description": description,
|
||||
"media_url": product_image,
|
||||
"actions": actions
|
||||
# 构建商品对象
|
||||
product_obj = {
|
||||
"image": product_image,
|
||||
"name": product_name,
|
||||
"price": price_text,
|
||||
"target": "_blank"
|
||||
}
|
||||
|
||||
cards.append(card)
|
||||
# 如果有 href,添加完整 URL
|
||||
if href:
|
||||
product_obj["url"] = f"{frontend_url}{href}"
|
||||
else:
|
||||
# 如果没有 href,使用 spu_id 构建默认链接
|
||||
spu_id = product.get("spu_id", "")
|
||||
if spu_id:
|
||||
product_obj["url"] = f"{frontend_url}/product/detail?spuId={spu_id}"
|
||||
|
||||
# 发送 cards 类型消息
|
||||
client = await self._get_client()
|
||||
product_list.append(product_obj)
|
||||
|
||||
# 构建标题
|
||||
if language == "zh":
|
||||
title = "找到以下商品"
|
||||
else:
|
||||
title = "Found following products"
|
||||
|
||||
# 构建 content_attributes
|
||||
content_attributes = {
|
||||
"items": cards
|
||||
"title": title,
|
||||
"products": product_list,
|
||||
"actions": []
|
||||
}
|
||||
|
||||
# 添加标题
|
||||
if language == "zh":
|
||||
content = f"找到 {len(products)} 个商品"
|
||||
else:
|
||||
content = f"Found {len(products)} products"
|
||||
|
||||
# 发送 product_list 类型消息
|
||||
payload = {
|
||||
"content": content,
|
||||
"content_type": "cards",
|
||||
"content": "",
|
||||
"content_type": "product_list",
|
||||
"message_type": 1,
|
||||
"content_attributes": content_attributes
|
||||
}
|
||||
|
||||
# 输出完整的 payload 用于调试
|
||||
import json as json_module
|
||||
logger.info(
|
||||
"Sending product cards",
|
||||
"Sending product list to Chatwoot",
|
||||
conversation_id=conversation_id,
|
||||
products_count=len(products),
|
||||
products_count=len(product_list),
|
||||
language=language,
|
||||
payload_preview=str(payload)[:1000]
|
||||
payload=json_module.dumps(payload, ensure_ascii=False, indent=2)
|
||||
)
|
||||
|
||||
response = await client.post(
|
||||
|
||||
@@ -12,10 +12,16 @@ system_prompt: |
|
||||
|
||||
## Available Tools
|
||||
|
||||
**search_products** - Search for products
|
||||
- query: Search keywords
|
||||
- category: Product category (optional)
|
||||
- filters: {attribute: value} (optional)
|
||||
**search_products** - Search for products by keyword
|
||||
- keyword: Search keywords (required)
|
||||
- page_size: Number of results (optional, default: 6)
|
||||
- page: Page number (optional, default: 1)
|
||||
|
||||
**recommend_products** - Get personalized product recommendations (Love List/心动清单)
|
||||
- page_size: Number of recommendations (optional, default: 6)
|
||||
- page: Page number (optional, default: 1)
|
||||
- warehouse_id: Warehouse ID (optional, default: 2)
|
||||
- Note: Requires user authentication token, uses Mall API /mall/api/loveList
|
||||
|
||||
**get_product_details** - Get detailed product information
|
||||
- product_id: Product ID or SKU
|
||||
@@ -28,18 +34,22 @@ system_prompt: |
|
||||
- product_id: Product ID
|
||||
- quantity: Quantity for pricing (optional, for tiered pricing)
|
||||
|
||||
**recommend_products** - Get product recommendations
|
||||
- category: Product category
|
||||
- limit: Number of recommendations
|
||||
|
||||
## Important Rules
|
||||
|
||||
1. **Product Recognition**:
|
||||
- Product search/产品搜索/找产品/商品 → Use search_products
|
||||
- 泛泛推荐(推荐/推荐商品/猜你喜欢/心动清单,无具体关键词) → Use recommend_products
|
||||
- 具体商品推荐(推荐ring/推荐手机,有具体关键词) → Use search_products
|
||||
- Product search/产品搜索/找商品/搜索商品 + 具体关键词 → Use search_products
|
||||
- Price/价格/报价/多少钱 → Use get_pricing
|
||||
- Stock/库存/有没有货/现货 → Use check_stock
|
||||
- Product details/产品详情/产品信息/产品规格 → Use get_product_details
|
||||
- Recommendation/推荐/推荐产品 → Use recommend_products
|
||||
|
||||
2. **Recommendation vs Search**:
|
||||
- "推荐一些商品"、"推荐一下"、"有什么好推荐的"(无具体关键词) → Use recommend_products
|
||||
- "推荐ring相关的商品"、"推荐手机"、"推荐一些珠宝"(有具体关键词) → Use search_products (keyword: "ring"/"手机"/"珠宝")
|
||||
- "搜索ring"、"找ring商品" → Use search_products (keyword: "ring")
|
||||
|
||||
**规则**: 如果推荐请求中包含具体的商品关键词(如 ring、手机、珠宝等),使用 search_products 进行精准搜索。只有在泛泛请求推荐时才使用 recommend_products。
|
||||
|
||||
2. For B2B customers, prioritize wholesale/bulk pricing information
|
||||
3. Always check stock availability before suggesting purchases
|
||||
@@ -51,17 +61,24 @@ system_prompt: |
|
||||
- For Chinese inquiries, respond in Chinese
|
||||
- For English inquiries, respond in English
|
||||
|
||||
## ⚠️ CRITICAL: Response Format
|
||||
|
||||
**You MUST ALWAYS respond with valid JSON. Never respond with plain text.**
|
||||
|
||||
## Tool Call Format
|
||||
|
||||
When you need to use a tool, respond with EXACTLY this JSON format:
|
||||
```json
|
||||
{
|
||||
"action": "call_tool",
|
||||
"tool_name": "tool_name",
|
||||
"arguments": {"parameter": "value"}
|
||||
"tool_name": "search_products",
|
||||
"arguments": {
|
||||
"query": "product name"
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
Or to respond directly:
|
||||
When you can answer directly:
|
||||
```json
|
||||
{
|
||||
"action": "respond",
|
||||
@@ -69,6 +86,14 @@ system_prompt: |
|
||||
}
|
||||
```
|
||||
|
||||
**IMPORTANT:**
|
||||
- Your entire response must be a valid JSON object
|
||||
- Do NOT include any text outside the JSON
|
||||
- Do NOT use markdown code blocks
|
||||
- Always include "action" field
|
||||
- "action" must be either "call_tool" or "respond"
|
||||
- For "call_tool", you must include "tool_name" and "arguments"
|
||||
|
||||
tool_descriptions:
|
||||
search_products: "Search for products by keywords or category"
|
||||
get_product_details: "Get detailed product information"
|
||||
|
||||
@@ -9,6 +9,7 @@ langchain-core>=0.1.0
|
||||
|
||||
# AI Model SDK
|
||||
zhipuai>=2.0.0
|
||||
dashscope>=1.14.0
|
||||
# Async utilities
|
||||
sniffio>=1.3.0
|
||||
anyio>=4.0.0
|
||||
|
||||
@@ -6,6 +6,7 @@ import hashlib
|
||||
from typing import Any, Optional
|
||||
|
||||
from fastapi import APIRouter, Request, HTTPException, BackgroundTasks
|
||||
from fastapi.responses import JSONResponse
|
||||
from pydantic import BaseModel
|
||||
|
||||
from config import settings
|
||||
@@ -75,6 +76,7 @@ class ChatwootWebhookPayload(BaseModel):
|
||||
message_type: Optional[str] = None
|
||||
content_type: Optional[str] = None
|
||||
private: Optional[bool] = False
|
||||
content_attributes: Optional[dict] = None # 图片搜索 URL 等额外属性
|
||||
conversation: Optional[WebhookConversation] = None
|
||||
sender: Optional[WebhookSender] = None
|
||||
contact: Optional[WebhookContact] = None
|
||||
@@ -132,7 +134,15 @@ async def handle_incoming_message(payload: ChatwootWebhookPayload, cookie_token:
|
||||
conversation_id = str(conversation.id)
|
||||
content = payload.content
|
||||
|
||||
if not content:
|
||||
# 检查是否是图片搜索消息(在过滤空 content 之前)
|
||||
# 图片搜索消息的 content 通常是空的,但需要处理
|
||||
is_image_search = (
|
||||
payload.content_type == "search_image" or
|
||||
payload.content_type == 17
|
||||
)
|
||||
|
||||
# 只有非图片搜索消息才检查 content 是否为空
|
||||
if not content and not is_image_search:
|
||||
logger.debug("Empty message content, skipping")
|
||||
return
|
||||
|
||||
@@ -282,19 +292,7 @@ async def handle_incoming_message(payload: ChatwootWebhookPayload, cookie_token:
|
||||
channel=conversation.channel if conversation else None
|
||||
)
|
||||
|
||||
# 识别消息渠道(邮件、网站等)
|
||||
message_channel = conversation.channel if conversation else "Channel"
|
||||
is_email = message_channel == "Email"
|
||||
|
||||
# 邮件渠道特殊处理
|
||||
if is_email:
|
||||
logger.info(
|
||||
"Email channel detected",
|
||||
conversation_id=conversation_id,
|
||||
sender_email=contact.email if contact else None
|
||||
)
|
||||
|
||||
# Load conversation context from cache
|
||||
# Load conversation context from cache (需要在图片搜索检测之前)
|
||||
cache = get_cache_manager()
|
||||
await cache.connect()
|
||||
|
||||
@@ -307,10 +305,125 @@ async def handle_incoming_message(payload: ChatwootWebhookPayload, cookie_token:
|
||||
if mall_token:
|
||||
context["mall_token"] = mall_token
|
||||
|
||||
# 图片 URL 可能在两个位置:
|
||||
# 1. payload.content_attributes.url (顶层)
|
||||
# 2. payload.conversation.messages[0].content_attributes.url (嵌套)
|
||||
image_url = None
|
||||
|
||||
# 首先尝试从顶层获取
|
||||
if payload.content_attributes:
|
||||
image_url = payload.content_attributes.get("url")
|
||||
|
||||
# 如果顶层没有,尝试从 conversation.messages[0] 获取
|
||||
if not image_url and conversation and hasattr(conversation, 'model_dump'):
|
||||
conv_dict = conversation.model_dump()
|
||||
messages = conv_dict.get('messages', [])
|
||||
if messages and len(messages) > 0:
|
||||
first_message = messages[0]
|
||||
msg_content_attrs = first_message.get('content_attributes', {})
|
||||
image_url = msg_content_attrs.get('url')
|
||||
|
||||
if is_image_search and image_url:
|
||||
logger.info(
|
||||
"Image search detected",
|
||||
conversation_id=conversation_id,
|
||||
image_url=image_url[:100] + "..." if len(image_url) > 100 else image_url
|
||||
)
|
||||
|
||||
# 创建 Chatwoot client
|
||||
from integrations.chatwoot import ChatwootClient
|
||||
chatwoot = ChatwootClient(account_id=int(account_id))
|
||||
|
||||
# 开启 typing status
|
||||
try:
|
||||
await chatwoot.toggle_typing_status(
|
||||
conversation_id=conversation.id,
|
||||
typing_status="on"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
"Failed to enable typing status for image search",
|
||||
conversation_id=conversation_id,
|
||||
error=str(e)
|
||||
)
|
||||
|
||||
# 直接调用图片搜索工具
|
||||
# 修改 content 为图片搜索指令
|
||||
search_content = f"[图片搜索] 请根据这张图片搜索相似的商品"
|
||||
|
||||
# 使用特殊标记让 Product Agent 知道这是图片搜索
|
||||
context["image_search_url"] = image_url
|
||||
|
||||
final_state = await process_message(
|
||||
conversation_id=conversation_id,
|
||||
user_id=user_id,
|
||||
account_id=account_id,
|
||||
message=search_content,
|
||||
history=history,
|
||||
context=context,
|
||||
user_token=user_token,
|
||||
mall_token=mall_token
|
||||
)
|
||||
|
||||
# 获取响应并发送
|
||||
response = final_state.get("response", "")
|
||||
|
||||
if response:
|
||||
await chatwoot.send_message(
|
||||
conversation_id=conversation.id,
|
||||
content=response
|
||||
)
|
||||
|
||||
logger.info(
|
||||
"Image search response sent",
|
||||
conversation_id=conversation_id,
|
||||
response_length=len(response)
|
||||
)
|
||||
|
||||
# 关闭 typing status
|
||||
await chatwoot.toggle_typing_status(
|
||||
conversation_id=conversation.id,
|
||||
typing_status="off"
|
||||
)
|
||||
|
||||
await chatwoot.close()
|
||||
|
||||
# Update cache
|
||||
await cache.add_message(conversation_id, "user", search_content)
|
||||
await cache.add_message(conversation_id, "assistant", response)
|
||||
|
||||
# Save context
|
||||
new_context = final_state.get("context", {})
|
||||
new_context["last_intent"] = final_state.get("intent")
|
||||
await cache.set_context(conversation_id, new_context)
|
||||
|
||||
logger.info(
|
||||
"Image search message processed successfully",
|
||||
conversation_id=conversation_id,
|
||||
intent=final_state.get("intent")
|
||||
)
|
||||
|
||||
return JSONResponse(
|
||||
content={"status": "success"},
|
||||
status_code=200
|
||||
)
|
||||
|
||||
# 识别消息渠道(邮件、网站等)
|
||||
message_channel = conversation.channel if conversation else "Channel"
|
||||
is_email = message_channel == "Email"
|
||||
|
||||
# 添加渠道信息到 context(让 Agent 知道是邮件还是网站)
|
||||
context["channel"] = message_channel
|
||||
context["is_email"] = is_email
|
||||
|
||||
# 邮件渠道特殊处理
|
||||
if is_email:
|
||||
logger.info(
|
||||
"Email channel detected",
|
||||
conversation_id=conversation_id,
|
||||
sender_email=contact.email if contact else None
|
||||
)
|
||||
|
||||
# 创建 Chatwoot client(提前创建以便开启 typing status)
|
||||
from integrations.chatwoot import ChatwootClient
|
||||
chatwoot = ChatwootClient(account_id=int(account_id))
|
||||
|
||||
@@ -29,8 +29,11 @@ services:
|
||||
container_name: ai_agent
|
||||
environment:
|
||||
# AI Model
|
||||
LLM_PROVIDER: ${LLM_PROVIDER:-zhipu}
|
||||
ZHIPU_API_KEY: ${ZHIPU_API_KEY}
|
||||
ZHIPU_MODEL: ${ZHIPU_MODEL:-glm-4}
|
||||
ZHIPU_MODEL: ${ZHIPU_MODEL:-glm-4-flash}
|
||||
QWEN_API_KEY: ${QWEN_API_KEY}
|
||||
QWEN_MODEL: ${QWEN_MODEL:-qwen-omni-turbo}
|
||||
# Redis
|
||||
REDIS_HOST: redis
|
||||
REDIS_PORT: 6379
|
||||
|
||||
@@ -82,50 +82,118 @@ async def get_product_detail(
|
||||
@register_tool("recommend_products")
|
||||
@mcp.tool()
|
||||
async def recommend_products(
|
||||
user_id: str,
|
||||
account_id: str,
|
||||
context: Optional[dict] = None,
|
||||
strategy: str = "hybrid",
|
||||
limit: int = 10
|
||||
user_token: str,
|
||||
page: int = 1,
|
||||
page_size: int = 6,
|
||||
warehouse_id: int = 2
|
||||
) -> dict:
|
||||
"""Get personalized product recommendations
|
||||
"""Get recommended products from Mall API Love List
|
||||
|
||||
从 Mall API 获取推荐商品列表(心动清单/猜你喜欢)
|
||||
|
||||
Args:
|
||||
user_id: User identifier
|
||||
account_id: B2B account identifier
|
||||
context: Optional context for recommendations:
|
||||
- current_query: Current search query
|
||||
- recent_views: List of recently viewed product IDs
|
||||
- cart_items: Items in cart
|
||||
strategy: Recommendation strategy (collaborative, content_based, hybrid)
|
||||
limit: Maximum recommendations to return (default: 10)
|
||||
user_token: User JWT token for authentication
|
||||
page: Page number (default: 1)
|
||||
page_size: Number of products per page (default: 6, max 100)
|
||||
warehouse_id: Warehouse ID (default: 2)
|
||||
|
||||
Returns:
|
||||
List of recommended products with reasons
|
||||
List of recommended products with product details
|
||||
"""
|
||||
payload = {
|
||||
"user_id": user_id,
|
||||
"account_id": account_id,
|
||||
"strategy": strategy,
|
||||
"limit": limit
|
||||
}
|
||||
|
||||
if context:
|
||||
payload["context"] = context
|
||||
|
||||
try:
|
||||
result = await hyperf.post("/products/recommend", json=payload)
|
||||
from shared.mall_client import MallClient
|
||||
import logging
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
logger.info(f"recommend_products called: page={page}, page_size={page_size}, warehouse_id={warehouse_id}")
|
||||
|
||||
# 创建 Mall 客户端(使用 user_token)
|
||||
mall = MallClient(
|
||||
api_url=settings.mall_api_url,
|
||||
api_token=user_token, # 使用用户的 token
|
||||
tenant_id=settings.mall_tenant_id,
|
||||
currency_code=settings.mall_currency_code,
|
||||
language_id=settings.mall_language_id,
|
||||
source=settings.mall_source
|
||||
)
|
||||
|
||||
result = await mall.get_love_list(
|
||||
page=page,
|
||||
page_size=page_size,
|
||||
warehouse_id=warehouse_id
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f"Mall API love list returned: result_type={type(result).__name__}, "
|
||||
f"result_keys={list(result.keys()) if isinstance(result, dict) else 'not a dict'}, "
|
||||
f"total={result.get('total', 'N/A') if isinstance(result, dict) else 'N/A'}"
|
||||
)
|
||||
|
||||
# 解析返回结果
|
||||
# Mall API 实际返回结构: {"total": X, "data": {"data": [...]}}
|
||||
if isinstance(result, dict) and "data" in result:
|
||||
data = result.get("data", {})
|
||||
if isinstance(data, dict) and "data" in data:
|
||||
products_list = data.get("data", [])
|
||||
else:
|
||||
products_list = []
|
||||
total = result.get("total", 0)
|
||||
elif isinstance(result, dict) and "list" in result:
|
||||
# 兼容可能的 list 结构
|
||||
products_list = result.get("list", [])
|
||||
total = result.get("total", 0)
|
||||
else:
|
||||
products_list = []
|
||||
total = 0
|
||||
|
||||
logger.info(f"Extracted {len(products_list)} products from love list, total={total}")
|
||||
|
||||
# 格式化商品数据(与 search_products 格式一致)
|
||||
formatted_products = []
|
||||
for product in products_list:
|
||||
formatted_products.append({
|
||||
"spu_id": product.get("spuId"),
|
||||
"spu_sn": product.get("spuSn"),
|
||||
"product_name": product.get("spuName"),
|
||||
"product_image": product.get("masterImage"),
|
||||
"price": product.get("price"),
|
||||
"special_price": product.get("specialPrice"),
|
||||
"stock": product.get("stockDescribe"),
|
||||
"sales_count": product.get("salesCount", 0),
|
||||
# 额外有用字段
|
||||
"href": product.get("href"),
|
||||
"spu_type": product.get("spuType"),
|
||||
"spu_type_name": product.get("spuTypeName"),
|
||||
"min_price": product.get("minPrice"),
|
||||
"max_price": product.get("maxPrice"),
|
||||
"price_with_currency": product.get("priceWithCurrency"),
|
||||
"mark_price": product.get("markPrice"),
|
||||
"skus_count": len(product.get("skus", []))
|
||||
})
|
||||
|
||||
logger.info(f"Formatted {len(formatted_products)} products for recommendation")
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"recommendations": result.get("recommendations", [])
|
||||
"products": formatted_products,
|
||||
"total": total,
|
||||
"keyword": "recommendation"
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
import logging
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.error(f"recommend_products failed: {str(e)}", exc_info=True)
|
||||
return {
|
||||
"success": False,
|
||||
"error": str(e),
|
||||
"recommendations": []
|
||||
"products": [],
|
||||
"total": 0
|
||||
}
|
||||
finally:
|
||||
# 关闭客户端
|
||||
if 'mall' in dir():
|
||||
await mall.close()
|
||||
|
||||
|
||||
@register_tool("get_quote")
|
||||
@@ -249,7 +317,7 @@ async def get_categories() -> dict:
|
||||
@mcp.tool()
|
||||
async def search_products(
|
||||
keyword: str,
|
||||
page_size: int = 5,
|
||||
page_size: int = 6,
|
||||
page: int = 1
|
||||
) -> dict:
|
||||
"""Search products from Mall API
|
||||
@@ -258,7 +326,7 @@ async def search_products(
|
||||
|
||||
Args:
|
||||
keyword: 搜索关键词(商品名称、编号等)
|
||||
page_size: 每页数量 (default: 5, max 100)
|
||||
page_size: 每页数量 (default: 6, max 100)
|
||||
page: 页码 (default: 1)
|
||||
|
||||
Returns:
|
||||
@@ -299,6 +367,18 @@ async def search_products(
|
||||
)
|
||||
print(f"[DEBUG] Mall API returned: total={result.get('total', 'N/A')}, data_keys={list(result.get('data', {}).keys()) if isinstance(result.get('data'), dict) else 'N/A'}")
|
||||
|
||||
# 详细输出商品数据
|
||||
if "data" in result and isinstance(result["data"], dict):
|
||||
products = result["data"].get("data", [])
|
||||
print(f"[DEBUG] Found {len(products)} products in response")
|
||||
for i, p in enumerate(products[:3]): # 只打印前3个
|
||||
print(f"[DEBUG] Product {i+1}: spuId={p.get('spuId')}, spuName={p.get('spuName')}, price={p.get('price')}")
|
||||
else:
|
||||
products = result.get("list", [])
|
||||
print(f"[DEBUG] Found {len(products)} products in list")
|
||||
total = result.get("total", 0)
|
||||
print(f"[DEBUG] Total products: {total}")
|
||||
|
||||
# 解析返回结果
|
||||
# Mall API 返回结构: {"total": X, "data": {"data": [...], ...}}
|
||||
if "data" in result and isinstance(result["data"], dict):
|
||||
@@ -349,6 +429,116 @@ async def search_products(
|
||||
await mall.close()
|
||||
|
||||
|
||||
@register_tool("search_products_by_image")
|
||||
@mcp.tool()
|
||||
async def search_products_by_image(
|
||||
image_url: str,
|
||||
page_size: int = 6,
|
||||
page: int = 1
|
||||
) -> dict:
|
||||
"""Search products by image from Mall API
|
||||
|
||||
从 Mall API 根据图片搜索商品 SPU(以图搜图)
|
||||
|
||||
Args:
|
||||
image_url: 图片 URL(需要 URL 编码)
|
||||
page_size: 每页数量 (default: 6, max 100)
|
||||
page: 页码 (default: 1)
|
||||
|
||||
Returns:
|
||||
商品列表,包含 SPU 信息、商品图片、价格等
|
||||
Product list including SPU ID, name, image, price, etc.
|
||||
"""
|
||||
try:
|
||||
from shared.mall_client import MallClient
|
||||
import logging
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
logger.info(f"search_products_by_image called: image_url={image_url}")
|
||||
|
||||
# 创建 Mall 客户端(不需要 token)
|
||||
mall = MallClient(
|
||||
api_url=settings.mall_api_url,
|
||||
api_token=None, # 图片搜索不需要 token
|
||||
tenant_id=settings.mall_tenant_id,
|
||||
currency_code=settings.mall_currency_code,
|
||||
language_id=settings.mall_language_id,
|
||||
source=settings.mall_source
|
||||
)
|
||||
|
||||
# 调用 Mall API 图片搜索接口
|
||||
# 注意:httpx 会自动对 params 进行 URL 编码,不需要手动编码
|
||||
result = await mall.get(
|
||||
"/mall/api/spu",
|
||||
params={
|
||||
"pageSize": min(page_size, 100),
|
||||
"page": page,
|
||||
"searchImageUrl": image_url
|
||||
}
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f"Mall API image search returned: result_type={type(result).__name__}, "
|
||||
f"result_keys={list(result.keys()) if isinstance(result, dict) else 'not a dict'}, "
|
||||
f"total={result.get('total', 'N/A') if isinstance(result, dict) else 'N/A'}"
|
||||
)
|
||||
|
||||
# 解析返回结果
|
||||
# Mall API 返回结构: {"total": X, "data": {"data": [...]}}
|
||||
if "data" in result and isinstance(result["data"], dict):
|
||||
products = result["data"].get("data", [])
|
||||
else:
|
||||
products = result.get("list", [])
|
||||
total = result.get("total", 0)
|
||||
|
||||
# 格式化商品数据(与 search_products 格式一致)
|
||||
formatted_products = []
|
||||
for product in products:
|
||||
formatted_products.append({
|
||||
"spu_id": product.get("spuId"),
|
||||
"spu_sn": product.get("spuSn"),
|
||||
"product_name": product.get("spuName"),
|
||||
"product_image": product.get("masterImage"),
|
||||
"price": product.get("price"),
|
||||
"special_price": product.get("specialPrice"),
|
||||
"stock": product.get("stockDescribe"),
|
||||
"sales_count": product.get("salesCount", 0),
|
||||
# 额外有用字段
|
||||
"href": product.get("href"),
|
||||
"spu_type": product.get("spuType"),
|
||||
"spu_type_name": product.get("spuTypeName"),
|
||||
"min_price": product.get("minPrice"),
|
||||
"max_price": product.get("maxPrice"),
|
||||
"price_with_currency": product.get("priceWithCurrency"),
|
||||
"mark_price": product.get("markPrice"),
|
||||
"skus_count": len(product.get("skus", []))
|
||||
})
|
||||
|
||||
logger.info(f"Formatted {len(formatted_products)} products from image search")
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"products": formatted_products,
|
||||
"total": total,
|
||||
"keyword": "image_search",
|
||||
"image_url": image_url
|
||||
}
|
||||
except Exception as e:
|
||||
import logging
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.error(f"search_products_by_image failed: {str(e)}", exc_info=True)
|
||||
return {
|
||||
"success": False,
|
||||
"error": str(e),
|
||||
"products": [],
|
||||
"total": 0
|
||||
}
|
||||
finally:
|
||||
# 关闭客户端
|
||||
if 'mall' in dir():
|
||||
await mall.close()
|
||||
|
||||
|
||||
# Health check endpoint
|
||||
@register_tool("health_check")
|
||||
@mcp.tool()
|
||||
|
||||
@@ -316,6 +316,44 @@ class MallClient:
|
||||
except Exception as e:
|
||||
raise Exception(f"搜索商品失败 (Search SPU products failed): {str(e)}")
|
||||
|
||||
async def get_love_list(
|
||||
self,
|
||||
page: int = 1,
|
||||
page_size: int = 6,
|
||||
warehouse_id: int = 2
|
||||
) -> dict[str, Any]:
|
||||
"""Get recommended products (Love List) from Mall API
|
||||
|
||||
获取推荐商品列表
|
||||
|
||||
Args:
|
||||
page: Page number (default: 1)
|
||||
page_size: Number of products per page (default: 6)
|
||||
warehouse_id: Warehouse ID (default: 2)
|
||||
|
||||
Returns:
|
||||
Dictionary containing product list and metadata
|
||||
|
||||
Example:
|
||||
>>> client = MallClient()
|
||||
>>> result = await client.get_love_list(page=1, page_size=6, warehouse_id=2)
|
||||
>>> print(f"找到 {len(result.get('list', []))} 个推荐商品")
|
||||
"""
|
||||
try:
|
||||
params = {
|
||||
"page": page,
|
||||
"pageSize": page_size,
|
||||
"warehouseId": warehouse_id
|
||||
}
|
||||
|
||||
result = await self.get(
|
||||
"/mall/api/loveList",
|
||||
params=params
|
||||
)
|
||||
return result
|
||||
except Exception as e:
|
||||
raise Exception(f"获取推荐商品失败 (Get love list failed): {str(e)}")
|
||||
|
||||
|
||||
# Global Mall client instance
|
||||
mall_client: Optional[MallClient] = None
|
||||
|
||||
Reference in New Issue
Block a user