图片搜索功能(以图搜图): - Chatwoot webhook 检测图片搜索消息 (content_type="search_image") - 从 content_attributes.url 提取图片 URL - 调用 Mall API 图片搜索接口 (/mall/api/spu?searchImageUrl=...) - 支持嵌套和顶层 URL 位置提取 - Product Agent 添加 fast path 直接调用图片搜索工具 - 防止无限循环(使用后清除 context.image_search_url) Qwen 模型支持: - 添加 LLM provider 选择(zhipu/qwen) - 实现 QwenLLMClient 类(基于 DashScope SDK) - 添加 dashscope>=1.14.0 依赖 - 修复 API key 设置(直接设置 dashscope.api_key) - 更新 .env.example 和 docker-compose.yml 配置 其他优化: - 重构 Chatwoot 集成代码(删除冗余) - 优化 Product Agent prompt - 增强 Customer Service Agent 多语言支持 Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
445 lines
15 KiB
Python
445 lines
15 KiB
Python
"""
|
||
LLM Client for B2B Shopping AI Assistant
|
||
Supports both ZhipuAI and Qwen (DashScope)
|
||
"""
|
||
import asyncio
import concurrent.futures
from dataclasses import dataclass
from typing import Any, Optional, Union

from zhipuai import ZhipuAI

from config import settings
from utils.logger import get_logger
from utils.response_cache import get_response_cache
|
||
|
||
logger = get_logger(__name__)
|
||
|
||
|
||
@dataclass
class Message:
    """Chat message structure.

    A single turn in a conversation, formatted into the
    ``{"role": ..., "content": ...}`` dict shape expected by both the
    ZhipuAI and DashScope chat APIs.
    """
    # Speaker role: "system", "user", or "assistant"
    role: str
    # Plain-text message body
    content: str
|
||
|
||
|
||
@dataclass
class LLMResponse:
    """LLM response structure.

    Provider-agnostic result returned by both client wrappers in this
    module, so callers never touch SDK-specific response objects.
    """
    # Generated assistant text (empty string when the model returned nothing)
    content: str
    # Provider finish reason (e.g. "stop"); "cache_hit" for cached replies
    finish_reason: str
    # Token accounting: keys "prompt_tokens", "completion_tokens", "total_tokens"
    usage: dict[str, int]
