""" Strapi to Local Knowledge Base Sync Script Periodically syncs FAQ, company info, and policies from Strapi CMS to local SQLite database. """ import asyncio import httpx from datetime import datetime from typing import Dict, Any, List from knowledge_base import LocalKnowledgeBase, settings from config_loader import load_config, get_category_endpoint class StrapiSyncer: """Sync data from Strapi to local knowledge base""" def __init__(self, kb: LocalKnowledgeBase): self.kb = kb self.api_url = settings.strapi_api_url.rstrip("/") self.api_token = settings.strapi_api_token async def sync_all(self) -> Dict[str, Any]: """Sync all data from Strapi""" results = { "success": True, "timestamp": datetime.now().isoformat(), "details": {} } try: # Load config try: config = load_config() except: config = None # Sync FAQ categories categories = ["register", "order", "pre-order", "payment", "shipment", "return", "other"] if config: categories = list(config.faq_categories.keys()) faq_total = 0 for category in categories: count = await self.sync_faq_category(category, config) faq_total += count results["details"][f"faq_{category}"] = count results["details"]["faq_total"] = faq_total # Sync company info company_sections = ["contact", "about", "service"] for section in company_sections: await self.sync_company_info(section) results["details"]["company_info"] = len(company_sections) # Sync policies policy_types = ["return_policy", "privacy_policy", "terms_of_service", "shipping_policy", "payment_policy"] for policy_type in policy_types: await self.sync_policy(policy_type) results["details"]["policies"] = len(policy_types) # Update sync status self.kb.update_sync_status("all", "success", faq_total) print(f"✅ Sync completed: {faq_total} FAQs, {len(company_sections)} company sections, {len(policy_types)} policies") except Exception as e: results["success"] = False results["error"] = str(e) self.kb.update_sync_status("all", "error", 0, str(e)) print(f"❌ Sync failed: {e}") return results async def sync_faq_category(self, category: str, config=None) -> int: """Sync FAQ category from Strapi""" try: # Get endpoint from config if config: endpoint = get_category_endpoint(category, config) else: endpoint = f"faq-{category}" headers = {"Content-Type": "application/json"} if self.api_token: headers["Authorization"] = f"Bearer {self.api_token}" # Fetch from Strapi async with httpx.AsyncClient(timeout=30.0) as client: response = await client.get( f"{self.api_url}/api/{endpoint}", params={"populate": "deep"}, headers=headers ) response.raise_for_status() data = response.json() # Extract FAQ items faq_list = [] item_data = data.get("data", {}) if isinstance(item_data, dict): if item_data.get("content"): faq_list = item_data["content"] elif item_data.get("faqs"): faq_list = item_data["faqs"] elif item_data.get("questions"): faq_list = item_data["questions"] elif isinstance(item_data, list): faq_list = item_data # Save to local database count = self.kb.save_faq_batch(faq_list, category, "en") # Also sync other locales if available locales = ["nl", "de", "es", "fr", "it", "tr"] for locale in locales: try: async with httpx.AsyncClient(timeout=30.0) as client: response = await client.get( f"{self.api_url}/api/{endpoint}", params={"populate": "deep", "locale": locale}, headers=headers ) response.raise_for_status() data = response.json() # Extract and save faq_list_locale = [] item_data_locale = data.get("data", {}) if isinstance(item_data_locale, dict): if item_data_locale.get("content"): faq_list_locale = item_data_locale["content"] elif item_data_locale.get("faqs"): faq_list_locale = item_data_locale["faqs"] if faq_list_locale: self.kb.save_faq_batch(faq_list_locale, category, locale) count += len(faq_list_locale) except Exception as e: print(f"Warning: Failed to sync {category} for locale {locale}: {e}") print(f" ✓ Synced {count} FAQs for category '{category}'") return count except Exception as e: print(f" ✗ Failed to sync category '{category}': {e}") return 0 async def sync_company_info(self, section: str): """Sync company info from Strapi""" try: section_map = { "contact": "info-contact", "about": "info-about", "service": "info-service", } endpoint = section_map.get(section, f"info-{section}") headers = {"Content-Type": "application/json"} if self.api_token: headers["Authorization"] = f"Bearer {self.api_token}" async with httpx.AsyncClient(timeout=30.0) as client: response = await client.get( f"{self.api_url}/api/{endpoint}", params={"populate": "deep"}, headers=headers ) response.raise_for_status() data = response.json() item = data.get("data", {}) if item: # Extract data company_data = { "section": section, "title": item.get("title"), "description": item.get("description"), "content": item.get("content") } # Handle profile info if item.get("yehwang_profile"): profile = item["yehwang_profile"] company_data["profile"] = { "title": profile.get("title"), "content": profile.get("content") } self.kb.save_company_info(section, "en", company_data) print(f" ✓ Synced company info '{section}'") except Exception as e: print(f" ✗ Failed to sync company info '{section}': {e}") async def sync_policy(self, policy_type: str): """Sync policy from Strapi""" try: policy_map = { "return_policy": "policy-return", "privacy_policy": "policy-privacy", "terms_of_service": "policy-terms", "shipping_policy": "policy-shipping", "payment_policy": "policy-payment", } endpoint = policy_map.get(policy_type, f"policy-{policy_type}") headers = {"Content-Type": "application/json"} if self.api_token: headers["Authorization"] = f"Bearer {self.api_token}" async with httpx.AsyncClient(timeout=30.0) as client: response = await client.get( f"{self.api_url}/api/{endpoint}", params={"populate": "deep"}, headers=headers ) response.raise_for_status() data = response.json() item = data.get("data", {}) if item: policy_data = { "title": item.get("title"), "summary": item.get("summary"), "content": item.get("content"), "version": item.get("version"), "effective_date": item.get("effective_date") } self.kb.save_policy(policy_type, "en", policy_data) print(f" ✓ Synced policy '{policy_type}'") except Exception as e: print(f" ✗ Failed to sync policy '{policy_type}': {e}") async def run_sync(kb: LocalKnowledgeBase): """Run sync process""" syncer = StrapiSyncer(kb) await syncer.sync_all() if __name__ == "__main__": # Run sync kb_instance = LocalKnowledgeBase() asyncio.run(run_sync(kb_instance))