a digital person for bluesky

Implement robust notification tracking system

Major improvements:
- Added SQLite database for reliable notification tracking
- Removed problematic is_read filter that was blocking all notifications
- Implemented timestamp-based filtering for continuity between runs
- Added comprehensive debug logging to understand notification flow
- Created recovery tools for missed notifications
- Added queue health monitoring and alerts
- Added auto-migration from the old JSON format to the database
- Added periodic cleanup of old notification records

This should significantly improve notification reliability and provide better visibility into what's being processed.
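The key behavioural change is in how a fetched notification is gated before queueing: instead of trusting the API's is_read flag, a notification is queued unless it is older than the newest processed record or already known to the database. A minimal sketch of that decision using the NotificationDB class added below (the notification dict is a made-up placeholder, not real data):

from notification_db import NotificationDB

db = NotificationDB()  # defaults to queue/notifications.db

# Placeholder notification, shaped like the dicts produced by model_dump() in bsky.py
notif = {
    'uri': 'at://did:plc:example/app.bsky.feed.post/3kexample',
    'indexed_at': '2024-01-02T12:00:00.000Z',
    'reason': 'reply',
    'is_read': True,  # now only logged for debugging, never used to skip
    'author': {'handle': 'someone.bsky.social', 'did': 'did:plc:example'},
    'record': {'text': 'hello'},
}

last_processed_time = db.get_latest_processed_time()  # None on a fresh database

should_queue = (
    notif['reason'] != 'like'
    and not db.is_processed(notif['uri'])
    and (last_processed_time is None or notif['indexed_at'] > last_processed_time)
)

if should_queue:
    db.add_notification(notif)  # recorded as 'pending' until it is handled
db.close()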

+146 -43
bsky.py
···
  import bsky_utils
  from tools.blocks import attach_user_blocks, detach_user_blocks
  from datetime import date
+ from notification_db import NotificationDB

  def extract_handles_from_data(data):
      """Recursively extract all unique handles from nested data structure."""
···

  # Synthesis message tracking
  last_synthesis_time = time.time()
+
+ # Database for notification tracking
+ NOTIFICATION_DB = None

  def export_agent_state(client, agent, skip_git=False):
      """Export agent state to agent_archive/ (timestamped) and agents/ (current)."""
···
              logger.info(f"Deleted queue file: {queue_filepath.name}")

              # Also mark as processed to avoid reprocessing
-             processed_uris = load_processed_notifications()
-             processed_uris.add(notification_data.get('uri', ''))
-             save_processed_notifications(processed_uris)
+             if NOTIFICATION_DB:
+                 NOTIFICATION_DB.mark_processed(notification_data.get('uri', ''), status='processed')
+             else:
+                 processed_uris = load_processed_notifications()
+                 processed_uris.add(notification_data.get('uri', ''))
+                 save_processed_notifications(processed_uris)

              # Export agent state before terminating
              export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT)
···


  def load_processed_notifications():
-     """Load the set of processed notification URIs."""
-     if PROCESSED_NOTIFICATIONS_FILE.exists():
-         try:
-             with open(PROCESSED_NOTIFICATIONS_FILE, 'r') as f:
-                 data = json.load(f)
-                 # Keep only recent entries (last MAX_PROCESSED_NOTIFICATIONS)
-                 if len(data) > MAX_PROCESSED_NOTIFICATIONS:
-                     data = data[-MAX_PROCESSED_NOTIFICATIONS:]
-                     save_processed_notifications(data)
-                 return set(data)
-         except Exception as e:
-             logger.error(f"Error loading processed notifications: {e}")
+     """Load the set of processed notification URIs from database."""
+     global NOTIFICATION_DB
+     if NOTIFICATION_DB:
+         return NOTIFICATION_DB.get_processed_uris(limit=MAX_PROCESSED_NOTIFICATIONS)
      return set()


  def save_processed_notifications(processed_set):
-     """Save the set of processed notification URIs."""
-     try:
-         with open(PROCESSED_NOTIFICATIONS_FILE, 'w') as f:
-             json.dump(list(processed_set), f)
-     except Exception as e:
-         logger.error(f"Error saving processed notifications: {e}")
+     """Save the set of processed notification URIs to database."""
+     # This is now handled by marking individual notifications in the DB
+     # Keeping function for compatibility but it doesn't need to do anything
+     pass


  def save_notification_to_queue(notification, is_priority=None):
      """Save a notification to the queue directory with priority-based filename."""
      try:
-         # Check if already processed
-         processed_uris = load_processed_notifications()
+         global NOTIFICATION_DB

          # Handle both notification objects and dicts
          if isinstance(notification, dict):
···
          else:
              notif_dict = notification_to_dict(notification)
              notification_uri = notification.uri
-
-         if notification_uri in processed_uris:
-             logger.debug(f"Notification already processed: {notification_uri}")
-             return False
+
+         # Check if already processed (using database if available)
+         if NOTIFICATION_DB:
+             if NOTIFICATION_DB.is_processed(notification_uri):
+                 logger.debug(f"Notification already processed (DB): {notification_uri}")
+                 return False
+             # Add to database
+             NOTIFICATION_DB.add_notification(notif_dict)
+         else:
+             # Fall back to old JSON method
+             processed_uris = load_processed_notifications()
+             if notification_uri in processed_uris:
+                 logger.debug(f"Notification already processed: {notification_uri}")
+                 return False

          # Create JSON string
          notif_json = json.dumps(notif_dict, sort_keys=True)
···
              logger.info(f"Successfully processed and removed: {filepath.name}")

              # Mark as processed to avoid reprocessing
-             processed_uris = load_processed_notifications()
-             processed_uris.add(notif_data['uri'])
-             save_processed_notifications(processed_uris)
+             if NOTIFICATION_DB:
+                 NOTIFICATION_DB.mark_processed(notif_data['uri'], status='processed')
+             else:
+                 processed_uris = load_processed_notifications()
+                 processed_uris.add(notif_data['uri'])
+                 save_processed_notifications(processed_uris)

          elif success is None:  # Special case for moving to error directory
              error_path = QUEUE_ERROR_DIR / filepath.name
···
              logger.warning(f"Moved {filepath.name} to errors directory")

              # Also mark as processed to avoid retrying
-             processed_uris = load_processed_notifications()
-             processed_uris.add(notif_data['uri'])
-             save_processed_notifications(processed_uris)
+             if NOTIFICATION_DB:
+                 NOTIFICATION_DB.mark_processed(notif_data['uri'], status='error')
+             else:
+                 processed_uris = load_processed_notifications()
+                 processed_uris.add(notif_data['uri'])
+                 save_processed_notifications(processed_uris)

          elif success == "no_reply":  # Special case for moving to no_reply directory
              no_reply_path = QUEUE_NO_REPLY_DIR / filepath.name
···
              logger.info(f"Moved {filepath.name} to no_reply directory")

              # Also mark as processed to avoid retrying
-             processed_uris = load_processed_notifications()
-             processed_uris.add(notif_data['uri'])
-             save_processed_notifications(processed_uris)
+             if NOTIFICATION_DB:
+                 NOTIFICATION_DB.mark_processed(notif_data['uri'], status='no_reply')
+             else:
+                 processed_uris = load_processed_notifications()
+                 processed_uris.add(notif_data['uri'])
+                 save_processed_notifications(processed_uris)

          elif success == "ignored":  # Special case for explicitly ignored notifications
              # For ignored notifications, we just delete them (not move to no_reply)
···
              logger.info(f"🚫 Deleted ignored notification: {filepath.name}")

              # Also mark as processed to avoid retrying
-             processed_uris = load_processed_notifications()
-             processed_uris.add(notif_data['uri'])
-             save_processed_notifications(processed_uris)
+             if NOTIFICATION_DB:
+                 NOTIFICATION_DB.mark_processed(notif_data['uri'], status='ignored')
+             else:
+                 processed_uris = load_processed_notifications()
+                 processed_uris.add(notif_data['uri'])
+                 save_processed_notifications(processed_uris)

          else:
              logger.warning(f"⚠️ Failed to process {filepath.name}, keeping in queue for retry")
···
  def fetch_and_queue_new_notifications(atproto_client):
      """Fetch new notifications and queue them without processing."""
      try:
+         global NOTIFICATION_DB
+
          # Get current time for marking notifications as seen
          logger.debug("Getting current time for notification marking...")
          last_seen_at = atproto_client.get_current_time_iso()
+
+         # Get timestamp of last processed notification for filtering
+         last_processed_time = None
+         if NOTIFICATION_DB:
+             last_processed_time = NOTIFICATION_DB.get_latest_processed_time()
+             if last_processed_time:
+                 logger.debug(f"Last processed notification was at: {last_processed_time}")

          # Fetch ALL notifications using pagination
          all_notifications = []
···
          # Now process all fetched notifications
          new_count = 0
          if all_notifications:
+             logger.info(f"📥 Fetched {len(all_notifications)} total notifications from API")
+
              # Mark as seen first
              try:
                  atproto_client.app.bsky.notification.update_seen(
···
              except Exception as e:
                  logger.error(f"Error marking notifications as seen: {e}")

-             # Queue all new notifications (except likes and already read)
+             # Debug counters
+             skipped_read = 0
+             skipped_likes = 0
+             skipped_processed = 0
+             skipped_old_timestamp = 0
+             processed_uris = load_processed_notifications()
+
+             # Queue all new notifications (except likes)
              for notif in all_notifications:
-                 # Skip if already read or if it's a like
-                 if (hasattr(notif, 'is_read') and notif.is_read) or (hasattr(notif, 'reason') and notif.reason == 'like'):
+                 # Skip if older than last processed (when we have timestamp filtering)
+                 if last_processed_time and hasattr(notif, 'indexed_at'):
+                     if notif.indexed_at <= last_processed_time:
+                         skipped_old_timestamp += 1
+                         logger.debug(f"Skipping old notification (indexed_at {notif.indexed_at} <= {last_processed_time})")
+                         continue
+
+                 # Debug: Log is_read status but DON'T skip based on it
+                 if hasattr(notif, 'is_read') and notif.is_read:
+                     skipped_read += 1
+                     logger.debug(f"Notification has is_read=True (but processing anyway): {notif.uri if hasattr(notif, 'uri') else 'unknown'}")
+
+                 # Skip likes
+                 if hasattr(notif, 'reason') and notif.reason == 'like':
+                     skipped_likes += 1
                      continue

                  notif_dict = notif.model_dump() if hasattr(notif, 'model_dump') else notif
···
                  if notif_dict.get('reason') == 'like':
                      continue

+                 # Check if already processed
+                 notif_uri = notif_dict.get('uri', '')
+                 if notif_uri in processed_uris:
+                     skipped_processed += 1
+                     logger.debug(f"Skipping already processed: {notif_uri}")
+                     continue
+
                  # Check if it's a priority notification
                  is_priority = False
···
                  if save_notification_to_queue(notif_dict, is_priority=is_priority):
                      new_count += 1
+                     logger.debug(f"Queued notification from @{author_handle}: {notif_dict.get('reason', 'unknown')}")

-             logger.info(f"Queued {new_count} new notifications and marked as seen")
+             # Log summary of filtering
+             logger.info(f"📊 Notification processing summary:")
+             logger.info(f"  • Total fetched: {len(all_notifications)}")
+             logger.info(f"  • Had is_read=True: {skipped_read} (not skipped)")
+             logger.info(f"  • Skipped (likes): {skipped_likes}")
+             logger.info(f"  • Skipped (old timestamp): {skipped_old_timestamp}")
+             logger.info(f"  • Skipped (already processed): {skipped_processed}")
+             logger.info(f"  • Queued for processing: {new_count}")
          else:
              logger.debug("No new notifications to queue")
···
      void_agent = initialize_void()
      logger.info(f"Void agent initialized: {void_agent.id}")

+     # Initialize notification database
+     global NOTIFICATION_DB
+     logger.info("Initializing notification database...")
+     NOTIFICATION_DB = NotificationDB()
+
+     # Migrate from old JSON format if it exists
+     if PROCESSED_NOTIFICATIONS_FILE.exists():
+         logger.info("Found old processed_notifications.json, migrating to database...")
+         NOTIFICATION_DB.migrate_from_json(str(PROCESSED_NOTIFICATIONS_FILE))
+
+     # Log database stats
+     db_stats = NOTIFICATION_DB.get_stats()
+     logger.info(f"Database initialized - Total notifications: {db_stats.get('total', 0)}, Recent (24h): {db_stats.get('recent_24h', 0)}")
+
+     # Clean up old records
+     NOTIFICATION_DB.cleanup_old_records(days=7)
+
      # Ensure correct tools are attached for Bluesky
      logger.info("Configuring tools for Bluesky platform...")
      try:
···
              if CLEANUP_INTERVAL > 0 and cycle_count % CLEANUP_INTERVAL == 0:
                  logger.debug(f"Running periodic user block cleanup (cycle {cycle_count})")
                  periodic_user_block_cleanup(CLIENT, void_agent.id)
+
+                 # Also check database health when doing cleanup
+                 if NOTIFICATION_DB:
+                     db_stats = NOTIFICATION_DB.get_stats()
+                     pending = db_stats.get('status_pending', 0)
+                     errors = db_stats.get('status_error', 0)
+
+                     if pending > 50:
+                         logger.warning(f"⚠️ Queue health check: {pending} pending notifications (may be stuck)")
+                     if errors > 20:
+                         logger.warning(f"⚠️ Queue health check: {errors} error notifications")
+
+                     # Periodic cleanup of old records
+                     if cycle_count % (CLEANUP_INTERVAL * 10) == 0:  # Every 100 cycles
+                         logger.info("Running database cleanup of old records...")
+                         NOTIFICATION_DB.cleanup_old_records(days=7)

              # Log cycle completion with stats
              elapsed_time = time.time() - start_time
···
                  logger.info(f"  - {message_counters['follows']} follows")
                  logger.info(f"  - {message_counters['reposts_skipped']} reposts skipped")
                  logger.info(f"  - Average rate: {messages_per_minute:.1f} messages/minute")
+
+                 # Close database connection
+                 if NOTIFICATION_DB:
+                     logger.info("Closing database connection...")
+                     NOTIFICATION_DB.close()
+
                  break
          except Exception as e:
              logger.error(f"=== ERROR IN MAIN LOOP CYCLE {cycle_count} ===")
+293
notification_db.py
#!/usr/bin/env python3
"""SQLite database for robust notification tracking."""

import sqlite3
import json
from pathlib import Path
from datetime import datetime, timedelta
from typing import Set, Dict, List, Optional, Tuple
import logging

logger = logging.getLogger(__name__)

class NotificationDB:
    """Database for tracking notification processing state."""

    def __init__(self, db_path: str = "queue/notifications.db"):
        """Initialize the notification database."""
        self.db_path = Path(db_path)
        self.db_path.parent.mkdir(exist_ok=True, parents=True)
        self.conn = None
        self._init_db()

    def _init_db(self):
        """Initialize database schema."""
        self.conn = sqlite3.connect(self.db_path, check_same_thread=False)
        self.conn.row_factory = sqlite3.Row

        # Create main notifications table
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS notifications (
                uri TEXT PRIMARY KEY,
                indexed_at TEXT NOT NULL,
                processed_at TEXT,
                status TEXT NOT NULL DEFAULT 'pending',
                reason TEXT,
                author_handle TEXT,
                author_did TEXT,
                text TEXT,
                parent_uri TEXT,
                root_uri TEXT,
                error TEXT,
                metadata TEXT
            )
        """)

        # Create indexes for faster lookups
        self.conn.execute("""
            CREATE INDEX IF NOT EXISTS idx_indexed_at
            ON notifications(indexed_at DESC)
        """)

        self.conn.execute("""
            CREATE INDEX IF NOT EXISTS idx_status
            ON notifications(status)
        """)

        self.conn.execute("""
            CREATE INDEX IF NOT EXISTS idx_author_handle
            ON notifications(author_handle)
        """)

        # Create session tracking table
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS sessions (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                started_at TEXT NOT NULL,
                ended_at TEXT,
                last_seen_at TEXT,
                notifications_processed INTEGER DEFAULT 0,
                notifications_skipped INTEGER DEFAULT 0,
                notifications_error INTEGER DEFAULT 0
            )
        """)

        self.conn.commit()

    def add_notification(self, notif_dict: Dict) -> bool:
        """Add a notification to the database."""
        try:
            # Handle None input
            if not notif_dict:
                return False

            # Extract key fields
            uri = notif_dict.get('uri', '')
            if not uri:
                return False

            indexed_at = notif_dict.get('indexed_at', '')
            reason = notif_dict.get('reason', '')
            author = notif_dict.get('author', {}) if notif_dict.get('author') else {}
            author_handle = author.get('handle', '') if author else ''
            author_did = author.get('did', '') if author else ''

            # Extract text from record if available
            record = notif_dict.get('record', {})
            text = record.get('text', '')[:500]  # Limit text length

            # Extract thread info
            parent_uri = None
            root_uri = None
            if 'reply' in record:
                parent_uri = record['reply'].get('parent', {}).get('uri')
                root_uri = record['reply'].get('root', {}).get('uri')

            # Store additional metadata as JSON
            metadata = {
                'cid': notif_dict.get('cid'),
                'labels': notif_dict.get('labels', []),
                'is_read': notif_dict.get('is_read', False)
            }

            self.conn.execute("""
                INSERT OR IGNORE INTO notifications
                (uri, indexed_at, reason, author_handle, author_did, text,
                 parent_uri, root_uri, status, metadata)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, 'pending', ?)
            """, (uri, indexed_at, reason, author_handle, author_did, text,
                  parent_uri, root_uri, json.dumps(metadata)))

            self.conn.commit()
            return True

        except Exception as e:
            logger.error(f"Error adding notification to DB: {e}")
            return False

    def is_processed(self, uri: str) -> bool:
        """Check if a notification has been processed."""
        cursor = self.conn.execute("""
            SELECT status FROM notifications WHERE uri = ?
        """, (uri,))
        row = cursor.fetchone()

        if row:
            return row['status'] in ['processed', 'ignored', 'no_reply']
        return False

    def mark_processed(self, uri: str, status: str = 'processed', error: str = None):
        """Mark a notification as processed."""
        try:
            self.conn.execute("""
                UPDATE notifications
                SET status = ?, processed_at = ?, error = ?
                WHERE uri = ?
            """, (status, datetime.now().isoformat(), error, uri))
            self.conn.commit()
        except Exception as e:
            logger.error(f"Error marking notification processed: {e}")

    def get_unprocessed(self, limit: int = 100) -> List[Dict]:
        """Get unprocessed notifications."""
        cursor = self.conn.execute("""
            SELECT * FROM notifications
            WHERE status = 'pending'
            ORDER BY indexed_at ASC
            LIMIT ?
        """, (limit,))

        return [dict(row) for row in cursor]

    def get_latest_processed_time(self) -> Optional[str]:
        """Get the timestamp of the most recently processed notification."""
        cursor = self.conn.execute("""
            SELECT MAX(indexed_at) as latest
            FROM notifications
            WHERE status IN ('processed', 'ignored', 'no_reply')
        """)
        row = cursor.fetchone()
        return row['latest'] if row and row['latest'] else None

    def cleanup_old_records(self, days: int = 7):
        """Remove records older than specified days."""
        cutoff_date = (datetime.now() - timedelta(days=days)).isoformat()

        deleted = self.conn.execute("""
            DELETE FROM notifications
            WHERE indexed_at < ?
            AND status IN ('processed', 'ignored', 'no_reply', 'error')
        """, (cutoff_date,)).rowcount

        self.conn.commit()

        if deleted > 0:
            logger.info(f"Cleaned up {deleted} old notification records")
            # Vacuum to reclaim space
            self.conn.execute("VACUUM")

    def get_stats(self) -> Dict:
        """Get database statistics."""
        stats = {}

        # Count by status
        cursor = self.conn.execute("""
            SELECT status, COUNT(*) as count
            FROM notifications
            GROUP BY status
        """)

        for row in cursor:
            stats[f"status_{row['status']}"] = row['count']

        # Total count
        cursor = self.conn.execute("SELECT COUNT(*) as total FROM notifications")
        stats['total'] = cursor.fetchone()['total']

        # Recent activity (last 24h)
        yesterday = (datetime.now() - timedelta(days=1)).isoformat()
        cursor = self.conn.execute("""
            SELECT COUNT(*) as recent
            FROM notifications
            WHERE indexed_at > ?
        """, (yesterday,))
        stats['recent_24h'] = cursor.fetchone()['recent']

        return stats

    def start_session(self) -> int:
        """Start a new processing session."""
        cursor = self.conn.execute("""
            INSERT INTO sessions (started_at, last_seen_at)
            VALUES (?, ?)
        """, (datetime.now().isoformat(), datetime.now().isoformat()))
        self.conn.commit()
        return cursor.lastrowid

    def update_session(self, session_id: int, processed: int = 0, skipped: int = 0, error: int = 0):
        """Update session statistics."""
        self.conn.execute("""
            UPDATE sessions
            SET last_seen_at = ?,
                notifications_processed = notifications_processed + ?,
                notifications_skipped = notifications_skipped + ?,
                notifications_error = notifications_error + ?
            WHERE id = ?
        """, (datetime.now().isoformat(), processed, skipped, error, session_id))
        self.conn.commit()

    def end_session(self, session_id: int):
        """End a processing session."""
        self.conn.execute("""
            UPDATE sessions
            SET ended_at = ?
            WHERE id = ?
        """, (datetime.now().isoformat(), session_id))
        self.conn.commit()

    def get_processed_uris(self, limit: int = 10000) -> Set[str]:
        """Get set of processed URIs for compatibility with existing code."""
        cursor = self.conn.execute("""
            SELECT uri FROM notifications
            WHERE status IN ('processed', 'ignored', 'no_reply')
            ORDER BY processed_at DESC
            LIMIT ?
        """, (limit,))

        return {row['uri'] for row in cursor}

    def migrate_from_json(self, json_path: str = "queue/processed_notifications.json"):
        """Migrate data from the old JSON format."""
        json_file = Path(json_path)
        if not json_file.exists():
            return

        try:
            with open(json_file, 'r') as f:
                uris = json.load(f)

            migrated = 0
            for uri in uris:
                # Add as processed with unknown timestamp
                self.conn.execute("""
                    INSERT OR IGNORE INTO notifications
                    (uri, indexed_at, status, processed_at)
                    VALUES (?, ?, 'processed', ?)
                """, (uri, datetime.now().isoformat(), datetime.now().isoformat()))
                migrated += 1

            self.conn.commit()
            logger.info(f"Migrated {migrated} URIs from JSON to database")

            # Rename old file to backup
            backup_path = json_file.with_suffix('.json.backup')
            json_file.rename(backup_path)
            logger.info(f"Renamed old JSON file to {backup_path}")

        except Exception as e:
            logger.error(f"Error migrating from JSON: {e}")

    def close(self):
        """Close database connection."""
        if self.conn:
            self.conn.close()
+249
notification_recovery.py
#!/usr/bin/env python3
"""Recovery tools for missed notifications."""

import argparse
import logging
from datetime import datetime, timedelta
from pathlib import Path
import json
import bsky_utils
from notification_db import NotificationDB
from bsky import save_notification_to_queue, notification_to_dict

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)


def recover_notifications(hours=24, dry_run=True):
    """
    Recover notifications from the past N hours.

    Args:
        hours: Number of hours back to check for notifications
        dry_run: If True, only show what would be recovered without saving
    """
    logger.info(f"Recovering notifications from the past {hours} hours")
    logger.info(f"Dry run: {dry_run}")

    # Initialize Bluesky client
    client = bsky_utils.default_login()
    logger.info("Connected to Bluesky")

    # Initialize database
    db = NotificationDB()
    logger.info("Database initialized")

    # Fetch notifications
    all_notifications = []
    cursor = None
    page_count = 0
    max_pages = 50  # More pages for recovery
    reached_cutoff = False

    cutoff_time = datetime.now() - timedelta(hours=hours)
    cutoff_iso = cutoff_time.isoformat()
    logger.info(f"Looking for notifications since: {cutoff_iso}")

    while page_count < max_pages and not reached_cutoff:
        try:
            # Fetch notifications page
            if cursor:
                response = client.app.bsky.notification.list_notifications(
                    params={'cursor': cursor, 'limit': 100}
                )
            else:
                response = client.app.bsky.notification.list_notifications(
                    params={'limit': 100}
                )

            page_count += 1
            page_notifications = response.notifications

            if not page_notifications:
                break

            # Filter by time
            for notif in page_notifications:
                if hasattr(notif, 'indexed_at') and notif.indexed_at >= cutoff_iso:
                    all_notifications.append(notif)
                elif hasattr(notif, 'indexed_at') and notif.indexed_at < cutoff_iso:
                    # We've gone past our cutoff, stop fetching
                    logger.info(f"Reached notifications older than {hours} hours, stopping")
                    reached_cutoff = True
                    break

            # Check if there are more pages
            if reached_cutoff:
                break
            cursor = getattr(response, 'cursor', None)
            if not cursor:
                break

        except Exception as e:
            logger.error(f"Error fetching notifications page {page_count}: {e}")
            break

    logger.info(f"Found {len(all_notifications)} notifications in the time range")

    # Process notifications
    recovered = 0
    skipped_likes = 0
    already_processed = 0

    for notif in all_notifications:
        # Skip likes
        if hasattr(notif, 'reason') and notif.reason == 'like':
            skipped_likes += 1
            continue

        # Check if already processed
        notif_dict = notification_to_dict(notif)
        uri = notif_dict.get('uri', '')

        if db.is_processed(uri):
            already_processed += 1
            logger.debug(f"Already processed: {uri}")
            continue

        # Log what we would recover
        author = notif_dict.get('author', {})
        author_handle = author.get('handle', 'unknown')
        reason = notif_dict.get('reason', 'unknown')
        indexed_at = notif_dict.get('indexed_at', '')

        logger.info(f"Would recover: {reason} from @{author_handle} at {indexed_at}")

        if not dry_run:
            # Save to queue
            if save_notification_to_queue(notif_dict):
                recovered += 1
                logger.info(f"Recovered notification from @{author_handle}")
            else:
                logger.warning(f"Failed to queue notification from @{author_handle}")
        else:
            recovered += 1

    # Summary
    logger.info(f"Recovery summary:")
    logger.info(f"  • Total found: {len(all_notifications)}")
    logger.info(f"  • Skipped (likes): {skipped_likes}")
    logger.info(f"  • Already processed: {already_processed}")
    logger.info(f"  • {'Would recover' if dry_run else 'Recovered'}: {recovered}")

    if dry_run and recovered > 0:
        logger.info("Run with --execute to actually recover these notifications")

    return recovered


def check_database_health():
    """Check the health of the notification database."""
    db = NotificationDB()
    stats = db.get_stats()

    logger.info("Database Statistics:")
    logger.info(f"  • Total notifications: {stats.get('total', 0)}")
    logger.info(f"  • Pending: {stats.get('status_pending', 0)}")
    logger.info(f"  • Processed: {stats.get('status_processed', 0)}")
    logger.info(f"  • Ignored: {stats.get('status_ignored', 0)}")
    logger.info(f"  • No reply: {stats.get('status_no_reply', 0)}")
    logger.info(f"  • Errors: {stats.get('status_error', 0)}")
    logger.info(f"  • Recent (24h): {stats.get('recent_24h', 0)}")

    # Check for issues
    if stats.get('status_pending', 0) > 100:
        logger.warning(f"⚠️ High number of pending notifications: {stats.get('status_pending', 0)}")

    if stats.get('status_error', 0) > 50:
        logger.warning(f"⚠️ High number of error notifications: {stats.get('status_error', 0)}")

    return stats


def reset_notification_status(hours=1, dry_run=True):
    """
    Reset notifications from error/no_reply status back to pending.

    Args:
        hours: Reset notifications from the last N hours
        dry_run: If True, only show what would be reset
    """
    db = NotificationDB()
    cutoff_time = datetime.now() - timedelta(hours=hours)
    cutoff_iso = cutoff_time.isoformat()

    # Get notifications to reset
    cursor = db.conn.execute("""
        SELECT uri, status, indexed_at, author_handle
        FROM notifications
        WHERE status IN ('error', 'no_reply')
        AND indexed_at > ?
        ORDER BY indexed_at DESC
    """, (cutoff_iso,))

    notifications_to_reset = cursor.fetchall()

    if not notifications_to_reset:
        logger.info(f"No notifications to reset from the last {hours} hours")
        return 0

    logger.info(f"Found {len(notifications_to_reset)} notifications to reset")

    for notif in notifications_to_reset:
        logger.info(f"Would reset: {notif['status']} -> pending for @{notif['author_handle']} at {notif['indexed_at']}")

    if not dry_run:
        reset_count = db.conn.execute("""
            UPDATE notifications
            SET status = 'pending', processed_at = NULL, error = NULL
            WHERE status IN ('error', 'no_reply')
            AND indexed_at > ?
        """, (cutoff_iso,)).rowcount

        db.conn.commit()
        logger.info(f"Reset {reset_count} notifications to pending status")
        return reset_count
    else:
        logger.info("Run with --execute to actually reset these notifications")
        return len(notifications_to_reset)


def main():
    parser = argparse.ArgumentParser(description="Notification recovery and management tools")

    subparsers = parser.add_subparsers(dest='command', help='Command to run')

    # Recover command
    recover_parser = subparsers.add_parser('recover', help='Recover missed notifications')
    recover_parser.add_argument('--hours', type=int, default=24,
                                help='Number of hours back to check (default: 24)')
    recover_parser.add_argument('--execute', action='store_true',
                                help='Actually recover notifications (default is dry run)')

    # Health check command
    health_parser = subparsers.add_parser('health', help='Check database health')

    # Reset command
    reset_parser = subparsers.add_parser('reset', help='Reset error notifications to pending')
    reset_parser.add_argument('--hours', type=int, default=1,
                              help='Reset notifications from last N hours (default: 1)')
    reset_parser.add_argument('--execute', action='store_true',
                              help='Actually reset notifications (default is dry run)')

    args = parser.parse_args()

    if args.command == 'recover':
        recover_notifications(hours=args.hours, dry_run=not args.execute)
    elif args.command == 'health':
        check_database_health()
    elif args.command == 'reset':
        reset_notification_status(hours=args.hours, dry_run=not args.execute)
    else:
        parser.print_help()


if __name__ == "__main__":
    main()