a digital person for bluesky
1#!/usr/bin/env python3
2"""Recovery tools for missed notifications."""
3
4import argparse
5import logging
6from datetime import datetime, timedelta
7from pathlib import Path
8import json
9import bsky_utils
10from notification_db import NotificationDB
11from bsky import save_notification_to_queue, notification_to_dict
12
# Configure logging: timestamped records at INFO level and above, plus a
# module-level logger shared by every command in this script.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
19
20
def _parse_indexed_at(value):
    """Parse an ISO-8601 ``indexed_at`` string into an aware datetime.

    Returns None when the value is not a non-empty string or cannot be
    parsed. Timestamps without an explicit offset are assumed to be UTC.
    """
    from datetime import timezone

    if not isinstance(value, str) or not value:
        return None

    text = value.strip()
    # datetime.fromisoformat() only understands a trailing "Z" from
    # Python 3.11 onwards, so spell the offset out explicitly.
    if text.endswith(("Z", "z")):
        text = text[:-1] + "+00:00"

    try:
        parsed = datetime.fromisoformat(text)
    except ValueError:
        return None

    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def _is_within_window(indexed_at, cutoff_time):
    """Return True when ``indexed_at`` is at or after the aware ``cutoff_time``."""
    parsed = _parse_indexed_at(indexed_at)
    if parsed is not None:
        return parsed >= cutoff_time

    # Unparseable timestamp (e.g. an unusual fractional-second width on an
    # older Python): fall back to a lexicographic comparison, which is correct
    # for UTC ISO-8601 strings that share the same layout.
    return str(indexed_at) >= cutoff_time.strftime("%Y-%m-%dT%H:%M:%S")


def recover_notifications(hours=24, dry_run=True):
    """
    Recover notifications from the past N hours.

    Pages through the account's notification list until a notification older
    than the cutoff is seen, the server stops returning a cursor, or the page
    limit is hit. Likes and notifications the database already marks as
    processed are skipped; the rest are queued (or only reported in a dry run).

    Args:
        hours: Number of hours back to check for notifications
        dry_run: If True, only show what would be recovered without saving

    Returns:
        The number of notifications recovered (or that would be recovered
        in a dry run).
    """
    from datetime import timezone

    logger.info(f"Recovering notifications from the past {hours} hours")
    logger.info(f"Dry run: {dry_run}")

    # Initialize Bluesky client
    client = bsky_utils.default_login()
    logger.info("Connected to Bluesky")

    # Initialize database
    db = NotificationDB()
    logger.info("Database initialized")

    # Fetch notifications
    all_notifications = []
    cursor = None
    page_count = 0
    max_pages = 50  # More pages for recovery

    # Use an aware UTC cutoff: a naive local-time cutoff would skew the
    # window by the host's UTC offset when compared with server timestamps.
    cutoff_time = datetime.now(timezone.utc) - timedelta(hours=hours)
    cutoff_iso = cutoff_time.isoformat()
    logger.info(f"Looking for notifications since: {cutoff_iso}")

    # Track "reached the cutoff" separately from the pagination cursor. The
    # cursor is legitimately None before the first page, so it cannot double
    # as the stop signal without ending the loop after one page.
    reached_cutoff = False

    while page_count < max_pages and not reached_cutoff:
        params = {'limit': 100}
        if cursor:
            params['cursor'] = cursor

        try:
            response = client.app.bsky.notification.list_notifications(params=params)
            page_notifications = response.notifications
        except Exception as e:
            # Best effort: keep whatever was collected from earlier pages.
            logger.error(f"Error fetching notifications page {page_count + 1}: {e}")
            break

        page_count += 1

        if not page_notifications:
            break

        # Filter by time
        for notif in page_notifications:
            indexed_at = getattr(notif, 'indexed_at', None)
            if indexed_at is None:
                # No timestamp to compare against; skip it.
                continue

            if _is_within_window(indexed_at, cutoff_time):
                all_notifications.append(notif)
            else:
                # We've gone past our cutoff, stop fetching
                logger.info(f"Reached notifications older than {hours} hours, stopping")
                reached_cutoff = True
                break

        # Check if there are more pages
        cursor = getattr(response, 'cursor', None)
        if not cursor:
            break

    if page_count >= max_pages and not reached_cutoff and cursor:
        logger.warning(
            f"Stopped after {max_pages} pages without reaching the cutoff; "
            "older notifications in the window may be missing"
        )

    logger.info(f"Found {len(all_notifications)} notifications in the time range")

    # Process notifications
    recovered = 0
    skipped_likes = 0
    already_processed = 0

    for notif in all_notifications:
        # Skip likes
        if getattr(notif, 'reason', None) == 'like':
            skipped_likes += 1
            continue

        # Check if already processed
        notif_dict = notification_to_dict(notif)
        uri = notif_dict.get('uri', '')

        if db.is_processed(uri):
            already_processed += 1
            logger.debug(f"Already processed: {uri}")
            continue

        author = notif_dict.get('author', {})
        author_handle = author.get('handle', 'unknown')
        reason = notif_dict.get('reason', 'unknown')
        indexed_at = notif_dict.get('indexed_at', '')

        if dry_run:
            # Log what we would recover
            logger.info(f"Would recover: {reason} from @{author_handle} at {indexed_at}")
            recovered += 1
            continue

        logger.info(f"Recovering: {reason} from @{author_handle} at {indexed_at}")

        # Save to queue
        if save_notification_to_queue(notif_dict):
            recovered += 1
            logger.info(f"Recovered notification from @{author_handle}")
        else:
            logger.warning(f"Failed to queue notification from @{author_handle}")

    # Summary
    logger.info("Recovery summary:")
    logger.info(f" • Total found: {len(all_notifications)}")
    logger.info(f" • Skipped (likes): {skipped_likes}")
    logger.info(f" • Already processed: {already_processed}")
    logger.info(f" • {'Would recover' if dry_run else 'Recovered'}: {recovered}")

    if dry_run and recovered > 0:
        logger.info("Run with --execute to actually recover these notifications")

    return recovered
