"""
Chatwoot API Client for B2B Shopping AI Assistant
"""
from typing import Any, Optional
from dataclasses import dataclass
from enum import Enum

import httpx

from config import settings
from utils.logger import get_logger

logger = get_logger(__name__)


class MessageType(str, Enum):
    """Chatwoot message types"""
    INCOMING = "incoming"
    OUTGOING = "outgoing"


class ConversationStatus(str, Enum):
    """Chatwoot conversation status"""
    OPEN = "open"
    RESOLVED = "resolved"
    PENDING = "pending"
    SNOOZED = "snoozed"


@dataclass
class ChatwootMessage:
    """Chatwoot message structure"""
    id: int
    content: str
    message_type: str
    conversation_id: int
    sender_type: Optional[str] = None
    sender_id: Optional[int] = None
    private: bool = False


@dataclass
class ChatwootContact:
    """Chatwoot contact structure"""
    id: int
    name: Optional[str] = None
    email: Optional[str] = None
    phone_number: Optional[str] = None
    custom_attributes: Optional[dict[str, Any]] = None


class ChatwootClient:
    """Chatwoot API Client

    Thin async wrapper over the account-scoped Chatwoot REST endpoints.
    A single ``httpx.AsyncClient`` is created on first use and reused for
    every request until ``close()`` is called. The instance can also be used
    as an async context manager, which closes the HTTP client on exit.
    """

    def __init__(
        self,
        api_url: Optional[str] = None,
        api_token: Optional[str] = None,
        account_id: int = 2
    ):
        """Initialize Chatwoot client

        Args:
            api_url: Chatwoot API URL, defaults to settings
            api_token: API access token, defaults to settings
            account_id: Chatwoot account ID
        """
        # Trailing slashes are stripped so the base URL built in
        # _get_client() never contains a double slash.
        self.api_url = (api_url or settings.chatwoot_api_url).rstrip("/")
        self.api_token = api_token or settings.chatwoot_api_token
        self.account_id = account_id
        # Created lazily by _get_client(); reset to None by close().
        self._client: Optional[httpx.AsyncClient] = None

        logger.info("Chatwoot client initialized", api_url=self.api_url)

    async def __aenter__(self) -> "ChatwootClient":
        """Enter an ``async with`` block and return this client"""
        return self

    async def __aexit__(self, *exc_info: Any) -> None:
        """Close the HTTP client when leaving an ``async with`` block"""
        await self.close()

    async def _get_client(self) -> httpx.AsyncClient:
        """Get or create HTTP client

        Returns:
            The shared ``httpx.AsyncClient``. Its base URL is scoped to
            ``/api/v1/accounts/{account_id}``, so request paths elsewhere in
            this class are relative to the account.
        """
        if self._client is None:
            self._client = httpx.AsyncClient(
                base_url=f"{self.api_url}/api/v1/accounts/{self.account_id}",
                headers={
                    "api_access_token": self.api_token,
                    "Content-Type": "application/json"
                },
                timeout=30.0
            )
        return self._client

    async def close(self) -> None:
        """Close HTTP client

        Safe to call more than once; a later request creates a new client.
        """
        if self._client is not None:
            await self._client.aclose()
            self._client = None

    # ============ Messages ============

    async def send_message(
        self,
        conversation_id: int,
        content: str,
        message_type: MessageType = MessageType.OUTGOING,
        private: bool = False
    ) -> dict[str, Any]:
        """Send a message to a conversation

        Args:
            conversation_id: Conversation ID
            content: Message content
            message_type: Message type (incoming/outgoing); the equivalent
                plain string value is accepted as well
            private: Whether message is private (internal note)

        Returns:
            Created message data

        Raises:
            ValueError: If message_type is not a valid MessageType value
            httpx.HTTPStatusError: If the API responds with an error status
        """
        client = await self._get_client()

        payload = {
            "content": content,
            # Coerce through the enum so both members and raw strings work,
            # and unknown values fail before any request is sent.
            "message_type": MessageType(message_type).value,
            "private": private
        }

        response = await client.post(
            f"/conversations/{conversation_id}/messages",
            json=payload
        )
        response.raise_for_status()

        data = response.json()
        logger.debug(
            "Message sent",
            conversation_id=conversation_id,
            message_id=data.get("id")
        )
        return data

    async def send_rich_message(
        self,
        conversation_id: int,
        content: str,
        content_type: str,
        content_attributes: dict[str, Any]
    ) -> dict[str, Any]:
        """Send a rich message (cards, buttons, etc.)

        The message is always sent with the outgoing message type.

        Args:
            conversation_id: Conversation ID
            content: Fallback text content
            content_type: Rich content type (cards, input_select, etc.)
            content_attributes: Rich content attributes

        Returns:
            Created message data

        Raises:
            httpx.HTTPStatusError: If the API responds with an error status
        """
        client = await self._get_client()

        payload = {
            "content": content,
            "message_type": MessageType.OUTGOING.value,
            "content_type": content_type,
            "content_attributes": content_attributes
        }

        response = await client.post(
            f"/conversations/{conversation_id}/messages",
            json=payload
        )
        response.raise_for_status()
        return response.json()

    # ============ Conversations ============

    async def get_conversation(self, conversation_id: int) -> dict[str, Any]:
        """Get conversation details

        Args:
            conversation_id: Conversation ID

        Returns:
            Conversation data

        Raises:
            httpx.HTTPStatusError: If the API responds with an error status
        """
        client = await self._get_client()
        response = await client.get(f"/conversations/{conversation_id}")
        response.raise_for_status()
        return response.json()

    async def update_conversation_status(
        self,
        conversation_id: int,
        status: ConversationStatus
    ) -> dict[str, Any]:
        """Update conversation status

        Args:
            conversation_id: Conversation ID
            status: New status; the equivalent plain string value is
                accepted as well

        Returns:
            Updated conversation data

        Raises:
            ValueError: If status is not a valid ConversationStatus value
            httpx.HTTPStatusError: If the API responds with an error status
        """
        # Normalise once so the request body and the log line agree.
        status_value = ConversationStatus(status).value

        client = await self._get_client()
        response = await client.post(
            f"/conversations/{conversation_id}/toggle_status",
            json={"status": status_value}
        )
        response.raise_for_status()

        logger.info(
            "Conversation status updated",
            conversation_id=conversation_id,
            status=status_value
        )
        return response.json()

    async def add_labels(
        self,
        conversation_id: int,
        labels: list[str]
    ) -> dict[str, Any]:
        """Add labels to a conversation

        Args:
            conversation_id: Conversation ID
            labels: List of label names

        Returns:
            Updated labels

        Raises:
            httpx.HTTPStatusError: If the API responds with an error status
        """
        client = await self._get_client()
        response = await client.post(
            f"/conversations/{conversation_id}/labels",
            json={"labels": labels}
        )
        response.raise_for_status()
        return response.json()

    async def assign_agent(
        self,
        conversation_id: int,
        agent_id: int
    ) -> dict[str, Any]:
        """Assign an agent to a conversation

        Args:
            conversation_id: Conversation ID
            agent_id: Agent user ID

        Returns:
            Assignment result

        Raises:
            httpx.HTTPStatusError: If the API responds with an error status
        """
        client = await self._get_client()
        response = await client.post(
            f"/conversations/{conversation_id}/assignments",
            json={"assignee_id": agent_id}
        )
        response.raise_for_status()

        logger.info(
            "Agent assigned",
            conversation_id=conversation_id,
            agent_id=agent_id
        )
        return response.json()

    # ============ Contacts ============

    async def get_contact(self, contact_id: int) -> dict[str, Any]:
        """Get contact details

        Args:
            contact_id: Contact ID

        Returns:
            Contact data

        Raises:
            httpx.HTTPStatusError: If the API responds with an error status
        """
        client = await self._get_client()
        response = await client.get(f"/contacts/{contact_id}")
        response.raise_for_status()
        return response.json()

    async def update_contact(
        self,
        contact_id: int,
        attributes: dict[str, Any]
    ) -> dict[str, Any]:
        """Update contact attributes

        Args:
            contact_id: Contact ID
            attributes: Attributes to update; sent as the JSON request body

        Returns:
            Updated contact data

        Raises:
            httpx.HTTPStatusError: If the API responds with an error status
        """
        client = await self._get_client()
        response = await client.put(
            f"/contacts/{contact_id}",
            json=attributes
        )
        response.raise_for_status()
        return response.json()

    # ============ Messages History ============

    async def get_messages(
        self,
        conversation_id: int,
        before: Optional[int] = None
    ) -> list[dict[str, Any]]:
        """Get conversation messages

        Args:
            conversation_id: Conversation ID
            before: Get messages before this message ID

        Returns:
            List of messages, taken from the ``payload`` key of the response
            body; an empty list when that key is absent

        Raises:
            httpx.HTTPStatusError: If the API responds with an error status
        """
        client = await self._get_client()

        params: dict[str, Any] = {}
        # Compare against None explicitly so a caller-supplied 0 is not
        # silently dropped by a truthiness check.
        if before is not None:
            params["before"] = before

        response = await client.get(
            f"/conversations/{conversation_id}/messages",
            params=params
        )
        response.raise_for_status()

        data = response.json()
        return data.get("payload", [])


# Global Chatwoot client instance
chatwoot_client: Optional[ChatwootClient] = None


def get_chatwoot_client() -> ChatwootClient:
    """Get or create global Chatwoot client instance

    Returns:
        The module-level ChatwootClient, constructed with default arguments
        on the first call and reused afterwards
    """
    global chatwoot_client
    if chatwoot_client is None:
        chatwoot_client = ChatwootClient()
    return chatwoot_client