"""
Local Knowledge Base using SQLite

Stores FAQ, company info, and policies locally for fast access.
Syncs with Strapi CMS periodically.
"""

import asyncio  # noqa: F401  (kept: may be used by code outside this listing)
import json
import logging
import sqlite3
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

import httpx  # noqa: F401  (kept: may be used by code outside this listing)
from pydantic import ConfigDict  # noqa: F401  (kept so existing imports of it keep working)
from pydantic_settings import BaseSettings, SettingsConfigDict

logger = logging.getLogger(__name__)


class KnowledgeBaseSettings(BaseSettings):
    """Knowledge base configuration, read from the environment and ``.env``."""

    strapi_api_url: str
    strapi_api_token: str = ""
    db_path: str = "/data/faq.db"
    sync_interval: int = 3600  # Sync every hour (presumably seconds)
    sync_on_startup: bool = True  # Run initial sync on startup
    sync_interval_minutes: int = 60  # Sync interval in minutes

    # SettingsConfigDict is the settings-specific config type; ``env_file``
    # is not a key of pydantic's plain ConfigDict.
    model_config = SettingsConfigDict(env_file=".env")


settings = KnowledgeBaseSettings()


# Full schema. Every statement is idempotent (IF NOT EXISTS) so the script
# can be executed each time a connection is opened.
_SCHEMA_SQL = """
    -- FAQ table
    CREATE TABLE IF NOT EXISTS faq (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        strapi_id TEXT,
        category TEXT NOT NULL,
        locale TEXT NOT NULL,
        question TEXT,
        answer TEXT,
        description TEXT,
        extra_data TEXT,
        synced_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        UNIQUE(category, locale, strapi_id)
    );

    -- Create indexes for FAQ
    CREATE INDEX IF NOT EXISTS idx_faq_category ON faq(category);
    CREATE INDEX IF NOT EXISTS idx_faq_locale ON faq(locale);
    CREATE INDEX IF NOT EXISTS idx_faq_search ON faq(question, answer);

    -- Full-text search
    CREATE VIRTUAL TABLE IF NOT EXISTS fts_faq USING fts5(
        question,
        answer,
        category,
        locale,
        content='faq'
    );

    -- Trigger to update FTS
    CREATE TRIGGER IF NOT EXISTS fts_faq_insert AFTER INSERT ON faq BEGIN
        INSERT INTO fts_faq(rowid, question, answer, category, locale)
        VALUES (new.rowid, new.question, new.answer, new.category, new.locale);
    END;

    CREATE TRIGGER IF NOT EXISTS fts_faq_delete AFTER DELETE ON faq BEGIN
        INSERT INTO fts_faq(fts_faq, rowid, question, answer, category, locale)
        VALUES ('delete', old.rowid, old.question, old.answer, old.category, old.locale);
    END;

    CREATE TRIGGER IF NOT EXISTS fts_faq_update AFTER UPDATE ON faq BEGIN
        INSERT INTO fts_faq(fts_faq, rowid, question, answer, category, locale)
        VALUES ('delete', old.rowid, old.question, old.answer, old.category, old.locale);
        INSERT INTO fts_faq(rowid, question, answer, category, locale)
        VALUES (new.rowid, new.question, new.answer, new.category, new.locale);
    END;

    -- Company info table (one row per section AND locale)
    CREATE TABLE IF NOT EXISTS company_info (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        section TEXT NOT NULL,
        locale TEXT NOT NULL,
        title TEXT,
        description TEXT,
        content TEXT,
        extra_data TEXT,
        synced_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        UNIQUE(section, locale)
    );

    CREATE INDEX IF NOT EXISTS idx_company_section ON company_info(section);
    CREATE INDEX IF NOT EXISTS idx_company_locale ON company_info(locale);

    -- Policy table
    CREATE TABLE IF NOT EXISTS policy (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        type TEXT NOT NULL,
        locale TEXT NOT NULL,
        title TEXT,
        summary TEXT,
        content TEXT,
        version TEXT,
        effective_date TEXT,
        synced_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        UNIQUE(type, locale)
    );

    CREATE INDEX IF NOT EXISTS idx_policy_type ON policy(type);
    CREATE INDEX IF NOT EXISTS idx_policy_locale ON policy(locale);

    -- Sync status table
    CREATE TABLE IF NOT EXISTS sync_status (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        data_type TEXT NOT NULL,
        last_sync_at TIMESTAMP,
        status TEXT,
        error_message TEXT,
        items_count INTEGER
    );
"""