140
141
def check_database_health():
    """Log per-status notification counts and warn about backlogs.

    Reads the statistics mapping from the notification database, logs one
    line per counter, and emits a warning when the pending count exceeds 100
    or the error count exceeds 50.

    Returns:
        The statistics mapping returned by ``NotificationDB.get_stats()``.
    """
    stats = NotificationDB().get_stats()

    def count(key):
        # Counters missing from the stats mapping are reported as zero.
        return stats.get(key, 0)

    report_lines = (
        ("Total notifications", 'total'),
        ("Pending", 'status_pending'),
        ("Processed", 'status_processed'),
        ("Ignored", 'status_ignored'),
        ("No reply", 'status_no_reply'),
        ("Errors", 'status_error'),
        ("Recent (24h)", 'recent_24h'),
    )

    logger.info("Database Statistics:")
    for label, key in report_lines:
        logger.info(f" • {label}: {count(key)}")

    # Flag backlogs that usually mean the processing loop is stuck or failing.
    pending = count('status_pending')
    if pending > 100:
        logger.warning(f"⚠️ High number of pending notifications: {pending}")

    errors = count('status_error')
    if errors > 50:
        logger.warning(f"⚠️ High number of error notifications: {errors}")

    return stats
164
165
def reset_notification_status(hours=1, dry_run=True):
    """
    Reset notifications from error/no_reply status back to pending.

    Args:
        hours: Reset notifications from the last N hours
        dry_run: If True, only show what would be reset

    Returns:
        The number of notifications reset, or the number that would be reset
        in a dry run (0 when nothing matches).
    """
    from datetime import timezone

    db = NotificationDB()

    # Build the cutoff in UTC rather than naive local time.
    # NOTE(review): assumes the indexed_at column stores Bluesky's UTC
    # ISO-8601 timestamps -- confirm against NotificationDB.
    # Second precision with no offset suffix keeps the SQL string comparison
    # a plain lexicographic prefix compare against stored values.
    cutoff_time = datetime.now(timezone.utc) - timedelta(hours=hours)
    cutoff_iso = cutoff_time.strftime("%Y-%m-%dT%H:%M:%S")

    # Get notifications to reset
    cursor = db.conn.execute("""
        SELECT uri, status, indexed_at, author_handle
        FROM notifications
        WHERE status IN ('error', 'no_reply')
        AND indexed_at > ?
        ORDER BY indexed_at DESC
    """, (cutoff_iso,))

    notifications_to_reset = cursor.fetchall()

    if not notifications_to_reset:
        logger.info(f"No notifications to reset from the last {hours} hours")
        return 0

    logger.info(f"Found {len(notifications_to_reset)} notifications to reset")

    # Only claim "Would reset" when nothing will actually be changed.
    action = "Would reset" if dry_run else "Resetting"
    for notif in notifications_to_reset:
        logger.info(f"{action}: {notif['status']} -> pending for @{notif['author_handle']} at {notif['indexed_at']}")

    if dry_run:
        logger.info("Run with --execute to actually reset these notifications")
        return len(notifications_to_reset)

    reset_count = db.conn.execute("""
        UPDATE notifications
        SET status = 'pending', processed_at = NULL, error = NULL
        WHERE status IN ('error', 'no_reply')
        AND indexed_at > ?
    """, (cutoff_iso,)).rowcount

    db.conn.commit()
    logger.info(f"Reset {reset_count} notifications to pending status")
    return reset_count
212
213
def main():
    """Parse the command line and run the selected maintenance command.

    Sub-commands: ``recover`` (re-queue missed notifications), ``health``
    (print database statistics) and ``reset`` (move error/no_reply rows back
    to pending). ``recover`` and ``reset`` are dry runs unless ``--execute``
    is given. With no sub-command the help text is printed.
    """
    parser = argparse.ArgumentParser(description="Notification recovery and management tools")
    commands = parser.add_subparsers(dest='command', help='Command to run')

    # Recover command
    recover_cmd = commands.add_parser('recover', help='Recover missed notifications')
    recover_cmd.add_argument(
        '--hours', type=int, default=24,
        help='Number of hours back to check (default: 24)',
    )
    recover_cmd.add_argument(
        '--execute', action='store_true',
        help='Actually recover notifications (default is dry run)',
    )

    # Health check command (takes no options)
    commands.add_parser('health', help='Check database health')

    # Reset command
    reset_cmd = commands.add_parser('reset', help='Reset error notifications to pending')
    reset_cmd.add_argument(
        '--hours', type=int, default=1,
        help='Reset notifications from last N hours (default: 1)',
    )
    reset_cmd.add_argument(
        '--execute', action='store_true',
        help='Actually reset notifications (default is dry run)',
    )

    args = parser.parse_args()

    # Lambdas keep argument access lazy: only the chosen command's options
    # exist on the parsed namespace.
    handlers = {
        'recover': lambda: recover_notifications(hours=args.hours, dry_run=not args.execute),
        'health': lambda: check_database_health(),
        'reset': lambda: reset_notification_status(hours=args.hours, dry_run=not args.execute),
    }

    run_command = handlers.get(args.command)
    if run_command is None:
        parser.print_help()
        return

    run_command()
246
247
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()