# A digital person for Bluesky — notification-tracking database module.
#!/usr/bin/env python3
"""SQLite database for robust notification tracking."""

import sqlite3
import json
from pathlib import Path
from datetime import datetime, timedelta
from typing import Set, Dict, List, Optional, Tuple
import logging

logger = logging.getLogger(__name__)


class NotificationDB:
    """Database for tracking notification processing state.

    Persists Bluesky notifications in a local SQLite file so that
    processing survives restarts. Notifications move through statuses:
    'pending' -> 'processed' / 'ignored' / 'no_reply' / 'error'.
    """

    def __init__(self, db_path: str = "queue/notifications.db"):
        """Initialize the notification database.

        Args:
            db_path: Path to the SQLite file; parent directories are
                created if missing. Pass ":memory:" for an in-memory DB.
        """
        self.db_path = Path(db_path)
        self.db_path.parent.mkdir(exist_ok=True, parents=True)
        self.conn: Optional[sqlite3.Connection] = None
        self._init_db()

    def __enter__(self) -> "NotificationDB":
        """Support `with NotificationDB(...) as db:` usage."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Close the connection when leaving the context."""
        self.close()

    def _init_db(self):
        """Create the connection and ensure schema/indexes exist."""
        # NOTE(review): check_same_thread=False allows cross-thread use but
        # sqlite3.Connection is not itself thread-safe — callers must
        # serialize access externally.
        self.conn = sqlite3.connect(self.db_path, check_same_thread=False)
        self.conn.row_factory = sqlite3.Row

        # Main notifications table; uri is the natural primary key.
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS notifications (
                uri TEXT PRIMARY KEY,
                indexed_at TEXT NOT NULL,
                processed_at TEXT,
                status TEXT NOT NULL DEFAULT 'pending',
                reason TEXT,
                author_handle TEXT,
                author_did TEXT,
                text TEXT,
                parent_uri TEXT,
                root_uri TEXT,
                error TEXT,
                metadata TEXT
            )
        """)

        # Indexes for the common lookup patterns below.
        self.conn.execute("""
            CREATE INDEX IF NOT EXISTS idx_indexed_at
            ON notifications(indexed_at DESC)
        """)
        self.conn.execute("""
            CREATE INDEX IF NOT EXISTS idx_status
            ON notifications(status)
        """)
        self.conn.execute("""
            CREATE INDEX IF NOT EXISTS idx_author_handle
            ON notifications(author_handle)
        """)

        # Session tracking table for per-run statistics.
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS sessions (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                started_at TEXT NOT NULL,
                ended_at TEXT,
                last_seen_at TEXT,
                notifications_processed INTEGER DEFAULT 0,
                notifications_skipped INTEGER DEFAULT 0,
                notifications_error INTEGER DEFAULT 0
            )
        """)

        self.conn.commit()

    def add_notification(self, notif_dict: Dict) -> bool:
        """Add a notification to the database.

        Duplicate URIs are ignored (INSERT OR IGNORE), so re-adding an
        already-seen notification still returns True.

        Args:
            notif_dict: Notification payload; expected keys include
                'uri', 'indexed_at', 'reason', 'author', 'record'.

        Returns:
            True if the row was inserted or already present; False on
            empty/invalid input or any database error.
        """
        try:
            # Reject empty/None input early.
            if not notif_dict:
                return False

            # uri is the primary key; without it we cannot store anything.
            uri = notif_dict.get('uri', '')
            if not uri:
                return False

            indexed_at = notif_dict.get('indexed_at', '')
            reason = notif_dict.get('reason', '')
            author = notif_dict.get('author') or {}
            author_handle = author.get('handle', '')
            author_did = author.get('did', '')

            # Extract text from the record if available. Use `or ''` so an
            # explicit `text: None` in the record does not raise on slicing
            # (dict.get's default only applies when the key is absent).
            record = notif_dict.get('record') or {}
            text = (record.get('text') or '')[:500]

            # Extract thread info from the reply structure, guarding every
            # level since the payload shape is not under our control.
            parent_uri = None
            root_uri = None
            reply_info = record.get('reply') if record else None
            if reply_info and isinstance(reply_info, dict):
                parent_info = reply_info.get('parent', {})
                root_info = reply_info.get('root', {})
                if parent_info:
                    parent_uri = parent_info.get('uri')
                if root_info:
                    root_uri = root_info.get('uri')

            # Remaining fields of interest are kept as a JSON blob.
            metadata = {
                'cid': notif_dict.get('cid'),
                'labels': notif_dict.get('labels', []),
                'is_read': notif_dict.get('is_read', False)
            }

            self.conn.execute("""
                INSERT OR IGNORE INTO notifications
                (uri, indexed_at, reason, author_handle, author_did, text,
                 parent_uri, root_uri, status, metadata)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, 'pending', ?)
            """, (uri, indexed_at, reason, author_handle, author_did, text,
                  parent_uri, root_uri, json.dumps(metadata)))

            self.conn.commit()
            return True

        except Exception as e:
            # Best-effort: callers treat False as "not stored"; don't crash
            # the notification loop over a single bad payload.
            logger.error(f"Error adding notification to DB: {e}")
            return False

    def is_processed(self, uri: str) -> bool:
        """Check if a notification has been processed (any terminal
        non-error status). Unknown URIs and 'pending'/'error' rows
        return False."""
        cursor = self.conn.execute("""
            SELECT status FROM notifications WHERE uri = ?
        """, (uri,))
        row = cursor.fetchone()

        if row:
            return row['status'] in ['processed', 'ignored', 'no_reply']
        return False

    def mark_processed(self, uri: str, status: str = 'processed',
                       error: Optional[str] = None):
        """Mark a notification as processed.

        Args:
            uri: Notification URI (primary key).
            status: Terminal status to record ('processed', 'ignored',
                'no_reply', 'error', ...).
            error: Optional error detail to store alongside the status.
        """
        try:
            # NOTE(review): datetime.now() is timezone-naive while Bluesky
            # indexed_at values carry a TZ suffix — confirm mixed formats
            # are acceptable for the string comparisons used elsewhere.
            self.conn.execute("""
                UPDATE notifications
                SET status = ?, processed_at = ?, error = ?
                WHERE uri = ?
            """, (status, datetime.now().isoformat(), error, uri))
            self.conn.commit()
        except Exception as e:
            logger.error(f"Error marking notification processed: {e}")

    def get_unprocessed(self, limit: int = 100) -> List[Dict]:
        """Get unprocessed ('pending') notifications, oldest first.

        Args:
            limit: Maximum number of rows to return.

        Returns:
            List of plain dicts, one per pending notification row.
        """
        cursor = self.conn.execute("""
            SELECT * FROM notifications
            WHERE status = 'pending'
            ORDER BY indexed_at ASC
            LIMIT ?
        """, (limit,))

        return [dict(row) for row in cursor]

    def get_latest_processed_time(self) -> Optional[str]:
        """Get the `indexed_at` timestamp of the most recently processed
        notification, or None if nothing has been processed yet."""
        cursor = self.conn.execute("""
            SELECT MAX(indexed_at) as latest
            FROM notifications
            WHERE status IN ('processed', 'ignored', 'no_reply')
        """)
        row = cursor.fetchone()
        return row['latest'] if row and row['latest'] else None

    def cleanup_old_records(self, days: int = 7):
        """Remove terminal-status records older than `days` days and
        VACUUM if anything was deleted.

        Pending rows are never removed. Comparison is lexicographic on
        ISO strings — assumes stored indexed_at values sort correctly
        against a naive local-time cutoff (TODO confirm for TZ-suffixed
        values).
        """
        cutoff_date = (datetime.now() - timedelta(days=days)).isoformat()

        deleted = self.conn.execute("""
            DELETE FROM notifications
            WHERE indexed_at < ?
            AND status IN ('processed', 'ignored', 'no_reply', 'error')
        """, (cutoff_date,)).rowcount

        self.conn.commit()

        if deleted > 0:
            logger.info(f"Cleaned up {deleted} old notification records")
            # Vacuum to reclaim space
            self.conn.execute("VACUUM")

    def get_stats(self) -> Dict:
        """Get database statistics.

        Returns:
            Dict with one 'status_<name>' count per distinct status,
            plus 'total' and 'recent_24h' (rows indexed in the last day).
        """
        stats = {}

        # Count by status
        cursor = self.conn.execute("""
            SELECT status, COUNT(*) as count
            FROM notifications
            GROUP BY status
        """)
        for row in cursor:
            stats[f"status_{row['status']}"] = row['count']

        # Total count
        cursor = self.conn.execute("SELECT COUNT(*) as total FROM notifications")
        stats['total'] = cursor.fetchone()['total']

        # Recent activity (last 24h)
        yesterday = (datetime.now() - timedelta(days=1)).isoformat()
        cursor = self.conn.execute("""
            SELECT COUNT(*) as recent
            FROM notifications
            WHERE indexed_at > ?
        """, (yesterday,))
        stats['recent_24h'] = cursor.fetchone()['recent']

        return stats

    def start_session(self) -> int:
        """Start a new processing session.

        Returns:
            The autoincremented session id for use with update_session /
            end_session.
        """
        cursor = self.conn.execute("""
            INSERT INTO sessions (started_at, last_seen_at)
            VALUES (?, ?)
        """, (datetime.now().isoformat(), datetime.now().isoformat()))
        self.conn.commit()
        return cursor.lastrowid

    def update_session(self, session_id: int, processed: int = 0,
                       skipped: int = 0, error: int = 0):
        """Increment session counters and refresh last_seen_at.

        Args:
            session_id: Id returned by start_session.
            processed/skipped/error: Deltas added to the running totals.
        """
        self.conn.execute("""
            UPDATE sessions
            SET last_seen_at = ?,
                notifications_processed = notifications_processed + ?,
                notifications_skipped = notifications_skipped + ?,
                notifications_error = notifications_error + ?
            WHERE id = ?
        """, (datetime.now().isoformat(), processed, skipped, error, session_id))
        self.conn.commit()

    def end_session(self, session_id: int):
        """End a processing session by stamping ended_at."""
        self.conn.execute("""
            UPDATE sessions
            SET ended_at = ?
            WHERE id = ?
        """, (datetime.now().isoformat(), session_id))
        self.conn.commit()

    def get_processed_uris(self, limit: int = 10000) -> Set[str]:
        """Get set of processed URIs for compatibility with existing code.

        Args:
            limit: Cap on the number of most recently processed URIs.
        """
        cursor = self.conn.execute("""
            SELECT uri FROM notifications
            WHERE status IN ('processed', 'ignored', 'no_reply')
            ORDER BY processed_at DESC
            LIMIT ?
        """, (limit,))

        return {row['uri'] for row in cursor}

    def migrate_from_json(self, json_path: str = "queue/processed_notifications.json"):
        """Migrate data from the old JSON format.

        Reads a JSON list of URIs, inserts each as 'processed' with the
        current time (original timestamps are unknown), then renames the
        source file to *.json.backup. No-op if the file does not exist.
        """
        json_file = Path(json_path)
        if not json_file.exists():
            return

        try:
            with open(json_file, 'r') as f:
                uris = json.load(f)

            migrated = 0
            for uri in uris:
                # Add as processed with unknown timestamp
                self.conn.execute("""
                    INSERT OR IGNORE INTO notifications
                    (uri, indexed_at, status, processed_at)
                    VALUES (?, ?, 'processed', ?)
                """, (uri, datetime.now().isoformat(), datetime.now().isoformat()))
                migrated += 1

            self.conn.commit()
            logger.info(f"Migrated {migrated} URIs from JSON to database")

            # Rename old file to backup
            backup_path = json_file.with_suffix('.json.backup')
            json_file.rename(backup_path)
            logger.info(f"Renamed old JSON file to {backup_path}")

        except Exception as e:
            logger.error(f"Error migrating from JSON: {e}")

    def close(self):
        """Close database connection (safe to call when never opened)."""
        if self.conn:
            self.conn.close()