## 主要修复 ### 1. JSON 解析错误处理 - 修复所有 Agent 的 LLM 响应解析失败时返回原始内容的问题 - 当 JSON 解析失败时,返回友好的兜底消息而不是原始文本 - 影响文件: customer_service.py, order.py, product.py, aftersale.py ### 2. FAQ 快速路径修复 - 修复 customer_service.py 中变量定义顺序问题 - has_faq_query 在使用前未定义导致 NameError - 添加详细的错误日志记录 ### 3. Chatwoot 集成改进 - 添加响应内容调试日志 - 改进错误处理和日志记录 ### 4. 订单查询优化 - 将订单列表默认返回数量从 10 条改为 5 条 - 统一 MCP 工具层和 Mall Client 层的默认值 ### 5. 代码清理 - 删除所有测试代码和示例文件 - 刋试文件包括: test_*.py, test_*.html, test_*.sh - 删除测试目录: tests/, agent/tests/, agent/examples/ Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
330 lines
11 KiB
Python
330 lines
11 KiB
Python
"""
|
||
Mall API Client for MCP Servers
|
||
用于调用商城 API,包括订单查询等接口
|
||
"""
|
||
from typing import Any, Optional
|
||
import httpx
|
||
from pydantic_settings import BaseSettings
|
||
from pydantic import ConfigDict
|
||
|
||
|
||
class MallSettings(BaseSettings):
|
||
"""Mall API configuration"""
|
||
mall_api_url: Optional[str] = None
|
||
mall_api_token: Optional[str] = None
|
||
mall_tenant_id: str = "2"
|
||
mall_currency_code: str = "EUR"
|
||
mall_language_id: str = "1"
|
||
mall_source: str = "us.qa1.gaia888.com"
|
||
|
||
model_config = ConfigDict(
|
||
env_file=".env",
|
||
env_file_encoding="utf-8",
|
||
extra="ignore"
|
||
)
|
||
|
||
|
||
settings = MallSettings()
|
||
|
||
|
||
class MallClient:
|
||
"""Async client for Mall API"""
|
||
|
||
def __init__(
|
||
self,
|
||
api_url: Optional[str] = None,
|
||
api_token: Optional[str] = None,
|
||
tenant_id: Optional[str] = None,
|
||
currency_code: Optional[str] = None,
|
||
language_id: Optional[str] = None,
|
||
source: Optional[str] = None
|
||
):
|
||
self.api_url = (api_url or settings.mall_api_url or "").rstrip("/")
|
||
self.api_token = api_token or settings.mall_api_token or ""
|
||
self.tenant_id = tenant_id or settings.mall_tenant_id
|
||
self.currency_code = currency_code or settings.mall_currency_code
|
||
self.language_id = language_id or settings.mall_language_id
|
||
self.source = source or settings.mall_source
|
||
self._client: Optional[httpx.AsyncClient] = None
|
||
|
||
async def _get_client(self, extra_headers: Optional[dict[str, str]] = None) -> httpx.AsyncClient:
|
||
"""Get or create HTTP client with default headers
|
||
|
||
Args:
|
||
extra_headers: 额外的请求头,某些接口需要特殊的 header(如 Authorization2)
|
||
"""
|
||
if self._client is None:
|
||
default_headers = {
|
||
"Content-Type": "application/json",
|
||
"Accept": "application/json, text/plain, */*",
|
||
"Device-Type": "pc",
|
||
"tenant-Id": self.tenant_id,
|
||
"currency-code": self.currency_code,
|
||
"language-id": self.language_id,
|
||
"language_id": self.language_id, # 某些接口使用下划线
|
||
"source": self.source,
|
||
"Origin": "https://www.qa1.gaia888.com",
|
||
"Referer": "https://www.qa1.gaia888.com/",
|
||
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36",
|
||
"DNT": "1",
|
||
}
|
||
|
||
# 只有在有 token 时才添加 Authorization header
|
||
if self.api_token:
|
||
default_headers["Authorization"] = f"Bearer {self.api_token}"
|
||
|
||
# 合并额外的 headers(用于 Authorization2 等)
|
||
if extra_headers:
|
||
default_headers.update(extra_headers)
|
||
|
||
self._client = httpx.AsyncClient(
|
||
base_url=self.api_url,
|
||
headers=default_headers,
|
||
timeout=30.0
|
||
)
|
||
return self._client
|
||
|
||
async def close(self):
|
||
"""Close HTTP client"""
|
||
if self._client:
|
||
await self._client.aclose()
|
||
self._client = None
|
||
|
||
async def request(
|
||
self,
|
||
method: str,
|
||
endpoint: str,
|
||
params: Optional[dict[str, Any]] = None,
|
||
json: Optional[dict[str, Any]] = None,
|
||
headers: Optional[dict[str, str]] = None,
|
||
use_authorization2: bool = False
|
||
) -> dict[str, Any]:
|
||
"""Make API request and handle response
|
||
|
||
Args:
|
||
method: HTTP method
|
||
endpoint: API endpoint (e.g., "/mall/api/order/show")
|
||
params: Query parameters
|
||
json: JSON body
|
||
headers: Additional headers
|
||
use_authorization2: 是否使用 Authorization2 header 而不是 Authorization
|
||
|
||
Returns:
|
||
Response data
|
||
"""
|
||
# 如果需要 Authorization2,关闭现有 client 并用新的 headers 创建
|
||
if use_authorization2:
|
||
if self._client:
|
||
await self._client.aclose()
|
||
self._client = None
|
||
extra_headers = {"Authorization2": f"Bearer {self.api_token}"}
|
||
client = await self._get_client(extra_headers)
|
||
else:
|
||
client = await self._get_client()
|
||
|
||
# Merge additional headers
|
||
request_headers = {}
|
||
if headers:
|
||
request_headers.update(headers)
|
||
|
||
response = await client.request(
|
||
method=method,
|
||
url=endpoint,
|
||
params=params,
|
||
json=json,
|
||
headers=request_headers
|
||
)
|
||
|
||
# Debug logging for product search
|
||
if "/spu" in endpoint:
|
||
print(f"[DEBUG MallClient] Request: {method} {endpoint}")
|
||
print(f"[DEBUG MallClient] Params: {params}")
|
||
print(f"[DEBUG MallClient] Response URL: {response.url}")
|
||
print(f"[DEBUG MallClient] Response Status: {response.status_code}")
|
||
|
||
response.raise_for_status()
|
||
|
||
data = response.json()
|
||
|
||
# Mall API 返回格式: {"code": 200, "msg": "success", "result": {...}}
|
||
# 检查 API 错误
|
||
if data.get("code") != 200:
|
||
raise Exception(f"API Error [{data.get('code')}]: {data.get('msg') or data.get('message')}")
|
||
|
||
# 返回 result 字段或整个 data
|
||
return data.get("result", data)
|
||
|
||
async def get(
|
||
self,
|
||
endpoint: str,
|
||
params: Optional[dict[str, Any]] = None,
|
||
use_authorization2: bool = False,
|
||
**kwargs: Any
|
||
) -> dict[str, Any]:
|
||
"""GET request"""
|
||
return await self.request("GET", endpoint, params=params, use_authorization2=use_authorization2, **kwargs)
|
||
|
||
async def post(
|
||
self,
|
||
endpoint: str,
|
||
json: Optional[dict[str, Any]] = None,
|
||
use_authorization2: bool = False,
|
||
**kwargs: Any
|
||
) -> dict[str, Any]:
|
||
"""POST request"""
|
||
return await self.request("POST", endpoint, json=json, use_authorization2=use_authorization2, **kwargs)
|
||
|
||
# ============ Order APIs ============
|
||
|
||
async def get_order_by_id(
|
||
self,
|
||
order_id: str
|
||
) -> dict[str, Any]:
|
||
"""Query order by order ID
|
||
|
||
根据订单号查询订单详情
|
||
|
||
Args:
|
||
order_id: 订单号 (e.g., "202071324")
|
||
|
||
Returns:
|
||
订单详情,包含订单号、状态、商品信息、金额、物流信息等
|
||
Order details including order ID, status, items, amount, logistics info, etc.
|
||
|
||
Example:
|
||
>>> client = MallClient()
|
||
>>> order = await client.get_order_by_id("202071324")
|
||
>>> print(order["order_id"])
|
||
"""
|
||
try:
|
||
result = await self.get(
|
||
"/mall/api/order/show",
|
||
params={"orderId": order_id}
|
||
)
|
||
return result
|
||
except Exception as e:
|
||
raise Exception(f"查询订单失败 (Query order failed): {str(e)}")
|
||
|
||
async def get_order_list(
|
||
self,
|
||
page: int = 1,
|
||
limit: int = 5,
|
||
customer_id: int = 0,
|
||
order_types: Optional[list[int]] = None,
|
||
shipping_status: int = 10000,
|
||
date_added: Optional[str] = None,
|
||
date_end: Optional[str] = None,
|
||
no: Optional[str] = None,
|
||
status: int = 10000,
|
||
is_drop_shopping: int = 0
|
||
) -> dict[str, Any]:
|
||
"""Query order list with filters
|
||
|
||
查询订单列表,支持多种筛选条件
|
||
|
||
Args:
|
||
page: 页码 (default: 1)
|
||
limit: 每页数量 (default: 5)
|
||
customer_id: 客户ID (default: 0)
|
||
order_types: 订单类型数组,如 [1, 2] (default: None)
|
||
shipping_status: 物流状态 (default: 10000, 10000表示全部状态)
|
||
date_added: 开始日期,格式 YYYY-MM-DD (default: None)
|
||
date_end: 结束日期,格式 YYYY-MM-DD (default: None)
|
||
no: 订单号 (default: None)
|
||
status: 订单状态 (default: 10000, 10000表示全部状态)
|
||
is_drop_shopping: 是否代发货 (default: 0)
|
||
|
||
Returns:
|
||
订单列表和分页信息
|
||
Order list and pagination info
|
||
|
||
Example:
|
||
>>> client = MallClient()
|
||
>>> result = await client.get_order_list(page=1, limit=10)
|
||
>>> print(result["data"]) # 订单列表
|
||
>>> print(result["total"]) # 总数
|
||
"""
|
||
try:
|
||
params = {
|
||
"page": page,
|
||
"limit": limit,
|
||
"customerId": customer_id,
|
||
"shippingStatus": shipping_status,
|
||
"status": status,
|
||
"isDropShopping": is_drop_shopping
|
||
}
|
||
|
||
# 可选参数
|
||
if order_types:
|
||
# orderTypes 是数组,需要特殊处理
|
||
# API 格式: orderTypes[]=1&orderTypes[]=2
|
||
params["orderTypes"] = order_types
|
||
if date_added:
|
||
params["dateAdded"] = date_added
|
||
if date_end:
|
||
params["dateEnd"] = date_end
|
||
if no:
|
||
params["no"] = no
|
||
|
||
result = await self.get(
|
||
"/mall/api/order/list",
|
||
params=params,
|
||
use_authorization2=True # 订单列表接口需要 Authorization2
|
||
)
|
||
return result
|
||
except Exception as e:
|
||
raise Exception(f"查询订单列表失败 (Query order list failed): {str(e)}")
|
||
|
||
# ============ Product APIs ============
|
||
|
||
async def search_spu_products(
|
||
self,
|
||
keyword: str,
|
||
page_size: int = 5,
|
||
page: int = 1
|
||
) -> dict[str, Any]:
|
||
"""Search SPU products by keyword
|
||
|
||
根据关键词搜索商品 SPU
|
||
|
||
Args:
|
||
keyword: 搜索关键词(商品名称、编号等)
|
||
page_size: 每页数量 (default: 5, max 100)
|
||
page: 页码 (default: 1)
|
||
|
||
Returns:
|
||
商品列表,包含 SPU 信息、商品图片、价格等
|
||
Product list including SPU info, images, prices, etc.
|
||
|
||
Example:
|
||
>>> client = MallClient()
|
||
>>> result = await client.search_spu_products("61607", page_size=60, page=1)
|
||
>>> print(f"找到 {len(result.get('list', []))} 个商品")
|
||
"""
|
||
try:
|
||
params = {
|
||
"pageSize": min(page_size, 100), # 限制最大 100
|
||
"page": page,
|
||
"keyword": keyword
|
||
}
|
||
|
||
result = await self.get(
|
||
"/mall/api/spu",
|
||
params=params
|
||
)
|
||
return result
|
||
except Exception as e:
|
||
raise Exception(f"搜索商品失败 (Search SPU products failed): {str(e)}")
|
||
|
||
|
||
# Global Mall client instance
|
||
mall_client: Optional[MallClient] = None
|
||
|
||
|
||
def get_mall_client() -> MallClient:
|
||
"""Get or create global Mall client instance"""
|
||
global mall_client
|
||
if mall_client is None:
|
||
mall_client = MallClient()
|
||
return mall_client
|