""" LLM Client for B2B Shopping AI Assistant Supports both ZhipuAI and Qwen (DashScope) """ import concurrent.futures from typing import Any, Optional, Union from dataclasses import dataclass from zhipuai import ZhipuAI from config import settings from utils.logger import get_logger from utils.response_cache import get_response_cache logger = get_logger(__name__) @dataclass class Message: """Chat message structure""" role: str # "system", "user", "assistant" content: str @dataclass class LLMResponse: """LLM response structure""" content: str finish_reason: str usage: dict[str, int] class ZhipuLLMClient: """ZhipuAI LLM Client wrapper""" DEFAULT_TIMEOUT = 60 # seconds (increased from 30 for better reliability) def __init__( self, api_key: Optional[str] = None, model: Optional[str] = None, timeout: Optional[int] = None, enable_reasoning: Optional[bool] = None ): self.api_key = api_key or settings.zhipu_api_key self.model = model or settings.zhipu_model self.timeout = timeout or self.DEFAULT_TIMEOUT self.enable_reasoning = enable_reasoning if enable_reasoning is not None else settings.enable_reasoning_mode self._client = ZhipuAI(api_key=self.api_key) logger.info( "ZhipuAI client initialized", model=self.model, timeout=self.timeout, reasoning_mode=self.enable_reasoning ) def _should_use_reasoning(self, messages: list[dict[str, str]]) -> bool: """Determine if reasoning mode should be used based on query complexity Args: messages: List of message dictionaries Returns: True if reasoning mode should be used """ if not self.enable_reasoning: return False if not settings.reasoning_mode_for_complex: # If smart mode is disabled, use the global setting return self.enable_reasoning # Smart mode: analyze the last user message last_message = "" for msg in reversed(messages): if msg.get("role") == "user": last_message = msg.get("content", "") break # Simple queries that don't need reasoning simple_patterns = [ "你好", "hi", "hello", "嗨", "谢谢", "thank", "感谢", "再见", "bye", "拜拜", "退货政策", "营业时间", "联系方式", "发货", "配送", "物流" ] last_message_lower = last_message.lower() for pattern in simple_patterns: if pattern in last_message_lower: logger.debug("Simple query detected, disabling reasoning", query=last_message[:50]) return False # Complex queries that benefit from reasoning complex_patterns = [ "为什么", "how", "why", "如何", "推荐", "recommend", "建议", "比较", "compare", "区别", "怎么样", "如何选择" ] for pattern in complex_patterns: if pattern in last_message_lower: logger.debug("Complex query detected, enabling reasoning", query=last_message[:50]) return True # Default: disable reasoning for speed return False async def chat( self, messages: list[Message], temperature: float = 0.7, max_tokens: int = 2048, top_p: float = 0.9, use_cache: bool = True, enable_reasoning: Optional[bool] = None, **kwargs: Any ) -> LLMResponse: """Send chat completion request with caching support""" formatted_messages = [ {"role": msg.role, "content": msg.content} for msg in messages ] # Try cache first if use_cache: try: cache = get_response_cache() cached_response = await cache.get( model=self.model, messages=formatted_messages, temperature=temperature ) if cached_response is not None: logger.info( "Returning cached response", model=self.model, response_length=len(cached_response) ) return LLMResponse( content=cached_response, finish_reason="cache_hit", usage={"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0} ) except Exception as e: logger.warning("Cache check failed", error=str(e)) logger.info( "Sending chat request", model=self.model, message_count=len(messages), temperature=temperature ) # Determine if reasoning mode should be used # 强制禁用深度思考模式以提升响应速度(2026-01-26) use_reasoning = False # Override all settings to disable thinking mode if use_reasoning: logger.info("Reasoning mode enabled for this request") def _make_request(): request_params = { "model": self.model, "messages": formatted_messages, "temperature": temperature, "max_tokens": max_tokens, "top_p": top_p, } # Add thinking mode control # Format: {"thinking": {"type": "disabled"}} or {"type": "enabled"} if use_reasoning: request_params["thinking"] = {"type": "enabled"} logger.info("Thinking mode: enabled", request_params={"thinking": {"type": "enabled"}}) else: request_params["thinking"] = {"type": "disabled"} logger.info("Thinking mode: disabled", request_params={"thinking": {"type": "disabled"}}) request_params.update(kwargs) return self._client.chat.completions.create(**request_params) try: with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: future = executor.submit(_make_request) response = future.result(timeout=self.timeout) choice = response.choices[0] content = choice.message.content logger.info( "Chat response received", finish_reason=choice.finish_reason, content_length=len(content) if content else 0, usage=response.usage.__dict__ if hasattr(response, 'usage') else {} ) if not content: logger.warning("LLM returned empty content") # Cache the response if use_cache and content: try: cache = get_response_cache() await cache.set( model=self.model, messages=formatted_messages, response=content, temperature=temperature ) except Exception as e: logger.warning("Failed to cache response", error=str(e)) return LLMResponse( content=content or "", finish_reason=choice.finish_reason, usage={ "prompt_tokens": response.usage.prompt_tokens, "completion_tokens": response.usage.completion_tokens, "total_tokens": response.usage.total_tokens } ) except concurrent.futures.TimeoutError: logger.error("Chat request timed out", timeout=self.timeout) raise TimeoutError(f"Request timed out after {self.timeout} seconds") except Exception as e: logger.error("Chat request failed", error=str(e)) raise async def chat_with_tools( self, messages: list[Message], tools: list[dict[str, Any]], temperature: float = 0.7, **kwargs: Any ) -> tuple[LLMResponse, None]: """Send chat completion request with tool calling""" formatted_messages = [ {"role": msg.role, "content": msg.content} for msg in messages ] logger.info( "Sending chat request with tools", model=self.model, tool_count=len(tools) ) try: response = self._client.chat.completions.create( model=self.model, messages=formatted_messages, tools=tools, temperature=temperature, **kwargs ) choice = response.choices[0] content = choice.message.content or "" return LLMResponse( content=content, finish_reason=choice.finish_reason, usage={ "prompt_tokens": response.usage.prompt_tokens, "completion_tokens": response.usage.completion_tokens, "total_tokens": response.usage.total_tokens } ), None except Exception as e: logger.error("Chat with tools request failed", error=str(e)) raise llm_client: Optional[Union[ZhipuLLMClient, "QwenLLMClient"]] = None def get_llm_client() -> Union[ZhipuLLMClient, "QwenLLMClient"]: """Get or create global LLM client instance based on provider setting""" global llm_client if llm_client is None: provider = settings.llm_provider.lower() if provider == "qwen": llm_client = QwenLLMClient() else: llm_client = ZhipuLLMClient() return llm_client # ============ Qwen (DashScope) LLM Client ============ try: from dashscope import Generation DASHSCOPE_AVAILABLE = True except ImportError: DASHSCOPE_AVAILABLE = False logger.warning("DashScope SDK not installed. Qwen models will not be available.") class QwenLLMClient: """Qwen (DashScope) LLM Client wrapper""" DEFAULT_TIMEOUT = 60 # seconds def __init__( self, api_key: Optional[str] = None, model: Optional[str] = None, timeout: Optional[int] = None ): if not DASHSCOPE_AVAILABLE: raise ImportError("DashScope SDK is not installed. Install it with: pip install dashscope") self.api_key = api_key or settings.qwen_api_key self.model = model or settings.qwen_model self.timeout = timeout or self.DEFAULT_TIMEOUT # 设置 API key 到 DashScope SDK # 必须直接设置 dashscope.api_key,环境变量可能不够 import dashscope dashscope.api_key = self.api_key logger.info( "Qwen client initialized", model=self.model, timeout=self.timeout, api_key_prefix=self.api_key[:10] + "..." if len(self.api_key) > 10 else self.api_key ) async def chat( self, messages: list[Message], temperature: float = 0.7, max_tokens: int = 2048, top_p: float = 0.9, use_cache: bool = True, **kwargs: Any ) -> LLMResponse: """Send chat completion request with caching support""" formatted_messages = [ {"role": msg.role, "content": msg.content} for msg in messages ] # Try cache first if use_cache: try: cache = get_response_cache() cached_response = await cache.get( model=self.model, messages=formatted_messages, temperature=temperature ) if cached_response is not None: logger.info( "Returning cached response", model=self.model, response_length=len(cached_response) ) return LLMResponse( content=cached_response, finish_reason="cache_hit", usage={"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0} ) except Exception as e: logger.warning("Cache check failed", error=str(e)) logger.info( "Sending chat request", model=self.model, message_count=len(messages), temperature=temperature ) def _make_request(): response = Generation.call( model=self.model, messages=formatted_messages, temperature=temperature, max_tokens=max_tokens, top_p=top_p, result_format="message", **kwargs ) return response try: with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: future = executor.submit(_make_request) response = future.result(timeout=self.timeout) # Qwen API 响应格式 if response.status_code != 200: raise Exception(f"Qwen API error: {response.message}") content = response.output.choices[0].message.content finish_reason = response.output.choices[0].finish_reason usage = response.usage logger.info( "Chat response received", finish_reason=finish_reason, content_length=len(content) if content else 0, usage=usage ) if not content: logger.warning("LLM returned empty content") # Cache the response if use_cache and content: try: cache = get_response_cache() await cache.set( model=self.model, messages=formatted_messages, response=content, temperature=temperature ) except Exception as e: logger.warning("Failed to cache response", error=str(e)) return LLMResponse( content=content or "", finish_reason=finish_reason, usage={ "prompt_tokens": usage.input_tokens, "completion_tokens": usage.output_tokens, "total_tokens": usage.total_tokens } ) except concurrent.futures.TimeoutError: logger.error("Chat request timed out", timeout=self.timeout) raise TimeoutError(f"Request timed out after {self.timeout} seconds") except Exception as e: logger.error("Chat request failed", error=str(e)) raise