主要改进: - Agent 增强: 订单查询、售后支持、客服路由等功能优化 - 新增语言检测和 Token 管理模块 - 改进 Chatwoot webhook 处理和用户标识 - MCP 服务器增强: 订单 MCP 和 Strapi MCP 功能扩展 - 新增商城客户端、知识库、缓存和同步模块 - 添加多语言提示词系统 (YAML) - 完善项目结构: 整理文档、脚本和测试文件 - 新增调试和测试工具脚本 Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
104 lines
3.3 KiB
Python
104 lines
3.3 KiB
Python
"""
|
||
测试商城订单查询接口
|
||
|
||
Usage:
|
||
python test_mall_order_query.py <order_id>
|
||
|
||
Example:
|
||
python test_mall_order_query.py 202071324
|
||
"""
|
||
import asyncio
|
||
import sys
|
||
import os
|
||
|
||
# Add mcp_servers to path
|
||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "mcp_servers"))
|
||
|
||
from shared.mall_client import MallClient
|
||
|
||
|
||
async def test_order_query(order_id: str, token: str):
|
||
"""测试订单查询
|
||
|
||
Args:
|
||
order_id: 订单号
|
||
token: JWT Token
|
||
"""
|
||
print(f"\n{'='*60}")
|
||
print(f"测试商城订单查询接口")
|
||
print(f"{'='*60}")
|
||
print(f"订单号 (Order ID): {order_id}")
|
||
print(f"API URL: https://apicn.qa1.gaia888.com")
|
||
print(f"{'='*60}\n")
|
||
|
||
# 创建客户端
|
||
client = MallClient(
|
||
api_url="https://apicn.qa1.gaia888.com",
|
||
api_token=token,
|
||
tenant_id="2",
|
||
currency_code="EUR",
|
||
language_id="1",
|
||
source="us.qa1.gaia888.com"
|
||
)
|
||
|
||
try:
|
||
# 调用订单查询接口
|
||
result = await client.get_order_by_id(order_id)
|
||
|
||
# 打印结果
|
||
print("✅ 查询成功 (Query Success)!")
|
||
print(f"\n返回数据 (Response Data):")
|
||
print("-" * 60)
|
||
import json
|
||
print(json.dumps(result, ensure_ascii=False, indent=2))
|
||
print("-" * 60)
|
||
|
||
# 提取关键信息
|
||
if isinstance(result, dict):
|
||
print(f"\n关键信息 (Key Information):")
|
||
print(f" 订单号 (Order ID): {result.get('order_id') or result.get('orderId') or order_id}")
|
||
print(f" 订单状态 (Status): {result.get('status') or result.get('order_status') or 'N/A'}")
|
||
print(f" 订单金额 (Amount): {result.get('total_amount') or result.get('amount') or 'N/A'}")
|
||
|
||
# 商品信息
|
||
items = result.get('items') or result.get('order_items') or result.get('products')
|
||
if items:
|
||
print(f" 商品数量 (Items): {len(items)}")
|
||
|
||
except Exception as e:
|
||
print(f"❌ 查询失败 (Query Failed): {str(e)}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
|
||
finally:
|
||
await client.close()
|
||
|
||
|
||
def main():
|
||
"""主函数"""
|
||
if len(sys.argv) < 2:
|
||
print("Usage: python test_mall_order_query.py <order_id> [token]")
|
||
print("\nExample:")
|
||
print(' python test_mall_order_query.py 202071324')
|
||
print(' python test_mall_order_query.py 202071324 "your_jwt_token_here"')
|
||
sys.exit(1)
|
||
|
||
order_id = sys.argv[1]
|
||
|
||
# 从命令行获取 token,如果没有提供则使用默认的测试 token
|
||
if len(sys.argv) >= 3:
|
||
token = sys.argv[2]
|
||
else:
|
||
# 使用用户提供的示例 token
|
||
token = "eyJ0eXAiOiJqd3QifQ.eyJzdWIiOiIxIiwiaXNzIjoiaHR0cDpcL1wvOiIsImV4cCI6MTc3MDUyMDY2MSwiaWF0IjoxNzY3OTI4NjYxLCJuYmYiOjE3Njc5Mjg2NjEsInVzZXJJZCI6MTAxNDMyLCJ0eXBlIjoyLCJ0ZW5hbnRJZCI6MiwidWlkIjoxMDE0MzIsInMiOiJkM0tZMjMiLCJqdGkiOiI3YjcwYTI2MzYwYjJmMzA3YmQ4YTYzNDAxOGVlNjlmZSJ9.dwiqln19-yAQSJd1w5bxZFrRgyohdAkHa1zW3W7Ov2I"
|
||
print("⚠️ 使用默认的测试 token(可能已过期)")
|
||
print(" 如需测试,请提供有效的 token:")
|
||
print(f' python {sys.argv[0]} {order_id} "your_jwt_token_here"\n')
|
||
|
||
# 运行异步测试
|
||
asyncio.run(test_order_query(order_id, token))
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|