Files
assistant/mcp_servers/shared/mall_client.py
wangliang 965b11316e feat: 添加图片搜索功能和 Qwen 模型支持
图片搜索功能(以图搜图):
- Chatwoot webhook 检测图片搜索消息 (content_type="search_image")
- 从 content_attributes.url 提取图片 URL
- 调用 Mall API 图片搜索接口 (/mall/api/spu?searchImageUrl=...)
- 支持嵌套和顶层 URL 位置提取
- Product Agent 添加 fast path 直接调用图片搜索工具
- 防止无限循环(使用后清除 context.image_search_url)

Qwen 模型支持:
- 添加 LLM provider 选择(zhipu/qwen)
- 实现 QwenLLMClient 类(基于 DashScope SDK)
- 添加 dashscope>=1.14.0 依赖
- 修复 API key 设置(直接设置 dashscope.api_key)
- 更新 .env.example 和 docker-compose.yml 配置

其他优化:
- 重构 Chatwoot 集成代码(删除冗余)
- 优化 Product Agent prompt
- 增强 Customer Service Agent 多语言支持

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-27 19:10:06 +08:00

368 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Mall API Client for MCP Servers
用于调用商城 API包括订单查询等接口
"""
import logging
from typing import Any, Optional

import httpx
from pydantic import ConfigDict
from pydantic_settings import BaseSettings
class MallSettings(BaseSettings):
    """Mall API configuration.

    Declared as a pydantic-settings model that is configured to read a
    UTF-8 ``.env`` file and to ignore entries it does not declare.
    """

    # Endpoint and credential. Both are optional, so constructing the
    # settings object does not fail when they are absent.
    mall_api_url: Optional[str] = None
    mall_api_token: Optional[str] = None

    # Defaults for the tenant / locale values that MallClient sends as
    # request headers.
    mall_tenant_id: str = "2"
    mall_currency_code: str = "EUR"
    mall_language_id: str = "1"
    mall_source: str = "us.qa1.gaia888.com"

    model_config = ConfigDict(
        extra="ignore",
        env_file=".env",
        env_file_encoding="utf-8",
    )


# Module-level settings instance, built once at import time.
settings = MallSettings()
class MallClient:
    """Async client for the Mall HTTP API.

    A single ``httpx.AsyncClient`` is created lazily on the first request and
    reused until :meth:`close` is called. Constructor arguments that are
    falsy fall back to the matching value on the module-level ``settings``.
    """

    def __init__(
        self,
        api_url: Optional[str] = None,
        api_token: Optional[str] = None,
        tenant_id: Optional[str] = None,
        currency_code: Optional[str] = None,
        language_id: Optional[str] = None,
        source: Optional[str] = None
    ):
        """Store connection parameters; no network activity happens here.

        Args:
            api_url: Base URL of the Mall API. Trailing slashes are removed.
            api_token: Bearer token; an empty string means "no token".
            tenant_id: Value for the ``tenant-Id`` header.
            currency_code: Value for the ``currency-code`` header.
            language_id: Value for the ``language-id`` / ``language_id`` headers.
            source: Value for the ``source`` header.
        """
        self.api_url = (api_url or settings.mall_api_url or "").rstrip("/")
        self.api_token = api_token or settings.mall_api_token or ""
        self.tenant_id = tenant_id or settings.mall_tenant_id
        self.currency_code = currency_code or settings.mall_currency_code
        self.language_id = language_id or settings.mall_language_id
        self.source = source or settings.mall_source
        self._client: Optional[httpx.AsyncClient] = None

    async def _get_client(
        self,
        extra_headers: Optional[dict[str, str]] = None
    ) -> "httpx.AsyncClient":
        """Return the shared HTTP client, creating it on first use.

        Args:
            extra_headers: Additional default headers merged in when the
                client is created. Ignored when a client already exists,
                so per-request headers should go through :meth:`request`.

        Returns:
            The cached ``httpx.AsyncClient``.
        """
        if self._client is None:
            # NOTE(review): Origin / Referer are pinned to the qa1 host;
            # confirm this is intended outside the QA environment.
            default_headers = {
                "Content-Type": "application/json",
                "Accept": "application/json, text/plain, */*",
                "Device-Type": "pc",
                "tenant-Id": self.tenant_id,
                "currency-code": self.currency_code,
                "language-id": self.language_id,
                "language_id": self.language_id,  # some endpoints use the underscore form
                "source": self.source,
                "Origin": "https://www.qa1.gaia888.com",
                "Referer": "https://www.qa1.gaia888.com/",
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36",
                "DNT": "1",
            }
            # Only advertise a bearer token when one is configured.
            if self.api_token:
                default_headers["Authorization"] = f"Bearer {self.api_token}"
            if extra_headers:
                default_headers.update(extra_headers)
            self._client = httpx.AsyncClient(
                base_url=self.api_url,
                headers=default_headers,
                timeout=30.0
            )
        return self._client

    async def close(self):
        """Close the HTTP client, if any, and forget it."""
        if self._client:
            await self._client.aclose()
            self._client = None

    async def request(
        self,
        method: str,
        endpoint: str,
        params: Optional[dict[str, Any]] = None,
        json: Optional[dict[str, Any]] = None,
        headers: Optional[dict[str, str]] = None,
        use_authorization2: bool = False
    ) -> dict[str, Any]:
        """Send one API request and unwrap the Mall response envelope.

        Args:
            method: HTTP method.
            endpoint: API endpoint (e.g., "/mall/api/order/show").
            params: Query parameters.
            json: JSON body.
            headers: Additional headers for this request only; they take
                precedence over every other header.
            use_authorization2: When true, also send the token in an
                ``Authorization2`` header for this request.

        Returns:
            The ``result`` field of the response envelope, or the whole
            envelope when it has no ``result`` key.

        Raises:
            httpx.HTTPStatusError: The HTTP status indicates an error.
            Exception: The payload is not a JSON object, or its ``code``
                field is not 200.
        """
        client = await self._get_client()

        # Authorization2 is attached per request rather than by rebuilding
        # the shared client: the connection pool is kept, a client that other
        # coroutines may be using is never closed, and the header does not
        # leak into later requests that did not ask for it.
        request_headers: dict[str, str] = {}
        if use_authorization2:
            request_headers["Authorization2"] = f"Bearer {self.api_token}"
        if headers:
            request_headers.update(headers)

        response = await client.request(
            method=method,
            url=endpoint,
            params=params,
            json=json,
            headers=request_headers
        )

        # Product-search diagnostics. Logged instead of printed so the
        # output never lands on stdout.
        if "/spu" in endpoint:
            log = logging.getLogger(__name__)
            log.debug("[MallClient] Request: %s %s", method, endpoint)
            log.debug("[MallClient] Params: %s", params)
            log.debug("[MallClient] Response URL: %s", response.url)
            log.debug("[MallClient] Response Status: %s", response.status_code)

        response.raise_for_status()
        data = response.json()

        # Expected envelope: {"code": 200, "msg": "success", "result": {...}}
        if not isinstance(data, dict):
            raise Exception(
                f"API Error: unexpected response payload of type {type(data).__name__}"
            )
        if data.get("code") != 200:
            raise Exception(f"API Error [{data.get('code')}]: {data.get('msg') or data.get('message')}")
        return data.get("result", data)

    async def get(
        self,
        endpoint: str,
        params: Optional[dict[str, Any]] = None,
        use_authorization2: bool = False,
        **kwargs: Any
    ) -> dict[str, Any]:
        """Send a GET request; see :meth:`request` for arguments and errors."""
        return await self.request("GET", endpoint, params=params, use_authorization2=use_authorization2, **kwargs)

    async def post(
        self,
        endpoint: str,
        json: Optional[dict[str, Any]] = None,
        use_authorization2: bool = False,
        **kwargs: Any
    ) -> dict[str, Any]:
        """Send a POST request; see :meth:`request` for arguments and errors."""
        return await self.request("POST", endpoint, json=json, use_authorization2=use_authorization2, **kwargs)

    # ============ Order APIs ============

    async def get_order_by_id(
        self,
        order_id: str
    ) -> dict[str, Any]:
        """Query a single order by its order ID.

        Args:
            order_id: Order number (e.g., "202071324").

        Returns:
            The unwrapped result of ``GET /mall/api/order/show``.

        Raises:
            Exception: Any failure, re-raised with a bilingual prefix and
                the original error attached as ``__cause__``.

        Example:
            >>> client = MallClient()
            >>> order = await client.get_order_by_id("202071324")
            >>> print(order["order_id"])
        """
        try:
            return await self.get(
                "/mall/api/order/show",
                params={"orderId": order_id}
            )
        except Exception as e:
            raise Exception(f"查询订单失败 (Query order failed): {e}") from e

    async def get_order_list(
        self,
        page: int = 1,
        limit: int = 5,
        customer_id: int = 0,
        order_types: Optional[list[int]] = None,
        shipping_status: int = 10000,
        date_added: Optional[str] = None,
        date_end: Optional[str] = None,
        no: Optional[str] = None,
        status: int = 10000,
        is_drop_shopping: int = 0
    ) -> dict[str, Any]:
        """Query the order list with optional filters.

        Args:
            page: Page number (default: 1).
            limit: Page size (default: 5).
            customer_id: Customer ID (default: 0).
            order_types: Order type IDs, e.g. ``[1, 2]`` (default: None).
            shipping_status: Shipping status filter (default: 10000).
            date_added: Start date, ``YYYY-MM-DD`` (default: None).
            date_end: End date, ``YYYY-MM-DD`` (default: None).
            no: Order number (default: None).
            status: Order status filter (default: 10000).
            is_drop_shopping: Drop-shipping flag (default: 0).

        Returns:
            The unwrapped result of ``GET /mall/api/order/list``.

        Raises:
            Exception: Any failure, re-raised with a bilingual prefix and
                the original error attached as ``__cause__``.

        Example:
            >>> client = MallClient()
            >>> result = await client.get_order_list(page=1, limit=10)
            >>> print(result["data"])   # order list
            >>> print(result["total"])  # total count
        """
        try:
            params: dict[str, Any] = {
                "page": page,
                "limit": limit,
                "customerId": customer_id,
                "shippingStatus": shipping_status,
                "status": status,
                "isDropShopping": is_drop_shopping
            }
            # Optional filters are sent only when they carry a value.
            # NOTE(review): an earlier comment gave the wire format as
            # orderTypes[]=1&orderTypes[]=2, but httpx encodes a list value
            # as repeated orderTypes=1&orderTypes=2 keys. Confirm the API
            # accepts the unbracketed form.
            optional_filters = {
                "orderTypes": order_types,
                "dateAdded": date_added,
                "dateEnd": date_end,
                "no": no,
            }
            params.update(
                {key: value for key, value in optional_filters.items() if value}
            )
            return await self.get(
                "/mall/api/order/list",
                params=params,
                use_authorization2=True  # this endpoint expects Authorization2
            )
        except Exception as e:
            raise Exception(f"查询订单列表失败 (Query order list failed): {e}") from e

    # ============ Product APIs ============

    async def search_spu_products(
        self,
        keyword: str,
        page_size: int = 5,
        page: int = 1
    ) -> dict[str, Any]:
        """Search SPU products by keyword.

        Args:
            keyword: Search keyword (product name, number, ...).
            page_size: Page size (default: 5); values above 100 are sent as 100.
            page: Page number (default: 1).

        Returns:
            The unwrapped result of ``GET /mall/api/spu``.

        Raises:
            Exception: Any failure, re-raised with a bilingual prefix and
                the original error attached as ``__cause__``.

        Example:
            >>> client = MallClient()
            >>> result = await client.search_spu_products("61607", page_size=60, page=1)
            >>> print(f"found {len(result.get('list', []))} products")
        """
        try:
            params = {
                "pageSize": min(page_size, 100),  # cap at 100
                "page": page,
                "keyword": keyword
            }
            return await self.get(
                "/mall/api/spu",
                params=params
            )
        except Exception as e:
            raise Exception(f"搜索商品失败 (Search SPU products failed): {e}") from e

    async def get_love_list(
        self,
        page: int = 1,
        page_size: int = 6,
        warehouse_id: int = 2
    ) -> dict[str, Any]:
        """Get recommended products (Love List) from the Mall API.

        Args:
            page: Page number (default: 1).
            page_size: Number of products per page (default: 6).
            warehouse_id: Warehouse ID (default: 2).

        Returns:
            The unwrapped result of ``GET /mall/api/loveList``.

        Raises:
            Exception: Any failure, re-raised with a bilingual prefix and
                the original error attached as ``__cause__``.

        Example:
            >>> client = MallClient()
            >>> result = await client.get_love_list(page=1, page_size=6, warehouse_id=2)
            >>> print(f"found {len(result.get('list', []))} recommended products")
        """
        try:
            params = {
                "page": page,
                "pageSize": page_size,
                "warehouseId": warehouse_id
            }
            return await self.get(
                "/mall/api/loveList",
                params=params
            )
        except Exception as e:
            raise Exception(f"获取推荐商品失败 (Get love list failed): {e}") from e
# Shared MallClient instance; stays None until get_mall_client() is first called.
mall_client: Optional[MallClient] = None


def get_mall_client() -> MallClient:
    """Return the module-wide ``MallClient``, creating it on the first call."""
    global mall_client
    if mall_client is not None:
        return mall_client
    mall_client = MallClient()
    return mall_client