Files

206 lines
5.1 KiB
Python
Raw Permalink Normal View History

"""
B2B Shopping AI Assistant - Main Application Entry
"""
from contextlib import asynccontextmanager
from typing import Any
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from config import settings
from webhooks.chatwoot_webhook import router as webhook_router
from utils.logger import setup_logging, get_logger
from utils.cache import get_cache_manager
from integrations.chatwoot import get_chatwoot_client
# Setup logging
setup_logging(settings.log_level)
logger = get_logger(__name__)
# ============ Lifespan Management ============
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifespan manager"""
# Startup
logger.info("Starting B2B Shopping AI Assistant")
# Initialize cache connection
cache = get_cache_manager()
await cache.connect()
logger.info("Redis cache connected")
yield
# Shutdown
logger.info("Shutting down B2B Shopping AI Assistant")
# Close connections
await cache.disconnect()
chatwoot = get_chatwoot_client()
await chatwoot.close()
logger.info("Connections closed")
# ============ Application Setup ============
app = FastAPI(
title="B2B Shopping AI Assistant",
description="AI-powered customer service assistant with LangGraph and MCP",
version="1.0.0",
lifespan=lifespan
)
# CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Configure appropriately for production
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# ============ Exception Handlers ============
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
"""Global exception handler"""
logger.error(
"Unhandled exception",
path=request.url.path,
error=str(exc)
)
return JSONResponse(
status_code=500,
content={"detail": "Internal server error"}
)
# ============ Include Routers ============
app.include_router(webhook_router)
# ============ API Endpoints ============
@app.get("/health")
async def health_check():
"""Health check endpoint"""
return {
"status": "healthy",
"service": "b2b-ai-assistant",
"version": "1.0.0"
}
@app.get("/")
async def root():
"""Root endpoint"""
return {
"name": "B2B Shopping AI Assistant",
"version": "1.0.0",
"status": "running"
}
class QueryRequest(BaseModel):
"""Direct query request model"""
conversation_id: str
user_id: str
account_id: str
message: str
class QueryResponse(BaseModel):
"""Query response model"""
response: str
intent: str | None = None
requires_human: bool = False
context: dict[str, Any] = {}
@app.post("/api/agent/query", response_model=QueryResponse)
async def agent_query(request: QueryRequest):
"""Direct agent query endpoint
Allows direct testing of the agent without Chatwoot integration.
"""
from core.graph import process_message
logger.info(
"Direct query received",
conversation_id=request.conversation_id,
user_id=request.user_id
)
# Load context from cache
cache = get_cache_manager()
context = await cache.get_context(request.conversation_id)
history = await cache.get_messages(request.conversation_id)
# Process message
final_state = await process_message(
conversation_id=request.conversation_id,
user_id=request.user_id,
account_id=request.account_id,
message=request.message,
history=history,
context=context
)
# Update cache
await cache.add_message(request.conversation_id, "user", request.message)
if final_state.get("response"):
await cache.add_message(
request.conversation_id,
"assistant",
final_state["response"]
)
# Save context
new_context = final_state.get("context", {})
new_context["last_intent"] = final_state.get("intent")
await cache.set_context(request.conversation_id, new_context)
return QueryResponse(
response=final_state.get("response", ""),
intent=final_state.get("intent"),
requires_human=final_state.get("requires_human", False),
context=final_state.get("context", {})
)
@app.get("/api/config")
async def get_config():
"""Get sanitized configuration"""
return {
"zhipu_model": settings.zhipu_model,
"max_conversation_steps": settings.max_conversation_steps,
"conversation_timeout": settings.conversation_timeout,
"mcp_servers": {
"strapi": settings.strapi_mcp_url,
"order": settings.order_mcp_url,
"aftersale": settings.aftersale_mcp_url,
"product": settings.product_mcp_url
}
}
# ============ Run Application ============
if __name__ == "__main__":
import uvicorn
uvicorn.run(
"main:app",
host="0.0.0.0",
port=8000,
reload=True
)