- 配置 Docker Compose 多服务编排 - 实现 Chatwoot + Agent 集成 - 配置 Strapi MCP 知识库 - 支持 7 种语言的 FAQ 系统 - 实现 LangGraph AI 工作流 Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
206 lines
5.1 KiB
Python
206 lines
5.1 KiB
Python
"""
|
|
B2B Shopping AI Assistant - Main Application Entry
|
|
"""
|
|
from contextlib import asynccontextmanager
|
|
from typing import Any
|
|
|
|
from fastapi import FastAPI, Request
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from fastapi.responses import JSONResponse
|
|
from pydantic import BaseModel
|
|
|
|
from config import settings
|
|
from webhooks.chatwoot_webhook import router as webhook_router
|
|
from utils.logger import setup_logging, get_logger
|
|
from utils.cache import get_cache_manager
|
|
from integrations.chatwoot import get_chatwoot_client
|
|
|
|
|
|
# Configure logging at import time, using the level from settings, so the
# module-level logger below is obtained after setup has run.
setup_logging(settings.log_level)

# Logger named after this module; used by the lifespan hook and the endpoints.
logger = get_logger(__name__)
|
|
|
|
|
|
# ============ Lifespan Management ============
|
|
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan manager.

    Startup: connects the cache manager before the application starts
    serving. Shutdown: disconnects the cache manager and closes the
    Chatwoot client.

    Args:
        app: The FastAPI application this lifespan is attached to (unused).

    Yields:
        None. Control returns to the framework while the app is running.
    """
    # Startup
    logger.info("Starting B2B Shopping AI Assistant")

    # Initialize cache connection
    cache = get_cache_manager()
    await cache.connect()
    logger.info("Redis cache connected")

    try:
        yield
    finally:
        # Shutdown. Runs even if the generator is closed with an exception or
        # cancellation, so connections opened at startup are not leaked.
        logger.info("Shutting down B2B Shopping AI Assistant")

        try:
            await cache.disconnect()
        finally:
            # Close the Chatwoot client even when the cache disconnect fails;
            # any error from disconnect() still propagates afterwards.
            chatwoot = get_chatwoot_client()
            await chatwoot.close()

        logger.info("Connections closed")
|
|
|
|
|
|
# ============ Application Setup ============
|
|
|
|
app = FastAPI(
    title="B2B Shopping AI Assistant",
    description="AI-powered customer service assistant with LangGraph and MCP",
    version="1.0.0",
    lifespan=lifespan,
)

# Origins allowed to call this API from a browser.
# Configure appropriately for production (replace the wildcard with an
# explicit list of origins).
_CORS_ALLOWED_ORIGINS = ["*"]

# CORS middleware
# A wildcard origin must not be combined with credentialed requests: doing so
# lets any website issue cookie/authorization-bearing calls to this API.
# Credentials are therefore enabled only once explicit origins are configured.
app.add_middleware(
    CORSMiddleware,
    allow_origins=_CORS_ALLOWED_ORIGINS,
    allow_credentials="*" not in _CORS_ALLOWED_ORIGINS,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
|
|
|
|
# ============ Exception Handlers ============
|
|
|
|
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    """Global exception handler.

    Logs any exception that no other handler dealt with and replies with a
    generic 500 so internal details are not sent to the client.

    Args:
        request: The request being processed when the exception was raised.
        exc: The unhandled exception.

    Returns:
        JSONResponse with status 500 and body
        ``{"detail": "Internal server error"}``.
    """
    logger.error(
        "Unhandled exception",
        method=request.method,
        path=request.url.path,
        # str(exc) alone is often empty or ambiguous (e.g. a bare KeyError
        # shows only the key), so record the exception class as well.
        error_type=type(exc).__name__,
        error=str(exc),
    )
    return JSONResponse(
        status_code=500,
        content={"detail": "Internal server error"},
    )
|
|
|
|
|
|
# ============ Include Routers ============
|
|
|
|
# Mount the routes defined in webhooks.chatwoot_webhook on the application.
app.include_router(router=webhook_router)
|
|
|
|
|
|
# ============ API Endpoints ============
|
|
|
|
@app.get("/health")
async def health_check():
    """Health check endpoint.

    Returns a static payload with the service status, name and version; no
    downstream dependency is probed.
    """
    payload = dict(
        status="healthy",
        service="b2b-ai-assistant",
        version="1.0.0",
    )
    return payload
|
|
|
|
|
|
@app.get("/")
async def root():
    """Root endpoint.

    Returns a static payload naming the application, its version and a
    fixed "running" status.
    """
    info = dict(
        name="B2B Shopping AI Assistant",
        version="1.0.0",
        status="running",
    )
    return info
|
|
|
|
|
|
class QueryRequest(BaseModel):
    """Request body for the direct agent query endpoint.

    All four fields are required strings and are forwarded unchanged to the
    agent's message-processing call.
    """

    # Conversation identifier; also the key used for cached context/messages.
    conversation_id: str
    # Identifier of the user sending the message.
    user_id: str
    # Identifier of the account the conversation belongs to.
    account_id: str
    # The user's message text.
    message: str
|
|
|
|
|
|
class QueryResponse(BaseModel):
    """Response body returned by the direct agent query endpoint."""

    # The agent's reply text.
    response: str
    # Intent reported by the agent run, if any.
    intent: str | None = None
    # Whether the agent run flagged the conversation for a human.
    requires_human: bool = False
    # Conversation context reported by the agent run.
    context: dict[str, Any] = {}
|
|
|
|
|
|
@app.post("/api/agent/query", response_model=QueryResponse)
async def agent_query(request: QueryRequest):
    """Direct agent query endpoint.

    Allows direct testing of the agent without Chatwoot integration.

    Loads the cached context and message history for the conversation, runs
    the message through the agent, appends the user message (and the
    assistant reply, when there is one) to the cached history, stores the
    updated context, and returns the agent's result.

    Args:
        request: Conversation, user and account identifiers plus the message.

    Returns:
        QueryResponse with the reply text, intent, human-handoff flag and the
        context that was persisted for the conversation (including
        ``last_intent``).
    """
    # Function-scope import kept from the original.
    # NOTE(review): presumably deferred to avoid an import cycle or startup
    # cost - confirm before moving it to module level.
    from core.graph import process_message

    logger.info(
        "Direct query received",
        conversation_id=request.conversation_id,
        user_id=request.user_id,
    )

    # Load context from cache
    cache = get_cache_manager()
    context = await cache.get_context(request.conversation_id)
    history = await cache.get_messages(request.conversation_id)

    # Process message
    final_state = await process_message(
        conversation_id=request.conversation_id,
        user_id=request.user_id,
        account_id=request.account_id,
        message=request.message,
        history=history,
        context=context,
    )

    reply = final_state.get("response")
    intent = final_state.get("intent")

    # Update cache
    await cache.add_message(request.conversation_id, "user", request.message)
    if reply:
        await cache.add_message(request.conversation_id, "assistant", reply)

    # Save context. Work on a copy so the graph's state dict is not mutated
    # through aliasing, and tolerate a missing or None "context" entry.
    new_context = dict(final_state.get("context") or {})
    new_context["last_intent"] = intent
    await cache.set_context(request.conversation_id, new_context)

    return QueryResponse(
        # A None reply or flag would fail response validation; fall back to
        # the model's neutral values instead of surfacing a 500.
        response=reply or "",
        intent=intent,
        requires_human=final_state.get("requires_human") or False,
        # Return exactly what was persisted so client and cache agree.
        context=new_context,
    )
|
|
|
|
|
|
@app.get("/api/config")
async def get_config():
    """Get sanitized configuration.

    Returns the model name, the conversation step/timeout limits and the
    configured MCP server URLs, all read from ``settings``.
    """
    exposed = {
        "zhipu_model": settings.zhipu_model,
        "max_conversation_steps": settings.max_conversation_steps,
        "conversation_timeout": settings.conversation_timeout,
    }

    # MCP server URLs, keyed by server name.
    exposed["mcp_servers"] = {
        "strapi": settings.strapi_mcp_url,
        "order": settings.order_mcp_url,
        "aftersale": settings.aftersale_mcp_url,
        "product": settings.product_mcp_url,
    }
    return exposed
|
|
|
|
|
|
# ============ Run Application ============
|
|
|
|
if __name__ == "__main__":
    # Run the app with uvicorn when this file is executed as a script.
    import uvicorn

    # Listens on all interfaces, port 8000, with auto-reload enabled.
    uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)
|