Files

368 lines
12 KiB
Python
Raw Permalink Normal View History

"""
Mall API Client for MCP Servers
用于调用商城 API包括订单查询等接口
"""
import logging
from typing import Any, Optional

import httpx
from pydantic import ConfigDict
from pydantic_settings import BaseSettings
class MallSettings(BaseSettings):
    """Mall API configuration.

    Declares the connection and request-context values used as fallbacks
    by ``MallClient`` when the caller does not pass them explicitly. The
    literals below are the defaults applied when no other value is
    provided for a field.
    """

    # Read values from a UTF-8 encoded ".env" file and ignore entries that
    # do not correspond to a declared field.
    model_config = ConfigDict(
        extra="ignore",
        env_file=".env",
        env_file_encoding="utf-8",
    )

    # Connection: base URL and bearer token (both optional).
    mall_api_url: Optional[str] = None
    mall_api_token: Optional[str] = None

    # Request context, sent by the client as the "tenant-Id",
    # "currency-code", "language-id"/"language_id" and "source" headers.
    mall_tenant_id: str = "2"
    mall_currency_code: str = "EUR"
    mall_language_id: str = "1"
    mall_source: str = "us.qa1.gaia888.com"
# Module-level settings instance, created once when this module is imported;
# MallClient falls back to these values for any argument it is not given.
settings = MallSettings()
logger = logging.getLogger(__name__)