# Rebuilds company_info for databases created with the earlier DDL, where
# ``section`` alone was UNIQUE and a section could therefore hold only one
# locale. Runs in a single transaction; existing rows are copied unchanged.
_COMPANY_INFO_MIGRATION_SQL = """
    BEGIN;
    DROP TABLE IF EXISTS company_info_migrated;
    CREATE TABLE company_info_migrated (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        section TEXT NOT NULL,
        locale TEXT NOT NULL,
        title TEXT,
        description TEXT,
        content TEXT,
        extra_data TEXT,
        synced_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        UNIQUE(section, locale)
    );
    INSERT INTO company_info_migrated
        (id, section, locale, title, description, content, extra_data, synced_at)
    SELECT id, section, locale, title, description, content, extra_data, synced_at
    FROM company_info;
    DROP TABLE company_info;
    ALTER TABLE company_info_migrated RENAME TO company_info;
    CREATE INDEX IF NOT EXISTS idx_company_section ON company_info(section);
    CREATE INDEX IF NOT EXISTS idx_company_locale ON company_info(locale);
    COMMIT;
"""


class LocalKnowledgeBase:
    """Local SQLite knowledge base.

    Holds FAQ entries (with an FTS5 full-text index), company info sections,
    policies and a sync-status log. The database connection is opened lazily
    on first use and reused until :meth:`close` is called.
    """

    def __init__(self, db_path: Optional[str] = None):
        """Create a knowledge base.

        Args:
            db_path: Path of the SQLite file. Falls back to
                ``settings.db_path`` when omitted or empty.
        """
        self.db_path = db_path or settings.db_path
        self._conn: Optional[sqlite3.Connection] = None

    def _get_conn(self) -> sqlite3.Connection:
        """Return the shared connection, opening and initialising it if needed.

        Raises:
            sqlite3.Error: If the database cannot be opened or the schema
                cannot be created. In that case no connection is cached, so
                the next call starts from scratch.
        """
        if self._conn is None:
            # Ensure directory exists
            Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)
            conn = sqlite3.connect(self.db_path, check_same_thread=False)
            conn.row_factory = sqlite3.Row
            # _init_db() calls back into _get_conn(), so the connection must
            # be published before the schema is created.
            self._conn = conn
            try:
                self._init_db()
            except Exception:
                # Do not keep a connection whose schema setup failed:
                # later calls would skip initialisation and hit missing tables.
                self._conn = None
                conn.close()
                raise
        return self._conn

    def _init_db(self) -> None:
        """Initialize database schema and migrate legacy ``company_info``."""
        conn = self._get_conn()

        # Create tables
        conn.executescript(_SCHEMA_SQL)

        if self._company_info_has_legacy_unique(conn):
            logger.info("Rebuilding company_info to allow one row per locale")
            conn.executescript(_COMPANY_INFO_MIGRATION_SQL)

    @staticmethod
    def _company_info_has_legacy_unique(conn: sqlite3.Connection) -> bool:
        """Return True if ``company_info`` has a UNIQUE index on ``section`` alone."""
        unique_indexes = conn.execute(
            'SELECT name FROM pragma_index_list(?) WHERE "unique" = 1',
            ("company_info",),
        ).fetchall()
        for index in unique_indexes:
            columns = [
                col["name"]
                for col in conn.execute(
                    "SELECT name FROM pragma_index_info(?)", (index["name"],)
                )
            ]
            if columns == ["section"]:
                return True
        return False

    @staticmethod
    def _quote_fts_terms(text: str) -> str:
        """Convert free text into an FTS5 query made only of quoted terms.

        Each whitespace-separated term that contains at least one
        alphanumeric character becomes an FTS5 string (embedded double
        quotes doubled), so FTS5 operators and punctuation in user input are
        treated as plain text. Returns ``""`` when nothing searchable is left.
        """
        terms = [
            term for term in text.split()
            if any(ch.isalnum() for ch in term)
        ]
        return " ".join('"{}"'.format(term.replace('"', '""')) for term in terms)

    # ============ FAQ Operations ============

    def query_faq(
        self,
        category: str,
        locale: str,
        limit: int = 10
    ) -> Dict[str, Any]:
        """Query FAQ from local database.

        Args:
            category: FAQ category to match exactly.
            locale: Locale to match exactly.
            limit: Maximum number of rows to return.

        Returns:
            A dict with ``success``, ``count``, ``category``, ``locale``,
            ``results`` and ``_source``. Each result carries the stored
            ``strapi_id`` as ``id`` and is merged with the decoded
            ``extra_data`` JSON, if any.
        """
        conn = self._get_conn()

        # Query FAQ
        cursor = conn.execute(
            """SELECT id, strapi_id, category, locale, question, answer,
                      description, extra_data
               FROM faq
               WHERE category = ? AND locale = ?
               LIMIT ?""",
            (category, locale, limit)
        )

        results = []
        for row in cursor.fetchall():
            item = {
                "id": row["strapi_id"],
                "category": row["category"],
                "locale": row["locale"],
                "question": row["question"],
                "answer": row["answer"]
            }
            if row["description"]:
                item["description"] = row["description"]
            if row["extra_data"]:
                item.update(json.loads(row["extra_data"]))
            results.append(item)

        return {
            "success": True,
            "count": len(results),
            "category": category,
            "locale": locale,
            "results": results,
            "_source": "local_cache"
        }

    def search_faq(
        self,
        query: str,
        locale: str = "en",
        limit: int = 10
    ) -> Dict[str, Any]:
        """Full-text search FAQ.

        The query is first run as-is, so FTS5 query syntax keeps working.
        If FTS5 rejects it (ordinary punctuation such as ``'`` or ``?`` is a
        syntax error there), it is retried with every term quoted. A blank
        query returns an empty result set instead of raising.

        Args:
            query: Search text.
            locale: Locale the matching FAQ rows must have.
            limit: Maximum number of rows to return, best matches first.

        Returns:
            A dict with ``success``, ``count``, ``query``, ``locale``,
            ``results`` and ``_source``.
        """
        sql = """SELECT faq.question, faq.answer, faq.category, faq.locale
                 FROM fts_faq
                 JOIN faq ON fts_faq.rowid = faq.id
                 WHERE fts_faq MATCH ?
                 AND faq.locale = ?
                 ORDER BY fts_faq.rank
                 LIMIT ?"""

        rows: List[sqlite3.Row] = []
        if query and query.strip():
            conn = self._get_conn()
            try:
                # Use FTS for search
                rows = conn.execute(sql, (query, locale, limit)).fetchall()
            except sqlite3.OperationalError:
                # Most likely an FTS5 syntax error caused by free-form text.
                # Any unrelated database error is raised again by the retry.
                quoted = self._quote_fts_terms(query)
                if quoted:
                    rows = conn.execute(sql, (quoted, locale, limit)).fetchall()

        results = [
            {
                "question": row["question"],
                "answer": row["answer"],
                "category": row["category"],
                "locale": row["locale"]
            }
            for row in rows
        ]

        return {
            "success": True,
            "count": len(results),
            "query": query,
            "locale": locale,
            "results": results,
            "_source": "local_cache"
        }

    def save_faq_batch(self, faq_list: List[Dict[str, Any]], category: str, locale: str) -> int:
        """Save batch of FAQ to database.

        Saving is best-effort per item: an item that cannot be stored is
        logged and skipped, and the remaining items are still written.

        Args:
            faq_list: FAQ items; ``question``/``title``/``content``,
                ``answer``, ``description`` and ``id`` are read, every other
                key is stored as JSON in ``extra_data``.
            category: Category stored with every item.
            locale: Locale stored with every item.

        Returns:
            Number of items written.
        """
        conn = self._get_conn()
        core_keys = ("id", "question", "answer", "title", "content", "description")

        count = 0
        for item in faq_list:
            try:
                # Extract fields
                question = item.get("question") or item.get("title") or item.get("content", "")
                answer = item.get("answer") or item.get("content") or ""
                description = item.get("description") or ""
                strapi_id = item.get("id", "")

                # Extra data as JSON
                extra_data = json.dumps(
                    {k: v for k, v in item.items() if k not in core_keys},
                    ensure_ascii=False
                )

                # Upsert rather than INSERT OR REPLACE: REPLACE removes the
                # old row without firing the delete trigger (recursive
                # triggers are off by default), which leaves stale entries
                # in the FTS index. DO UPDATE fires the update trigger and
                # keeps the row id stable.
                conn.execute(
                    """INSERT INTO faq
                       (strapi_id, category, locale, question, answer,
                        description, extra_data, synced_at)
                       VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                       ON CONFLICT(category, locale, strapi_id) DO UPDATE SET
                           question = excluded.question,
                           answer = excluded.answer,
                           description = excluded.description,
                           extra_data = excluded.extra_data,
                           synced_at = excluded.synced_at""",
                    (strapi_id, category, locale, question, answer,
                     description, extra_data, datetime.now().isoformat())
                )
                count += 1
            except (sqlite3.Error, TypeError, ValueError, AttributeError) as exc:
                logger.warning("Error saving FAQ: %s", exc)

        conn.commit()
        return count

    # ============ Company Info Operations ============

    def get_company_info(self, section: str, locale: str = "en") -> Dict[str, Any]:
        """Get company info from local database.

        Args:
            section: Section name to look up.
            locale: Locale to look up.

        Returns:
            ``{"success": True, "data": {...}}`` when found, otherwise a dict
            with ``success`` False, an ``error`` message and ``data`` None.
        """
        conn = self._get_conn()

        cursor = conn.execute(
            """SELECT section, locale, title, description, content, extra_data
               FROM company_info
               WHERE section = ? AND locale = ?""",
            (section, locale)
        )

        row = cursor.fetchone()
        if not row:
            return {
                "success": False,
                "error": f"Section '{section}' not found",
                "data": None,
                "_source": "local_cache"
            }

        result_data = {
            "section": row["section"],
            "locale": row["locale"],
            "title": row["title"],
            "description": row["description"],
            "content": row["content"]
        }
        if row["extra_data"]:
            result_data.update(json.loads(row["extra_data"]))

        return {
            "success": True,
            "data": result_data,
            "_source": "local_cache"
        }

    def save_company_info(self, section: str, locale: str, data: Dict[str, Any]) -> None:
        """Save company info to database.

        Replaces any existing row for the same ``section`` and ``locale``.

        Args:
            section: Section name.
            locale: Locale of the content.
            data: Section payload; ``title`` (or ``section_title``),
                ``description`` and ``content`` are stored in columns, other
                keys except ``section`` are stored as JSON in ``extra_data``.
        """
        conn = self._get_conn()

        title = data.get("title") or data.get("section_title") or ""
        description = data.get("description") or ""
        content = data.get("content") or ""
        extra_data = json.dumps(
            {
                k: v for k, v in data.items()
                if k not in ["section", "title", "description", "content"]
            },
            ensure_ascii=False
        )

        conn.execute(
            """INSERT OR REPLACE INTO company_info
               (section, locale, title, description, content, extra_data, synced_at)
               VALUES (?, ?, ?, ?, ?, ?, ?)""",
            (section, locale, title, description, content, extra_data,
             datetime.now().isoformat())
        )
        conn.commit()

    # ============ Policy Operations ============

    def get_policy(self, policy_type: str, locale: str = "en") -> Dict[str, Any]:
        """Get policy from local database.

        Args:
            policy_type: Policy type to look up.
            locale: Locale to look up.

        Returns:
            ``{"success": True, "data": {...}}`` when found, otherwise a dict
            with ``success`` False, an ``error`` message and ``data`` None.
        """
        conn = self._get_conn()

        cursor = conn.execute(
            """SELECT type, locale, title, summary, content, version, effective_date
               FROM policy
               WHERE type = ? AND locale = ?""",
            (policy_type, locale)
        )

        row = cursor.fetchone()
        if not row:
            return {
                "success": False,
                "error": f"Policy '{policy_type}' not found",
                "data": None,
                "_source": "local_cache"
            }

        return {
            "success": True,
            "data": {
                "type": row["type"],
                "locale": row["locale"],
                "title": row["title"],
                "summary": row["summary"],
                "content": row["content"],
                "version": row["version"],
                "effective_date": row["effective_date"]
            },
            "_source": "local_cache"
        }

    def save_policy(self, policy_type: str, locale: str, data: Dict[str, Any]) -> None:
        """Save policy to database.

        Replaces any existing row for the same ``policy_type`` and ``locale``.

        Args:
            policy_type: Policy type.
            locale: Locale of the content.
            data: Policy payload; ``title``, ``summary``, ``content``,
                ``version`` and ``effective_date`` are read, missing or empty
                values are stored as ``""``.
        """
        conn = self._get_conn()

        title = data.get("title") or ""
        summary = data.get("summary") or ""
        content = data.get("content") or ""
        version = data.get("version") or ""
        effective_date = data.get("effective_date") or ""

        conn.execute(
            """INSERT OR REPLACE INTO policy
               (type, locale, title, summary, content, version, effective_date, synced_at)
               VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
            (policy_type, locale, title, summary, content, version,
             effective_date, datetime.now().isoformat())
        )
        conn.commit()

    # ============ Sync Status ============

    def update_sync_status(self, data_type: str, status: str, items_count: int = 0,
                           error: Optional[str] = None) -> None:
        """Update sync status.

        Appends a new row to ``sync_status``; earlier rows are kept.

        Args:
            data_type: Kind of data that was synced.
            status: Outcome label.
            items_count: Number of items handled.
            error: Error message, if any.
        """
        conn = self._get_conn()

        conn.execute(
            """INSERT INTO sync_status
               (data_type, last_sync_at, status, items_count, error_message)
               VALUES (?, ?, ?, ?, ?)""",
            (data_type, datetime.now().isoformat(), status, items_count, error)
        )
        conn.commit()

    def get_sync_status(self, data_type: Optional[str] = None) -> List[Dict[str, Any]]:
        """Get sync status.

        Args:
            data_type: When given, only the latest entry for that type is
                returned; otherwise the ten latest entries of any type.

        Returns:
            Matching ``sync_status`` rows as dicts, newest first.
        """
        conn = self._get_conn()

        # ``id DESC`` breaks ties between rows with identical timestamps.
        if data_type:
            cursor = conn.execute(
                """SELECT * FROM sync_status
                   WHERE data_type = ?
                   ORDER BY last_sync_at DESC, id DESC
                   LIMIT 1""",
                (data_type,)
            )
        else:
            cursor = conn.execute(
                """SELECT * FROM sync_status
                   ORDER BY last_sync_at DESC, id DESC
                   LIMIT 10"""
            )

        return [dict(row) for row in cursor.fetchall()]

    def close(self) -> None:
        """Close database connection; a later call reopens it on demand."""
        if self._conn:
            self._conn.close()
            self._conn = None


# Global knowledge base instance
kb = LocalKnowledgeBase()


def get_kb() -> LocalKnowledgeBase:
    """Get global knowledge base instance"""
    return kb