Files

419 lines
14 KiB
Python
Raw Permalink Normal View History

"""
Local Knowledge Base using SQLite
Stores FAQ, company info, and policies locally for fast access.
Syncs with Strapi CMS periodically.
"""
import sqlite3
import json
import asyncio
from datetime import datetime
from typing import List, Dict, Any, Optional
from pathlib import Path
import httpx
from pydantic_settings import BaseSettings
from pydantic import ConfigDict
class KnowledgeBaseSettings(BaseSettings):
"""Knowledge base configuration"""
strapi_api_url: str
strapi_api_token: str = ""
db_path: str = "/data/faq.db"
sync_interval: int = 3600 # Sync every hour
sync_on_startup: bool = True # Run initial sync on startup
sync_interval_minutes: int = 60 # Sync interval in minutes
model_config = ConfigDict(env_file=".env")
settings = KnowledgeBaseSettings()
class LocalKnowledgeBase:
"""Local SQLite knowledge base"""
def __init__(self, db_path: Optional[str] = None):
self.db_path = db_path or settings.db_path
self._conn: Optional[sqlite3.Connection] = None
def _get_conn(self) -> sqlite3.Connection:
"""Get database connection"""
if self._conn is None:
# Ensure directory exists
Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)
self._conn = sqlite3.connect(self.db_path, check_same_thread=False)
self._conn.row_factory = sqlite3.Row
self._init_db()
return self._conn
def _init_db(self):
"""Initialize database schema"""
conn = self._get_conn()
# Create tables
conn.executescript("""
-- FAQ table
CREATE TABLE IF NOT EXISTS faq (
id INTEGER PRIMARY KEY AUTOINCREMENT,
strapi_id TEXT,
category TEXT NOT NULL,
locale TEXT NOT NULL,
question TEXT,
answer TEXT,
description TEXT,
extra_data TEXT,
synced_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(category, locale, strapi_id)
);
-- Create indexes for FAQ
CREATE INDEX IF NOT EXISTS idx_faq_category ON faq(category);
CREATE INDEX IF NOT EXISTS idx_faq_locale ON faq(locale);
CREATE INDEX IF NOT EXISTS idx_faq_search ON faq(question, answer);
-- Full-text search
CREATE VIRTUAL TABLE IF NOT EXISTS fts_faq USING fts5(
question, answer, category, locale, content='faq'
);
-- Trigger to update FTS
CREATE TRIGGER IF NOT EXISTS fts_faq_insert AFTER INSERT ON faq BEGIN
INSERT INTO fts_faq(rowid, question, answer, category, locale)
VALUES (new.rowid, new.question, new.answer, new.category, new.locale);
END;
CREATE TRIGGER IF NOT EXISTS fts_faq_delete AFTER DELETE ON faq BEGIN
INSERT INTO fts_faq(fts_faq, rowid, question, answer, category, locale)
VALUES ('delete', old.rowid, old.question, old.answer, old.category, old.locale);
END;
CREATE TRIGGER IF NOT EXISTS fts_faq_update AFTER UPDATE ON faq BEGIN
INSERT INTO fts_faq(fts_faq, rowid, question, answer, category, locale)
VALUES ('delete', old.rowid, old.question, old.answer, old.category, old.locale);
INSERT INTO fts_faq(rowid, question, answer, category, locale)
VALUES (new.rowid, new.question, new.answer, new.category, new.locale);
END;
-- Company info table
CREATE TABLE IF NOT EXISTS company_info (
id INTEGER PRIMARY KEY AUTOINCREMENT,
section TEXT NOT NULL UNIQUE,
locale TEXT NOT NULL,
title TEXT,
description TEXT,
content TEXT,
extra_data TEXT,
synced_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(section, locale)
);
CREATE INDEX IF NOT EXISTS idx_company_section ON company_info(section);
CREATE INDEX IF NOT EXISTS idx_company_locale ON company_info(locale);
-- Policy table
CREATE TABLE IF NOT EXISTS policy (
id INTEGER PRIMARY KEY AUTOINCREMENT,
type TEXT NOT NULL,
locale TEXT NOT NULL,
title TEXT,
summary TEXT,
content TEXT,
version TEXT,
effective_date TEXT,
synced_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(type, locale)
);
CREATE INDEX IF NOT EXISTS idx_policy_type ON policy(type);
CREATE INDEX IF NOT EXISTS idx_policy_locale ON policy(locale);
-- Sync status table
CREATE TABLE IF NOT EXISTS sync_status (
id INTEGER PRIMARY KEY AUTOINCREMENT,
data_type TEXT NOT NULL,
last_sync_at TIMESTAMP,
status TEXT,
error_message TEXT,
items_count INTEGER
);
""")
# ============ FAQ Operations ============
def query_faq(
self,
category: str,
locale: str,
limit: int = 10
) -> Dict[str, Any]:
"""Query FAQ from local database"""
conn = self._get_conn()
# Query FAQ
cursor = conn.execute(
"""SELECT id, strapi_id, category, locale, question, answer, description, extra_data
FROM faq
WHERE category = ? AND locale = ?
LIMIT ?""",
(category, locale, limit)
)
results = []
for row in cursor.fetchall():
item = {
"id": row["strapi_id"],
"category": row["category"],
"locale": row["locale"],
"question": row["question"],
"answer": row["answer"]
}
if row["description"]:
item["description"] = row["description"]
if row["extra_data"]:
item.update(json.loads(row["extra_data"]))
results.append(item)
return {
"success": True,
"count": len(results),
"category": category,
"locale": locale,
"results": results,
"_source": "local_cache"
}
def search_faq(
self,
query: str,
locale: str = "en",
limit: int = 10
) -> Dict[str, Any]:
"""Full-text search FAQ"""
conn = self._get_conn()
# Use FTS for search
cursor = conn.execute(
"""SELECT fts_faq.question, fts_faq.answer, faq.category, faq.locale
FROM fts_faq
JOIN faq ON fts_faq.rowid = faq.id
WHERE fts_faq MATCH ? AND faq.locale = ?
LIMIT ?""",
(query, locale, limit)
)
results = []
for row in cursor.fetchall():
results.append({
"question": row["question"],
"answer": row["answer"],
"category": row["category"],
"locale": row["locale"]
})
return {
"success": True,
"count": len(results),
"query": query,
"locale": locale,
"results": results,
"_source": "local_cache"
}
def save_faq_batch(self, faq_list: List[Dict[str, Any]], category: str, locale: str):
"""Save batch of FAQ to database"""
conn = self._get_conn()
count = 0
for item in faq_list:
try:
# Extract fields
question = item.get("question") or item.get("title") or item.get("content", "")
answer = item.get("answer") or item.get("content") or ""
description = item.get("description") or ""
strapi_id = item.get("id", "")
# Extra data as JSON
extra_data = json.dumps({
k: v for k, v in item.items()
if k not in ["id", "question", "answer", "title", "content", "description"]
}, ensure_ascii=False)
# Insert or replace
conn.execute(
"""INSERT OR REPLACE INTO faq
(strapi_id, category, locale, question, answer, description, extra_data, synced_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
(strapi_id, category, locale, question, answer, description, extra_data, datetime.now().isoformat())
)
count += 1
except Exception as e:
print(f"Error saving FAQ: {e}")
conn.commit()
return count
# ============ Company Info Operations ============
def get_company_info(self, section: str, locale: str = "en") -> Dict[str, Any]:
"""Get company info from local database"""
conn = self._get_conn()
cursor = conn.execute(
"""SELECT section, locale, title, description, content, extra_data
FROM company_info
WHERE section = ? AND locale = ?""",
(section, locale)
)
row = cursor.fetchone()
if not row:
return {
"success": False,
"error": f"Section '{section}' not found",
"data": None,
"_source": "local_cache"
}
result_data = {
"section": row["section"],
"locale": row["locale"],
"title": row["title"],
"description": row["description"],
"content": row["content"]
}
if row["extra_data"]:
result_data.update(json.loads(row["extra_data"]))
return {
"success": True,
"data": result_data,
"_source": "local_cache"
}
def save_company_info(self, section: str, locale: str, data: Dict[str, Any]):
"""Save company info to database"""
conn = self._get_conn()
title = data.get("title") or data.get("section_title") or ""
description = data.get("description") or ""
content = data.get("content") or ""
extra_data = json.dumps({
k: v for k, v in data.items()
if k not in ["section", "title", "description", "content"]
}, ensure_ascii=False)
conn.execute(
"""INSERT OR REPLACE INTO company_info
(section, locale, title, description, content, extra_data, synced_at)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
(section, locale, title, description, content, extra_data, datetime.now().isoformat())
)
conn.commit()
# ============ Policy Operations ============
def get_policy(self, policy_type: str, locale: str = "en") -> Dict[str, Any]:
"""Get policy from local database"""
conn = self._get_conn()
cursor = conn.execute(
"""SELECT type, locale, title, summary, content, version, effective_date
FROM policy
WHERE type = ? AND locale = ?""",
(policy_type, locale)
)
row = cursor.fetchone()
if not row:
return {
"success": False,
"error": f"Policy '{policy_type}' not found",
"data": None,
"_source": "local_cache"
}
return {
"success": True,
"data": {
"type": row["type"],
"locale": row["locale"],
"title": row["title"],
"summary": row["summary"],
"content": row["content"],
"version": row["version"],
"effective_date": row["effective_date"]
},
"_source": "local_cache"
}
def save_policy(self, policy_type: str, locale: str, data: Dict[str, Any]):
"""Save policy to database"""
conn = self._get_conn()
title = data.get("title") or ""
summary = data.get("summary") or ""
content = data.get("content") or ""
version = data.get("version") or ""
effective_date = data.get("effective_date") or ""
conn.execute(
"""INSERT OR REPLACE INTO policy
(type, locale, title, summary, content, version, effective_date, synced_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
(policy_type, locale, title, summary, content, version, effective_date, datetime.now().isoformat())
)
conn.commit()
# ============ Sync Status ============
def update_sync_status(self, data_type: str, status: str, items_count: int = 0, error: Optional[str] = None):
"""Update sync status"""
conn = self._get_conn()
conn.execute(
"""INSERT INTO sync_status (data_type, last_sync_at, status, items_count, error_message)
VALUES (?, ?, ?, ?, ?)""",
(data_type, datetime.now().isoformat(), status, items_count, error)
)
conn.commit()
def get_sync_status(self, data_type: Optional[str] = None) -> List[Dict[str, Any]]:
"""Get sync status"""
conn = self._get_conn()
if data_type:
cursor = conn.execute(
"""SELECT * FROM sync_status WHERE data_type = ? ORDER BY last_sync_at DESC LIMIT 1""",
(data_type,)
)
else:
cursor = conn.execute(
"""SELECT * FROM sync_status ORDER BY last_sync_at DESC LIMIT 10"""
)
return [dict(row) for row in cursor.fetchall()]
def close(self):
"""Close database connection"""
if self._conn:
self._conn.close()
self._conn = None
# Global knowledge base instance
kb = LocalKnowledgeBase()
def get_kb() -> LocalKnowledgeBase:
"""Get global knowledge base instance"""
return kb