class MallClient:
    """Async client for the Mall API.

    A single ``httpx.AsyncClient`` is created lazily on first use and reused
    for every request until :meth:`close` is called. The class can also be
    used as an async context manager, which closes the HTTP client on exit.

    Successful Mall API responses have the shape
    ``{"code": 200, "msg": "success", "result": {...}}``; the public methods
    return the ``result`` value.
    """

    def __init__(
        self,
        api_url: Optional[str] = None,
        api_token: Optional[str] = None,
        tenant_id: Optional[str] = None,
        currency_code: Optional[str] = None,
        language_id: Optional[str] = None,
        source: Optional[str] = None,
    ):
        """Store connection parameters, falling back to module settings.

        Args:
            api_url: Base URL of the Mall API; a trailing "/" is stripped.
            api_token: Bearer token; when empty no Authorization header is sent.
            tenant_id: Value for the "tenant-Id" header.
            currency_code: Value for the "currency-code" header.
            language_id: Value for the "language-id"/"language_id" headers.
            source: Value for the "source" header.
        """
        self.api_url = (api_url or settings.mall_api_url or "").rstrip("/")
        self.api_token = api_token or settings.mall_api_token or ""
        self.tenant_id = tenant_id or settings.mall_tenant_id
        self.currency_code = currency_code or settings.mall_currency_code
        self.language_id = language_id or settings.mall_language_id
        self.source = source or settings.mall_source
        self._client: Optional["httpx.AsyncClient"] = None

    async def __aenter__(self) -> "MallClient":
        """Return the client itself for use in ``async with``."""
        return self

    async def __aexit__(self, exc_type: Any, exc: Any, tb: Any) -> None:
        """Close the underlying HTTP client when leaving ``async with``."""
        await self.close()

    def _build_default_headers(self) -> dict[str, str]:
        """Build the headers attached to every request made by this client."""
        default_headers = {
            "Content-Type": "application/json",
            "Accept": "application/json, text/plain, */*",
            "Device-Type": "pc",
            "tenant-Id": self.tenant_id,
            "currency-code": self.currency_code,
            "language-id": self.language_id,
            "language_id": self.language_id,  # some endpoints use the underscore form
            "source": self.source,
            "Origin": "https://www.qa1.gaia888.com",
            "Referer": "https://www.qa1.gaia888.com/",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36",
            "DNT": "1",
        }
        # Only send an Authorization header when a token is configured.
        if self.api_token:
            default_headers["Authorization"] = f"Bearer {self.api_token}"
        return default_headers

    async def _get_client(
        self, extra_headers: Optional[dict[str, str]] = None
    ) -> "httpx.AsyncClient":
        """Get or create the shared HTTP client.

        Args:
            extra_headers: Extra default headers merged over the standard ones.
                They are applied only when this call actually creates the
                client; an already existing client is returned unchanged.

        Returns:
            The shared ``httpx.AsyncClient``.
        """
        if self._client is None:
            default_headers = self._build_default_headers()
            if extra_headers:
                default_headers.update(extra_headers)
            self._client = httpx.AsyncClient(
                base_url=self.api_url,
                headers=default_headers,
                timeout=30.0,
            )
        return self._client

    async def close(self):
        """Close the HTTP client, if one was created."""
        if self._client:
            await self._client.aclose()
            self._client = None

    @staticmethod
    def _unwrap_response(data: Any) -> dict[str, Any]:
        """Validate a decoded Mall API payload and return its result.

        Args:
            data: Decoded JSON body of the response.

        Returns:
            The ``result`` field when present, otherwise the whole payload.

        Raises:
            Exception: If the payload is not a JSON object or its ``code``
                is not 200.
        """
        if not isinstance(data, dict):
            raise Exception(
                f"API Error: unexpected response payload of type {type(data).__name__}"
            )
        if data.get("code") != 200:
            raise Exception(f"API Error [{data.get('code')}]: {data.get('msg') or data.get('message')}")
        return data.get("result", data)

    async def request(
        self,
        method: str,
        endpoint: str,
        params: Optional[dict[str, Any]] = None,
        json: Optional[dict[str, Any]] = None,
        headers: Optional[dict[str, str]] = None,
        use_authorization2: bool = False,
    ) -> dict[str, Any]:
        """Make an API request and unwrap the response.

        Args:
            method: HTTP method.
            endpoint: API endpoint (e.g., "/mall/api/order/show").
            params: Query parameters.
            json: JSON body.
            headers: Additional headers for this request only; they take
                precedence over the headers this method adds itself.
            use_authorization2: When true, also send an
                ``Authorization2: Bearer <token>`` header on this request.

        Returns:
            The ``result`` field of the response, or the whole payload when
            that field is absent.

        Raises:
            httpx.HTTPStatusError: If the HTTP status indicates an error.
            Exception: If the API reports a ``code`` other than 200.
        """
        client = await self._get_client()

        # Authorization2 is scoped to this single request. Baking it into the
        # shared client would leak the header into later requests and force
        # the connection pool to be torn down on every such call.
        request_headers: dict[str, str] = {}
        if use_authorization2:
            request_headers["Authorization2"] = f"Bearer {self.api_token}"
        if headers:
            request_headers.update(headers)

        response = await client.request(
            method=method,
            url=endpoint,
            params=params,
            json=json,
            headers=request_headers,
        )
        # Use logging rather than print() so that diagnostics never end up on
        # stdout, which the hosting process may reserve for its own output.
        logger.debug(
            "MallClient %s %s params=%s -> %s (%s)",
            method,
            endpoint,
            params,
            response.status_code,
            response.url,
        )
        response.raise_for_status()
        return self._unwrap_response(response.json())

    async def get(
        self,
        endpoint: str,
        params: Optional[dict[str, Any]] = None,
        use_authorization2: bool = False,
        **kwargs: Any,
    ) -> dict[str, Any]:
        """Send a GET request; see :meth:`request` for the arguments."""
        return await self.request(
            "GET", endpoint, params=params, use_authorization2=use_authorization2, **kwargs
        )

    async def post(
        self,
        endpoint: str,
        json: Optional[dict[str, Any]] = None,
        use_authorization2: bool = False,
        **kwargs: Any,
    ) -> dict[str, Any]:
        """Send a POST request; see :meth:`request` for the arguments."""
        return await self.request(
            "POST", endpoint, json=json, use_authorization2=use_authorization2, **kwargs
        )

    # ============ Order APIs ============

    async def get_order_by_id(
        self,
        order_id: str,
    ) -> dict[str, Any]:
        """Query order details by order ID.

        Args:
            order_id: Order number (e.g., "202071324").

        Returns:
            Order details including order ID, status, items, amount,
            logistics info, etc.

        Raises:
            Exception: If the request fails; the original error is chained
                as ``__cause__``.

        Example:
            >>> client = MallClient()
            >>> order = await client.get_order_by_id("202071324")
            >>> print(order["order_id"])
        """
        try:
            return await self.get(
                "/mall/api/order/show",
                params={"orderId": order_id},
            )
        except Exception as e:
            raise Exception(f"查询订单失败 (Query order failed): {str(e)}") from e

    async def get_order_list(
        self,
        page: int = 1,
        limit: int = 5,
        customer_id: int = 0,
        order_types: Optional[list[int]] = None,
        shipping_status: int = 10000,
        date_added: Optional[str] = None,
        date_end: Optional[str] = None,
        no: Optional[str] = None,
        status: int = 10000,
        is_drop_shopping: int = 0,
    ) -> dict[str, Any]:
        """Query the order list with optional filters.

        Args:
            page: Page number (default: 1).
            limit: Items per page (default: 5).
            customer_id: Customer ID (default: 0).
            order_types: Order types, e.g. [1, 2] (default: None).
            shipping_status: Shipping status (default: 10000, meaning all).
            date_added: Start date, formatted YYYY-MM-DD (default: None).
            date_end: End date, formatted YYYY-MM-DD (default: None).
            no: Order number (default: None).
            status: Order status (default: 10000, meaning all).
            is_drop_shopping: Drop-shipping flag (default: 0).

        Returns:
            Order list and pagination info.

        Raises:
            Exception: If the request fails; the original error is chained
                as ``__cause__``.

        Example:
            >>> client = MallClient()
            >>> result = await client.get_order_list(page=1, limit=10)
            >>> print(result["data"])   # order list
            >>> print(result["total"])  # total count
        """
        try:
            params: dict[str, Any] = {
                "page": page,
                "limit": limit,
                "customerId": customer_id,
                "shippingStatus": shipping_status,
                "status": status,
                "isDropShopping": is_drop_shopping,
            }
            # Optional filters are only sent when provided.
            if order_types:
                # The API expects the array form orderTypes[]=1&orderTypes[]=2,
                # so the key itself carries the "[]" suffix; the HTTP client
                # repeats the key once per list element.
                params["orderTypes[]"] = list(order_types)
            if date_added:
                params["dateAdded"] = date_added
            if date_end:
                params["dateEnd"] = date_end
            if no:
                params["no"] = no

            return await self.get(
                "/mall/api/order/list",
                params=params,
                use_authorization2=True,  # the order list endpoint needs Authorization2
            )
        except Exception as e:
            raise Exception(f"查询订单列表失败 (Query order list failed): {str(e)}") from e

    # ============ Product APIs ============

    async def search_spu_products(
        self,
        keyword: str,
        page_size: int = 5,
        page: int = 1,
    ) -> dict[str, Any]:
        """Search SPU products by keyword.

        Args:
            keyword: Search keyword (product name, number, etc.).
            page_size: Items per page (default: 5, capped at 100).
            page: Page number (default: 1).

        Returns:
            Product list including SPU info, images, prices, etc.

        Raises:
            Exception: If the request fails; the original error is chained
                as ``__cause__``.

        Example:
            >>> client = MallClient()
            >>> result = await client.search_spu_products("61607", page_size=60, page=1)
            >>> print(len(result.get("list", [])))
        """
        try:
            params = {
                "pageSize": min(page_size, 100),  # cap at 100 per page
                "page": page,
                "keyword": keyword,
            }
            return await self.get(
                "/mall/api/spu",
                params=params,
            )
        except Exception as e:
            raise Exception(f"搜索商品失败 (Search SPU products failed): {str(e)}") from e

    async def get_love_list(
        self,
        page: int = 1,
        page_size: int = 6,
        warehouse_id: int = 2,
    ) -> dict[str, Any]:
        """Get recommended products (Love List) from the Mall API.

        Args:
            page: Page number (default: 1).
            page_size: Number of products per page (default: 6).
            warehouse_id: Warehouse ID (default: 2).

        Returns:
            Dictionary containing the product list and metadata.

        Raises:
            Exception: If the request fails; the original error is chained
                as ``__cause__``.

        Example:
            >>> client = MallClient()
            >>> result = await client.get_love_list(page=1, page_size=6, warehouse_id=2)
            >>> print(len(result.get("list", [])))
        """
        try:
            params = {
                "page": page,
                "pageSize": page_size,
                "warehouseId": warehouse_id,
            }
            return await self.get(
                "/mall/api/loveList",
                params=params,
            )
        except Exception as e:
            raise Exception(f"获取推荐商品失败 (Get love list failed): {str(e)}") from e
# Shared MallClient instance for the whole module; stays None until the
# first call to get_mall_client().
mall_client: Optional[MallClient] = None


def get_mall_client() -> MallClient:
    """Return the global Mall client, creating it on first use."""
    global mall_client
    if mall_client is not None:
        return mall_client
    mall_client = MallClient()
    return mall_client