|
||
|
||
|
||
class ZhipuLLMClient:
    """ZhipuAI LLM Client wrapper.

    Wraps the synchronous ZhipuAI SDK behind an async interface with
    response caching, a hard per-request timeout, and a "thinking"
    (reasoning) mode switch sent via the ``thinking`` request parameter.
    """

    DEFAULT_TIMEOUT = 60  # seconds (increased from 30 for better reliability)

    def __init__(
        self,
        api_key: Optional[str] = None,
        model: Optional[str] = None,
        timeout: Optional[int] = None,
        enable_reasoning: Optional[bool] = None
    ):
        """Initialize the client.

        Args:
            api_key: ZhipuAI API key; defaults to ``settings.zhipu_api_key``.
            model: Model name; defaults to ``settings.zhipu_model``.
            timeout: Per-request timeout in seconds; defaults to DEFAULT_TIMEOUT.
            enable_reasoning: Explicit reasoning-mode override; ``None``
                falls back to ``settings.enable_reasoning_mode``.
        """
        self.api_key = api_key or settings.zhipu_api_key
        self.model = model or settings.zhipu_model
        self.timeout = timeout or self.DEFAULT_TIMEOUT
        self.enable_reasoning = enable_reasoning if enable_reasoning is not None else settings.enable_reasoning_mode
        self._client = ZhipuAI(api_key=self.api_key)
        logger.info(
            "ZhipuAI client initialized",
            model=self.model,
            timeout=self.timeout,
            reasoning_mode=self.enable_reasoning
        )

    def _should_use_reasoning(self, messages: list[dict[str, str]]) -> bool:
        """Determine if reasoning mode should be used based on query complexity.

        Args:
            messages: List of message dictionaries (``role``/``content`` keys).

        Returns:
            True if reasoning mode should be used.
        """
        if not self.enable_reasoning:
            return False

        if not settings.reasoning_mode_for_complex:
            # If smart mode is disabled, use the global setting
            return self.enable_reasoning

        # Smart mode: analyze the last user message
        last_message = ""
        for msg in reversed(messages):
            if msg.get("role") == "user":
                last_message = msg.get("content", "")
                break

        # Simple queries (greetings, policy/shipping FAQs) don't need reasoning
        simple_patterns = [
            "你好", "hi", "hello", "嗨",
            "谢谢", "thank", "感谢",
            "再见", "bye", "拜拜",
            "退货政策", "营业时间", "联系方式",
            "发货", "配送", "物流"
        ]

        last_message_lower = last_message.lower()
        for pattern in simple_patterns:
            if pattern in last_message_lower:
                logger.debug("Simple query detected, disabling reasoning", query=last_message[:50])
                return False

        # Complex queries (why/how, recommendations, comparisons) benefit from reasoning
        complex_patterns = [
            "为什么", "how", "why", "如何",
            "推荐", "recommend", "建议",
            "比较", "compare", "区别",
            "怎么样", "如何选择"
        ]

        for pattern in complex_patterns:
            if pattern in last_message_lower:
                logger.debug("Complex query detected, enabling reasoning", query=last_message[:50])
                return True

        # Default: disable reasoning for speed
        return False

    async def chat(
        self,
        messages: list[Message],
        temperature: float = 0.7,
        max_tokens: int = 2048,
        top_p: float = 0.9,
        use_cache: bool = True,
        enable_reasoning: Optional[bool] = None,
        **kwargs: Any
    ) -> LLMResponse:
        """Send chat completion request with caching support.

        Args:
            messages: Conversation history as Message objects.
            temperature: Sampling temperature.
            max_tokens: Maximum completion tokens.
            top_p: Nucleus sampling parameter.
            use_cache: Check and populate the response cache when True.
            enable_reasoning: Currently ignored — thinking mode is
                force-disabled below for latency reasons.
            **kwargs: Extra parameters forwarded to the ZhipuAI SDK.

        Returns:
            LLMResponse with content, finish reason and token usage.

        Raises:
            TimeoutError: If the request exceeds ``self.timeout`` seconds.
        """
        formatted_messages = [
            {"role": msg.role, "content": msg.content}
            for msg in messages
        ]

        # Try cache first; cache failures are non-fatal (fall through to API)
        if use_cache:
            try:
                cache = get_response_cache()
                cached_response = await cache.get(
                    model=self.model,
                    messages=formatted_messages,
                    temperature=temperature
                )
                if cached_response is not None:
                    logger.info(
                        "Returning cached response",
                        model=self.model,
                        response_length=len(cached_response)
                    )
                    return LLMResponse(
                        content=cached_response,
                        finish_reason="cache_hit",
                        usage={"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}
                    )
            except Exception as e:
                logger.warning("Cache check failed", error=str(e))

        logger.info(
            "Sending chat request",
            model=self.model,
            message_count=len(messages),
            temperature=temperature
        )

        # Determine if reasoning mode should be used.
        # Deep thinking mode force-disabled to speed up responses (2026-01-26);
        # this deliberately overrides enable_reasoning and _should_use_reasoning().
        use_reasoning = False  # Override all settings to disable thinking mode

        if use_reasoning:
            logger.info("Reasoning mode enabled for this request")

        def _make_request():
            # Runs in a worker thread — the ZhipuAI SDK is synchronous.
            request_params = {
                "model": self.model,
                "messages": formatted_messages,
                "temperature": temperature,
                "max_tokens": max_tokens,
                "top_p": top_p,
            }

            # Add thinking mode control
            # Format: {"thinking": {"type": "disabled"}} or {"type": "enabled"}
            if use_reasoning:
                request_params["thinking"] = {"type": "enabled"}
                logger.info("Thinking mode: enabled", request_params={"thinking": {"type": "enabled"}})
            else:
                request_params["thinking"] = {"type": "disabled"}
                logger.info("Thinking mode: disabled", request_params={"thinking": {"type": "disabled"}})

            request_params.update(kwargs)
            return self._client.chat.completions.create(**request_params)

        try:
            # BUGFIX: the previous implementation called future.result() on
            # the event loop thread, blocking the entire loop for up to
            # self.timeout seconds. Run the blocking SDK call in the default
            # executor and await it with an asyncio timeout instead.
            loop = asyncio.get_running_loop()
            response = await asyncio.wait_for(
                loop.run_in_executor(None, _make_request),
                timeout=self.timeout
            )

            choice = response.choices[0]
            content = choice.message.content

            logger.info(
                "Chat response received",
                finish_reason=choice.finish_reason,
                content_length=len(content) if content else 0,
                usage=response.usage.__dict__ if hasattr(response, 'usage') else {}
            )

            if not content:
                logger.warning("LLM returned empty content")

            # Cache the response (best-effort; failures only logged)
            if use_cache and content:
                try:
                    cache = get_response_cache()
                    await cache.set(
                        model=self.model,
                        messages=formatted_messages,
                        response=content,
                        temperature=temperature
                    )
                except Exception as e:
                    logger.warning("Failed to cache response", error=str(e))

            return LLMResponse(
                content=content or "",
                finish_reason=choice.finish_reason,
                usage={
                    "prompt_tokens": response.usage.prompt_tokens,
                    "completion_tokens": response.usage.completion_tokens,
                    "total_tokens": response.usage.total_tokens
                }
            )

        except asyncio.TimeoutError:
            logger.error("Chat request timed out", timeout=self.timeout)
            raise TimeoutError(f"Request timed out after {self.timeout} seconds")

        except Exception as e:
            logger.error("Chat request failed", error=str(e))
            raise

    async def chat_with_tools(
        self,
        messages: list[Message],
        tools: list[dict[str, Any]],
        temperature: float = 0.7,
        **kwargs: Any
    ) -> tuple[LLMResponse, None]:
        """Send chat completion request with tool calling.

        Args:
            messages: Conversation history as Message objects.
            tools: Tool/function schemas passed to the model.
            temperature: Sampling temperature.
            **kwargs: Extra parameters forwarded to the ZhipuAI SDK.

        Returns:
            ``(LLMResponse, None)``. NOTE(review): the second element is a
            placeholder — ``choice.message.tool_calls`` is currently
            discarded; confirm against callers before surfacing it.
        """
        formatted_messages = [
            {"role": msg.role, "content": msg.content}
            for msg in messages
        ]

        logger.info(
            "Sending chat request with tools",
            model=self.model,
            tool_count=len(tools)
        )

        try:
            # BUGFIX: the previous implementation invoked the synchronous SDK
            # directly on the event loop thread; run it in the executor so
            # other coroutines keep making progress.
            loop = asyncio.get_running_loop()
            response = await loop.run_in_executor(
                None,
                lambda: self._client.chat.completions.create(
                    model=self.model,
                    messages=formatted_messages,
                    tools=tools,
                    temperature=temperature,
                    **kwargs
                )
            )

            choice = response.choices[0]
            content = choice.message.content or ""

            return LLMResponse(
                content=content,
                finish_reason=choice.finish_reason,
                usage={
                    "prompt_tokens": response.usage.prompt_tokens,
                    "completion_tokens": response.usage.completion_tokens,
                    "total_tokens": response.usage.total_tokens
                }
            ), None

        except Exception as e:
            logger.error("Chat with tools request failed", error=str(e))
            raise
|
||
|
||
|
||
llm_client: Optional[Union[ZhipuLLMClient, "QwenLLMClient"]] = None


def get_llm_client() -> Union[ZhipuLLMClient, "QwenLLMClient"]:
    """Return the process-wide LLM client, creating it on first use.

    The provider comes from ``settings.llm_provider``: "qwen" selects
    QwenLLMClient, anything else falls back to ZhipuLLMClient.
    """
    global llm_client
    if llm_client is None:
        chosen = settings.llm_provider.lower()
        llm_client = QwenLLMClient() if chosen == "qwen" else ZhipuLLMClient()
    return llm_client
|
||
|
||
|
||
# ============ Qwen (DashScope) LLM Client ============
|
||
|
||
# DashScope is an optional dependency: record availability instead of
# failing at import time, so Zhipu-only deployments keep working.
try:
    from dashscope import Generation
    DASHSCOPE_AVAILABLE = True
except ImportError:
    # QwenLLMClient.__init__ checks this flag and raises ImportError
    # with install instructions when the SDK is missing.
    DASHSCOPE_AVAILABLE = False
    logger.warning("DashScope SDK not installed. Qwen models will not be available.")
|
||
|
||
|
||
class QwenLLMClient:
    """Qwen (DashScope) LLM Client wrapper.

    Async wrapper around the synchronous DashScope ``Generation`` API with
    response caching and a hard per-request timeout. Mirrors the
    ZhipuLLMClient ``chat`` interface so the two are interchangeable.
    """

    DEFAULT_TIMEOUT = 60  # seconds

    def __init__(
        self,
        api_key: Optional[str] = None,
        model: Optional[str] = None,
        timeout: Optional[int] = None
    ):
        """Initialize the client.

        Args:
            api_key: DashScope API key; defaults to ``settings.qwen_api_key``.
            model: Model name; defaults to ``settings.qwen_model``.
            timeout: Per-request timeout in seconds; defaults to DEFAULT_TIMEOUT.

        Raises:
            ImportError: If the DashScope SDK is not installed.
        """
        if not DASHSCOPE_AVAILABLE:
            raise ImportError("DashScope SDK is not installed. Install it with: pip install dashscope")

        self.api_key = api_key or settings.qwen_api_key
        self.model = model or settings.qwen_model
        self.timeout = timeout or self.DEFAULT_TIMEOUT

        # Set the API key on the DashScope SDK.
        # Must assign dashscope.api_key directly; the environment variable
        # alone may not be picked up.
        import dashscope
        dashscope.api_key = self.api_key

        # BUGFIX: guard against an unset/None key — the original expression
        # called len(self.api_key) unconditionally and raised TypeError
        # when settings.qwen_api_key was missing.
        if self.api_key and len(self.api_key) > 10:
            key_preview = self.api_key[:10] + "..."
        else:
            key_preview = self.api_key
        logger.info(
            "Qwen client initialized",
            model=self.model,
            timeout=self.timeout,
            api_key_prefix=key_preview
        )

    async def chat(
        self,
        messages: list[Message],
        temperature: float = 0.7,
        max_tokens: int = 2048,
        top_p: float = 0.9,
        use_cache: bool = True,
        **kwargs: Any
    ) -> LLMResponse:
        """Send chat completion request with caching support.

        Args:
            messages: Conversation history as Message objects.
            temperature: Sampling temperature.
            max_tokens: Maximum completion tokens.
            top_p: Nucleus sampling parameter.
            use_cache: Check and populate the response cache when True.
            **kwargs: Extra parameters forwarded to ``Generation.call``.

        Returns:
            LLMResponse with content, finish reason and token usage.

        Raises:
            TimeoutError: If the request exceeds ``self.timeout`` seconds.
            Exception: If the Qwen API returns a non-200 status code.
        """
        formatted_messages = [
            {"role": msg.role, "content": msg.content}
            for msg in messages
        ]

        # Try cache first; cache failures are non-fatal (fall through to API)
        if use_cache:
            try:
                cache = get_response_cache()
                cached_response = await cache.get(
                    model=self.model,
                    messages=formatted_messages,
                    temperature=temperature
                )
                if cached_response is not None:
                    logger.info(
                        "Returning cached response",
                        model=self.model,
                        response_length=len(cached_response)
                    )
                    return LLMResponse(
                        content=cached_response,
                        finish_reason="cache_hit",
                        usage={"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}
                    )
            except Exception as e:
                logger.warning("Cache check failed", error=str(e))

        logger.info(
            "Sending chat request",
            model=self.model,
            message_count=len(messages),
            temperature=temperature
        )

        def _make_request():
            # Runs in a worker thread — Generation.call is synchronous.
            # result_format="message" makes the response expose
            # output.choices[..].message like an OpenAI-style payload.
            response = Generation.call(
                model=self.model,
                messages=formatted_messages,
                temperature=temperature,
                max_tokens=max_tokens,
                top_p=top_p,
                result_format="message",
                **kwargs
            )
            return response

        try:
            # BUGFIX: the previous implementation called future.result() on
            # the event loop thread, blocking the entire loop for up to
            # self.timeout seconds. Run the blocking call in the default
            # executor and await it with an asyncio timeout instead.
            loop = asyncio.get_running_loop()
            response = await asyncio.wait_for(
                loop.run_in_executor(None, _make_request),
                timeout=self.timeout
            )

            # Qwen API response format: non-200 status means an API error
            if response.status_code != 200:
                raise Exception(f"Qwen API error: {response.message}")

            content = response.output.choices[0].message.content
            finish_reason = response.output.choices[0].finish_reason
            usage = response.usage

            logger.info(
                "Chat response received",
                finish_reason=finish_reason,
                content_length=len(content) if content else 0,
                usage=usage
            )

            if not content:
                logger.warning("LLM returned empty content")

            # Cache the response (best-effort; failures only logged)
            if use_cache and content:
                try:
                    cache = get_response_cache()
                    await cache.set(
                        model=self.model,
                        messages=formatted_messages,
                        response=content,
                        temperature=temperature
                    )
                except Exception as e:
                    logger.warning("Failed to cache response", error=str(e))

            return LLMResponse(
                content=content or "",
                finish_reason=finish_reason,
                usage={
                    "prompt_tokens": usage.input_tokens,
                    "completion_tokens": usage.output_tokens,
                    "total_tokens": usage.total_tokens
                }
            )

        except asyncio.TimeoutError:
            logger.error("Chat request timed out", timeout=self.timeout)
            raise TimeoutError(f"Request timed out after {self.timeout} seconds")

        except Exception as e:
            logger.error("Chat request failed", error=str(e))
            raise
|