a digital person for bluesky
at main 8.9 kB view raw
1#!/usr/bin/env python3 2"""Recovery tools for missed notifications.""" 3 4import argparse 5import logging 6from datetime import datetime, timedelta 7from pathlib import Path 8import json 9import bsky_utils 10from notification_db import NotificationDB 11from bsky import save_notification_to_queue, notification_to_dict 12 13# Configure logging 14logging.basicConfig( 15 level=logging.INFO, 16 format="%(asctime)s - %(levelname)s - %(message)s" 17) 18logger = logging.getLogger(__name__) 19 20 21def recover_notifications(hours=24, dry_run=True): 22 """ 23 Recover notifications from the past N hours. 24 25 Args: 26 hours: Number of hours back to check for notifications 27 dry_run: If True, only show what would be recovered without saving 28 """ 29 logger.info(f"Recovering notifications from the past {hours} hours") 30 logger.info(f"Dry run: {dry_run}") 31 32 # Initialize Bluesky client 33 client = bsky_utils.default_login() 34 logger.info("Connected to Bluesky") 35 36 # Initialize database 37 db = NotificationDB() 38 logger.info("Database initialized") 39 40 # Fetch notifications 41 all_notifications = [] 42 cursor = None 43 page_count = 0 44 max_pages = 50 # More pages for recovery 45 46 cutoff_time = datetime.now() - timedelta(hours=hours) 47 cutoff_iso = cutoff_time.isoformat() 48 logger.info(f"Looking for notifications since: {cutoff_iso}") 49 50 while page_count < max_pages: 51 try: 52 # Fetch notifications page 53 if cursor: 54 response = client.app.bsky.notification.list_notifications( 55 params={'cursor': cursor, 'limit': 100} 56 ) 57 else: 58 response = client.app.bsky.notification.list_notifications( 59 params={'limit': 100} 60 ) 61 62 page_count += 1 63 page_notifications = response.notifications 64 65 if not page_notifications: 66 break 67 68 # Filter by time 69 for notif in page_notifications: 70 if hasattr(notif, 'indexed_at') and notif.indexed_at >= cutoff_iso: 71 all_notifications.append(notif) 72 elif hasattr(notif, 'indexed_at') and notif.indexed_at < cutoff_iso: 73 # We've gone past our cutoff, stop fetching 74 logger.info(f"Reached notifications older than {hours} hours, stopping") 75 cursor = None 76 break 77 78 # Check if there are more pages 79 if cursor is None: 80 break 81 cursor = getattr(response, 'cursor', None) 82 if not cursor: 83 break 84 85 except Exception as e: 86 logger.error(f"Error fetching notifications page {page_count}: {e}") 87 break 88 89 logger.info(f"Found {len(all_notifications)} notifications in the time range") 90 91 # Process notifications 92 recovered = 0 93 skipped_likes = 0 94 already_processed = 0 95 96 for notif in all_notifications: 97 # Skip likes 98 if hasattr(notif, 'reason') and notif.reason == 'like': 99 skipped_likes += 1 100 continue 101 102 # Check if already processed 103 notif_dict = notification_to_dict(notif) 104 uri = notif_dict.get('uri', '') 105 106 if db.is_processed(uri): 107 already_processed += 1 108 logger.debug(f"Already processed: {uri}") 109 continue 110 111 # Log what we would recover 112 author = notif_dict.get('author', {}) 113 author_handle = author.get('handle', 'unknown') 114 reason = notif_dict.get('reason', 'unknown') 115 indexed_at = notif_dict.get('indexed_at', '') 116 117 logger.info(f"Would recover: {reason} from @{author_handle} at {indexed_at}") 118 119 if not dry_run: 120 # Save to queue 121 if save_notification_to_queue(notif_dict): 122 recovered += 1 123 logger.info(f"Recovered notification from @{author_handle}") 124 else: 125 logger.warning(f"Failed to queue notification from @{author_handle}") 126 else: 127 recovered += 1 128 129 # Summary 130 logger.info(f"Recovery summary:") 131 logger.info(f" • Total found: {len(all_notifications)}") 132 logger.info(f" • Skipped (likes): {skipped_likes}") 133 logger.info(f" • Already processed: {already_processed}") 134 logger.info(f"{'Would recover' if dry_run else 'Recovered'}: {recovered}") 135 136 if dry_run and recovered > 0: 137 logger.info("Run with --execute to actually recover these notifications") 138 139 return recovered 140 141 142def check_database_health(): 143 """Check the health of the notification database.""" 144 db = NotificationDB() 145 stats = db.get_stats() 146 147 logger.info("Database Statistics:") 148 logger.info(f" • Total notifications: {stats.get('total', 0)}") 149 logger.info(f" • Pending: {stats.get('status_pending', 0)}") 150 logger.info(f" • Processed: {stats.get('status_processed', 0)}") 151 logger.info(f" • Ignored: {stats.get('status_ignored', 0)}") 152 logger.info(f" • No reply: {stats.get('status_no_reply', 0)}") 153 logger.info(f" • Errors: {stats.get('status_error', 0)}") 154 logger.info(f" • Recent (24h): {stats.get('recent_24h', 0)}") 155 156 # Check for issues 157 if stats.get('status_pending', 0) > 100: 158 logger.warning(f"⚠️ High number of pending notifications: {stats.get('status_pending', 0)}") 159 160 if stats.get('status_error', 0) > 50: 161 logger.warning(f"⚠️ High number of error notifications: {stats.get('status_error', 0)}") 162 163 return stats 164 165 166def reset_notification_status(hours=1, dry_run=True): 167 """ 168 Reset notifications from error/no_reply status back to pending. 169 170 Args: 171 hours: Reset notifications from the last N hours 172 dry_run: If True, only show what would be reset 173 """ 174 db = NotificationDB() 175 cutoff_time = datetime.now() - timedelta(hours=hours) 176 cutoff_iso = cutoff_time.isoformat() 177 178 # Get notifications to reset 179 cursor = db.conn.execute(""" 180 SELECT uri, status, indexed_at, author_handle 181 FROM notifications 182 WHERE status IN ('error', 'no_reply') 183 AND indexed_at > ? 184 ORDER BY indexed_at DESC 185 """, (cutoff_iso,)) 186 187 notifications_to_reset = cursor.fetchall() 188 189 if not notifications_to_reset: 190 logger.info(f"No notifications to reset from the last {hours} hours") 191 return 0 192 193 logger.info(f"Found {len(notifications_to_reset)} notifications to reset") 194 195 for notif in notifications_to_reset: 196 logger.info(f"Would reset: {notif['status']} -> pending for @{notif['author_handle']} at {notif['indexed_at']}") 197 198 if not dry_run: 199 reset_count = db.conn.execute(""" 200 UPDATE notifications 201 SET status = 'pending', processed_at = NULL, error = NULL 202 WHERE status IN ('error', 'no_reply') 203 AND indexed_at > ? 204 """, (cutoff_iso,)).rowcount 205 206 db.conn.commit() 207 logger.info(f"Reset {reset_count} notifications to pending status") 208 return reset_count 209 else: 210 logger.info("Run with --execute to actually reset these notifications") 211 return len(notifications_to_reset) 212 213 214def main(): 215 parser = argparse.ArgumentParser(description="Notification recovery and management tools") 216 217 subparsers = parser.add_subparsers(dest='command', help='Command to run') 218 219 # Recover command 220 recover_parser = subparsers.add_parser('recover', help='Recover missed notifications') 221 recover_parser.add_argument('--hours', type=int, default=24, 222 help='Number of hours back to check (default: 24)') 223 recover_parser.add_argument('--execute', action='store_true', 224 help='Actually recover notifications (default is dry run)') 225 226 # Health check command 227 health_parser = subparsers.add_parser('health', help='Check database health') 228 229 # Reset command 230 reset_parser = subparsers.add_parser('reset', help='Reset error notifications to pending') 231 reset_parser.add_argument('--hours', type=int, default=1, 232 help='Reset notifications from last N hours (default: 1)') 233 reset_parser.add_argument('--execute', action='store_true', 234 help='Actually reset notifications (default is dry run)') 235 236 args = parser.parse_args() 237 238 if args.command == 'recover': 239 recover_notifications(hours=args.hours, dry_run=not args.execute) 240 elif args.command == 'health': 241 check_database_health() 242 elif args.command == 'reset': 243 reset_notification_status(hours=args.hours, dry_run=not args.execute) 244 else: 245 parser.print_help() 246 247 248if __name__ == "__main__": 249 main()