Files
assistant/agent/agents/customer_service.py
wangliang 0b5d0a8086 feat: 重构订单和物流信息展示格式
主要改动:
- 订单列表:使用 order_list 格式,展示 5 个订单(全部状态)
- 订单详情:使用 order_detail 格式,优化价格和时间显示
- 物流信息:使用 logistics 格式,根据 track id 动态生成步骤
- 商品图片:从 orderProduct.imageUrl 字段获取
- 时间格式:统一为 YYYY-MM-DD HH:MM:SS
- 多语言支持:amountLabel、orderTime 支持中英文
- 配置管理:新增 FRONTEND_URL 环境变量
- API 集成:改进 Mall API tracks 数据解析
- 认证优化:account_id 从 webhook 动态获取

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-23 18:49:40 +08:00

313 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Customer Service Agent - Handles FAQ and general inquiries
"""
import json
from typing import Any
from core.state import AgentState, ConversationState, add_tool_call, set_response
from core.llm import get_llm_client, Message
from prompts import get_prompt
from utils.logger import get_logger
from utils.faq_library import get_faq_library
# Module-level logger, named after this module via __name__.
logger = get_logger(__name__)
# Keyword table used to guess the FAQ category of a message by plain substring
# match. Languages covered: en, nl, de, es, fr, it, tr, zh.
# Category order matters: the first category with any matching keyword wins.
# NOTE(review): matching is substring-based, so short keywords can hit inside
# unrelated words (e.g. "bon" inside "bonjour", "ship" inside "relationship").
_CATEGORY_KEYWORDS = {
    "register": (
        # English
        "register", "sign up", "account", "login", "password", "forgot",
        # Dutch (Nederlands)
        "registreren", "account", "inloggen", "wachtwoord",
        # German (Deutsch)
        "registrieren", "konto", "anmelden", "passwort",
        # Spanish (Español)
        "registrar", "cuenta", "iniciar", "contraseña",
        # French (Français)
        "enregistrer", "compte", "connecter", "mot de passe",
        # Italian (Italiano)
        "registrarsi", "account", "accesso", "password",
        # Turkish (Türkçe)
        "kayıt", "hesap", "giriş", "şifre",
        # Chinese
        "注册", "账号", "登录", "密码", "忘记密码",
    ),
    "order": (
        # English
        "order", "place order", "cancel order", "modify order", "change order",
        # Dutch
        "bestelling", "bestellen", "annuleren", "wijzigen",
        # German
        "bestellung", "bestellen", "stornieren", "ändern",
        # Spanish
        "pedido", "hacer pedido", "cancelar", "modificar",
        # French
        "commande", "passer commande", "annuler", "modifier",
        # Italian
        "ordine", "ordinare", "cancellare", "modificare",
        # Turkish
        "sipariş", "sipariş ver", "iptal", "değiştir",
        # Chinese
        "订单", "下单", "取消订单", "修改订单", "更改订单",
    ),
    "payment": (
        # English
        "pay", "payment", "checkout", "voucher", "discount", "promo",
        # Dutch
        "betalen", "betaling", "korting", "voucher",
        # German
        "bezahlen", "zahlung", "rabatt", "gutschein",
        # Spanish
        "pagar", "pago", "descuento", "cupón",
        # French
        "payer", "paiement", "réduction", "bon",
        # Italian
        "pagare", "pagamento", "sconto", "voucher",
        # Turkish
        "ödemek", "ödeme", "indirim", "kupon",
        # Chinese
        "支付", "付款", "结算", "优惠券", "折扣", "促销",
    ),
    "shipment": (
        # English
        "ship", "shipping", "delivery", "courier", "transit", "logistics", "tracking",
        # Dutch
        "verzenden", "levering", "koerier", "logistiek", "volgen",
        # German
        "versand", "lieferung", "kurier", "logistik", "verfolgung",
        # Spanish
        "enviar", "envío", "entrega", "mensajería", "logística", "seguimiento",
        # French
        "expédier", "livraison", "coursier", "logistique", "suivi",
        # Italian
        "spedire", "spedizione", "consegna", "corriere", "logistica", "tracciamento",
        # Turkish
        "gönderi", "teslimat", "kurye", "lojistik", "takip",
        # Chinese
        "发货", "配送", "快递", "物流", "运输", "配送单",
    ),
    "return": (
        # English
        "return", "refund", "exchange", "defective", "damaged",
        # Dutch
        "retour", "terugbetaling", "ruilen", "defect",
        # German
        "rückgabe", "erstattung", "austausch", "defekt",
        # Spanish
        "devolución", "reembolso", "cambio", "defectuoso",
        # French
        "retour", "remboursement", "échange", "défectueux",
        # Italian
        "reso", "rimborso", "cambio", "difettoso",
        # Turkish
        "iade", "geri ödeme", "değişim", "defekt",
        # Chinese
        "退货", "退款", "换货", "有缺陷", "损坏",
    ),
}

# Substrings that trigger an automatic company-info lookup (English only).
_COMPANY_INFO_KEYWORDS = ("opening hour", "contact", "address", "phone", "email")

# Tool names that count as "an FAQ lookup was already requested".
_FAQ_TOOL_NAMES = ("query_faq", "search_knowledge_base")


def _detect_category(message_lower: str):
    """Return the first FAQ category with a keyword in *message_lower*, or None."""
    for category, keywords in _CATEGORY_KEYWORDS.items():
        if any(keyword in message_lower for keyword in keywords):
            return category
    return None


def _strip_code_fence(content: str) -> str:
    """Return the body of a leading Markdown code fence, or *content* unchanged."""
    if not content.startswith("```"):
        return content
    # content starts with the fence, so split() yields ["", body, ...].
    fenced = content.split("```")[1]
    # Drop an optional "json" language tag, in any letter case.
    if fenced[:4].lower() == "json":
        fenced = fenced[4:]
    return fenced


async def customer_service_agent(state: AgentState) -> AgentState:
    """Customer service agent node.

    Handles FAQ, company info, and general inquiries. The steps, in order:

    1. If tool results are present, turn them into a reply.
    2. If ``faq_response`` is already set on the state, return it.
    3. If the local FAQ library matches the current message, return that.
    4. If a keyword category is detected (and no FAQ tool call exists yet),
       queue a ``query_faq`` tool call on the "strapi" server.
    5. If the message mentions contact-style keywords (and no FAQ tool call
       exists yet), queue a ``get_company_info`` tool call.
    6. Otherwise ask the LLM, which is expected to answer with a JSON object
       carrying an ``action`` of ``call_tool``, ``respond`` or ``handoff``.

    Args:
        state: Current agent state

    Returns:
        Updated state with tool calls, a response, a handoff flag, or an
        ``error`` entry when the LLM step fails.
    """
    logger.info(
        "Customer service agent processing",
        conversation_id=state["conversation_id"]
    )
    state["current_agent"] = "customer_service"
    state["agent_history"].append("customer_service")
    state["state"] = ConversationState.PROCESSING.value

    # Tool results are waiting: summarise them for the user.
    if state["tool_results"]:
        return await _generate_response_from_results(state)

    # Fast path: a FAQ answer was already stored on the state upstream.
    router_faq = state.get("faq_response")
    if router_faq:
        logger.info(
            "Using FAQ response from router",
            conversation_id=state["conversation_id"],
            response_length=len(router_faq)
        )
        return set_response(state, router_faq)

    # Fast path (backup): try the local FAQ library before any tool/LLM call.
    faq_response = get_faq_library().find_match(state["current_message"])
    if faq_response:
        logger.info(
            "FAQ match found, returning instant response",
            conversation_id=state["conversation_id"],
            response_length=len(faq_response)
        )
        return set_response(state, faq_response)

    # Detected language, defaulting to English.
    locale = state.get("detected_language", "en")

    # Auto-detect the FAQ category from the lower-cased message.
    message_lower = state["current_message"].lower()
    detected_category = _detect_category(message_lower)

    # Check whether an FAQ lookup has already been requested.
    tool_calls = state.get("tool_calls", [])
    has_faq_query = any(tc.get("tool_name") in _FAQ_TOOL_NAMES for tc in tool_calls)

    # A category was detected and no FAQ lookup exists yet: query the FAQ.
    if detected_category and not has_faq_query:
        logger.info(
            f"Auto-querying FAQ for category: {detected_category}",
            conversation_id=state["conversation_id"]
        )
        state = add_tool_call(
            state,
            tool_name="query_faq",
            arguments={
                "category": detected_category,
                "locale": locale,
                "limit": 5
            },
            server="strapi"
        )
        state["state"] = ConversationState.TOOL_CALLING.value
        return state

    # Opening hours / contact details: query the company info instead.
    if not has_faq_query and any(
        keyword in message_lower for keyword in _COMPANY_INFO_KEYWORDS
    ):
        logger.info(
            "Auto-querying company info",
            conversation_id=state["conversation_id"]
        )
        state = add_tool_call(
            state,
            tool_name="get_company_info",
            arguments={
                "section": "contact",
                "locale": locale
            },
            server="strapi"
        )
        state["state"] = ConversationState.TOOL_CALLING.value
        return state

    # Build the LLM conversation: system prompt in the detected language,
    # the last six history messages, then the current user message.
    system_prompt = get_prompt("customer_service", locale)
    messages = [
        Message(role="system", content=system_prompt),
    ]
    for msg in state["messages"][-6:]:
        messages.append(Message(role=msg["role"], content=msg["content"]))
    messages.append(Message(role="user", content=state["current_message"]))

    try:
        llm = get_llm_client()
        response = await llm.chat(messages, temperature=0.7)

        # Parse the reply, tolerating a surrounding Markdown code fence.
        content = _strip_code_fence(response.content.strip())
        result = json.loads(content)

        if not isinstance(result, dict):
            # Valid JSON but not an action object (e.g. a bare string, number
            # or list): treat it like plain text instead of failing on .get().
            state = set_response(state, response.content)
            return state

        action = result.get("action")
        if action == "call_tool":
            # Queue the tool call requested by the LLM.
            state = add_tool_call(
                state,
                tool_name=result["tool_name"],
                arguments=result.get("arguments", {}),
                server="strapi"
            )
            state["state"] = ConversationState.TOOL_CALLING.value
        elif action == "respond":
            state = set_response(state, result["response"])
            state["state"] = ConversationState.GENERATING.value
        elif action == "handoff":
            state["requires_human"] = True
            state["handoff_reason"] = result.get("reason", "User request")
        else:
            # Make an unrecognised action visible instead of silently
            # returning a state with neither a response nor a tool call.
            logger.warning(
                "Customer service agent received unknown action",
                conversation_id=state["conversation_id"],
                action=action
            )
        return state
    except json.JSONDecodeError:
        # LLM returned plain text, use as response
        state = set_response(state, response.content)
        return state
    except Exception as e:
        # Also reached when a required key ("tool_name", "response") is
        # missing from the LLM's JSON object.
        logger.error("Customer service agent failed", error=str(e))
        state["error"] = str(e)
        return state
async def _generate_response_from_results(state: AgentState) -> AgentState:
    """Turn the tool results stored on *state* into a user-facing reply.

    Each entry of ``state["tool_results"]`` is summarised as text (its JSON
    data on success, its error otherwise), the summaries are embedded in a
    prompt together with the current user message, and the LLM's answer is
    stored via ``set_response``. If the LLM step raises, a fixed apology
    message is stored instead.

    Args:
        state: Current agent state holding ``tool_results`` and
            ``current_message``.

    Returns:
        The state returned by ``set_response``.
    """
    # One text summary per tool result, in the original order.
    summaries = [
        f"Tool {item['tool_name']} returned:\n{json.dumps(item['data'], ensure_ascii=False, indent=2)}"
        if item["success"]
        else f"Tool {item['tool_name']} failed: {item['error']}"
        for item in state["tool_results"]
    ]
    joined_summaries = "\n".join(summaries)
    question = state["current_message"]

    prompt = f"""Based on the following tool returned information, generate a response to the user.
User question: {question}
Tool returned information:
{joined_summaries}
Please generate a friendly and professional response. If the tool did not return useful information, honestly inform the user and suggest other ways to get help.
Return only the response content, do not return JSON."""

    system_message = Message(
        role="system",
        content="You are a professional B2B customer service assistant, please answer user questions based on tool returned information.",
    )
    user_message = Message(role="user", content=prompt)
    messages = [system_message, user_message]

    try:
        reply = await get_llm_client().chat(messages, temperature=0.7)
        return set_response(state, reply.content)
    except Exception as exc:
        # Any failure in the LLM step degrades to a fixed apology message.
        logger.error("Response generation failed", error=str(exc))
        return set_response(
            state,
            "Sorry, there was a problem processing your request. Please try again later or contact customer support.",
        )