A digital person for Bluesky
1#!/usr/bin/env python3
2"""Queue management utilities for Void bot."""
3import json
4import argparse
5from pathlib import Path
6from rich.console import Console
7from rich.table import Table
8from rich.prompt import Confirm
9
# Shared Rich console; every function in this module prints through it.
console = Console()
11
# Notification queue folders. All paths are relative, so they resolve against
# the current working directory of the process.
QUEUE_DIR = Path("queue")
QUEUE_ERROR_DIR = QUEUE_DIR.joinpath("errors")
QUEUE_NO_REPLY_DIR = QUEUE_DIR.joinpath("no_reply")
16
17
def load_notification(filepath: Path) -> "dict | None":
    """Load a notification from a JSON file.

    Args:
        filepath: Path of the JSON file to read.

    Returns:
        The decoded JSON value (callers expect a dict), or ``None`` when the
        file could not be read or parsed. Failures are printed to the console
        instead of being raised.
    """
    try:
        # JSON text is UTF-8; relying on the locale default encoding breaks
        # on non-ASCII post text on platforms whose default is not UTF-8.
        with open(filepath, 'r', encoding='utf-8') as f:
            return json.load(f)
    except Exception as e:
        # Deliberately best-effort: report the bad file and let the caller
        # skip it.
        console.print(f"[red]Error loading {filepath}: {e}[/red]")
        return None
26
27
def list_notifications(handle_filter: str = None, show_all: bool = False):
    """List notifications in the queue, optionally filtered by handle.

    Args:
        handle_filter: Case-insensitive substring matched against the
            author's handle. ``None`` or an empty string disables filtering.
        show_all: When true, also scan the ``errors`` and ``no_reply``
            directories in addition to the main queue directory.

    Returns:
        The list of matching notification dicts, sorted by their
        ``indexed_at`` value in descending order, each annotated with
        ``_filepath`` and ``_source`` keys; or ``None`` when nothing matched.
    """
    # Local import keeps this function self-contained; rich is already a
    # dependency of this module.
    from rich.markup import escape

    sources = [(QUEUE_DIR, "queue")]
    if show_all:
        sources += [(QUEUE_ERROR_DIR, "errors"), (QUEUE_NO_REPLY_DIR, "no_reply")]

    needle = handle_filter.lower() if handle_filter else None
    all_notifications = []

    for directory, source in sources:
        if not directory.exists():
            continue

        for filepath in directory.glob("*.json"):
            # Skip directories that happen to be named *.json
            if filepath.is_dir():
                continue

            notif = load_notification(filepath)
            if not notif or not isinstance(notif, dict):
                continue

            notif['_filepath'] = filepath
            notif['_source'] = source

            if needle is not None:
                # Tolerate a missing/null/non-dict author instead of crashing.
                author = notif.get('author')
                author_handle = author.get('handle') if isinstance(author, dict) else None
                if needle not in str(author_handle or '').lower():
                    continue

            all_notifications.append(notif)

    # Sort by indexed_at; coerce to str so null or missing values still compare.
    all_notifications.sort(key=lambda n: str(n.get('indexed_at') or ''), reverse=True)

    if not all_notifications:
        if handle_filter:
            console.print(
                f"[yellow]No notifications found for handle containing '{escape(handle_filter)}'[/yellow]"
            )
        else:
            console.print("[yellow]No notifications found in queue[/yellow]")
        return

    table = Table(title=f"Queue Notifications ({len(all_notifications)} total)")
    table.add_column("File", style="cyan", width=20)
    table.add_column("Source", style="magenta", width=10)
    table.add_column("Handle", style="green", width=25)
    table.add_column("Display Name", width=25)
    table.add_column("Text", width=40)
    table.add_column("Time", style="dim", width=20)

    for notif in all_notifications:
        author = notif.get('author')
        if not isinstance(author, dict):
            author = {}
        record = notif.get('record')
        if not isinstance(record, dict):
            record = {}

        handle = str(author.get('handle') or 'unknown')
        display_name = str(author.get('display_name') or '')
        full_text = str(record.get('text') or '')
        text = full_text[:40] + ("..." if len(full_text) > 40 else "")
        indexed_at = str(notif.get('indexed_at') or '')[:19]  # Trim milliseconds
        filename = notif['_filepath'].name[:20]

        # Post text, names and handles come from other users; escape them so
        # square brackets are shown literally rather than parsed as Rich
        # markup (an unmatched closing tag would otherwise raise MarkupError).
        table.add_row(
            escape(filename),
            notif['_source'],
            escape(f"@{handle}"),
            escape(display_name),
            escape(text),
            escape(indexed_at),
        )

    console.print(table)
    return all_notifications
104
105
def delete_by_handle(handle: str, dry_run: bool = False, force: bool = False):
    """Delete all notifications authored by a specific handle.

    Scans the queue, errors and no_reply directories, shows the matching
    notifications in a table, and (unless ``dry_run``) removes their files
    after an interactive confirmation.

    Args:
        handle: Author handle to match exactly (case-insensitive). Leading
            ``@`` characters are ignored.
        dry_run: When true, only show what would be deleted.
        force: When true, skip the confirmation prompt.
    """
    # Local import keeps this function self-contained; rich is already a
    # dependency of this module.
    from rich.markup import escape

    # Remove @ if present
    handle = handle.lstrip('@')
    if not handle:
        # An empty handle would match every notification that has no author
        # handle at all; refuse rather than delete those by accident.
        console.print("[red]No handle specified[/red]")
        return

    wanted = handle.lower()
    shown = escape(handle)

    console.print(f"\n[bold]Searching for notifications from @{shown}...[/bold]\n")

    to_delete = []
    for directory in (QUEUE_DIR, QUEUE_ERROR_DIR, QUEUE_NO_REPLY_DIR):
        if not directory.exists():
            continue

        for filepath in directory.glob("*.json"):
            if filepath.is_dir():
                continue

            notif = load_notification(filepath)
            if not notif or not isinstance(notif, dict):
                continue

            # Tolerate a missing/null/non-dict author instead of crashing.
            author = notif.get('author')
            author_handle = author.get('handle') if isinstance(author, dict) else None
            if str(author_handle or '').lower() == wanted:
                to_delete.append({
                    'filepath': filepath,
                    'notif': notif,
                    'source': directory.name,
                })

    if not to_delete:
        console.print(f"[yellow]No notifications found from @{shown}[/yellow]")
        return

    # Display what will be deleted
    table = Table(title=f"Notifications to Delete from @{shown}")
    table.add_column("File", style="cyan")
    table.add_column("Location", style="magenta")
    table.add_column("Text", width=50)
    table.add_column("Time", style="dim")

    for item in to_delete:
        notif = item['notif']
        record = notif.get('record')
        if not isinstance(record, dict):
            record = {}
        full_text = str(record.get('text') or '')
        text = full_text[:50] + ("..." if len(full_text) > 50 else "")
        indexed_at = str(notif.get('indexed_at') or '')[:19]

        # Escape user-authored text so brackets are not parsed as Rich markup.
        table.add_row(
            escape(item['filepath'].name),
            item['source'],
            escape(text),
            escape(indexed_at),
        )

    console.print(table)
    console.print(f"\n[bold red]Found {len(to_delete)} notifications to delete[/bold red]")

    if dry_run:
        console.print("\n[yellow]DRY RUN - No files were deleted[/yellow]")
        return

    # Confirm deletion
    if not force and not Confirm.ask("\nDo you want to delete these notifications?"):
        console.print("[yellow]Deletion cancelled[/yellow]")
        return

    # Delete the files; a failure on one file does not stop the rest.
    deleted_count = 0
    for item in to_delete:
        name = escape(item['filepath'].name)
        try:
            item['filepath'].unlink()
        except Exception as e:
            console.print(f"[red]✗[/red] Failed to delete {name}: {escape(str(e))}")
        else:
            deleted_count += 1
            console.print(f"[green]✓[/green] Deleted {name}")

    console.print(f"\n[bold green]Successfully deleted {deleted_count} notifications[/bold green]")
183
184
def count_by_handle():
    """Print a per-handle breakdown of notifications across all queue folders.

    Shows one table row per author handle with its count in the queue,
    errors and no_reply directories plus a total, followed by summary
    statistics. Prints a notice and returns early when nothing is found.
    """
    locations = (
        (QUEUE_DIR, 'queue'),
        (QUEUE_ERROR_DIR, 'errors'),
        (QUEUE_NO_REPLY_DIR, 'no_reply'),
    )

    # handle -> per-location tallies plus a running total
    tallies = {}
    for folder, bucket in locations:
        if not folder.exists():
            continue

        for json_path in folder.glob("*.json"):
            if json_path.is_dir():
                continue

            payload = load_notification(json_path)
            if not (payload and isinstance(payload, dict)):
                continue

            who = payload.get('author', {}).get('handle', 'unknown')
            row = tallies.setdefault(
                who, {'queue': 0, 'errors': 0, 'no_reply': 0, 'total': 0}
            )
            row[bucket] += 1
            row['total'] += 1

    if not tallies:
        console.print("[yellow]No notifications found in any queue[/yellow]")
        return

    # Busiest handles first
    ranked = list(tallies.items())
    ranked.sort(key=lambda pair: pair[1]['total'], reverse=True)

    def cell(n):
        # Zero counts are shown as a dash to keep the table readable.
        return str(n) if n > 0 else "-"

    table = Table(title=f"Notification Count by Handle ({len(tallies)} unique handles)")
    table.add_column("Handle", style="green", width=30)
    table.add_column("Queue", style="cyan", justify="right")
    table.add_column("Errors", style="red", justify="right")
    table.add_column("No Reply", style="yellow", justify="right")
    table.add_column("Total", style="bold magenta", justify="right")

    for who, row in ranked:
        table.add_row(
            f"@{who}",
            cell(row['queue']),
            cell(row['errors']),
            cell(row['no_reply']),
            str(row['total']),
        )

    console.print(table)

    # Summary statistics
    grand_total = sum(row['total'] for row in tallies.values())
    mean_per_handle = grand_total / len(tallies)

    console.print(f"\n[bold]Summary:[/bold]")
    console.print(f"  Total notifications: {grand_total}")
    console.print(f"  Unique handles: {len(tallies)}")
    console.print(f"  Average per handle: {mean_per_handle:.1f}")

    # Top user info
    if ranked:
        top_handle, top_counts = ranked[0]
        percentage = (top_counts['total'] / grand_total) * 100
        console.print(f"  Most active: @{top_handle} ({top_counts['total']} notifications, {percentage:.1f}% of total)")
248
249
def stats():
    """Show queue statistics.

    Prints one table with the notification count and number of unique author
    handles per location (queue, errors, no_reply), then a second table with
    the ten handles that have the most notifications across all locations.
    """
    locations = [
        (QUEUE_DIR, 'queue', 'Active Queue'),
        (QUEUE_ERROR_DIR, 'errors', 'Errors'),
        (QUEUE_NO_REPLY_DIR, 'no_reply', 'No Reply'),
    ]
    stats_data = {key: {'count': 0, 'handles': set()} for _, key, _ in locations}

    # Notifications per handle across every location. This is tracked
    # separately because the per-location sets only record presence; counting
    # set membership would cap each handle at the number of locations.
    handle_totals = {}

    # Collect stats
    for directory, key, _label in locations:
        if not directory.exists():
            continue

        for filepath in directory.glob("*.json"):
            if filepath.is_dir():
                continue

            notif = load_notification(filepath)
            if not notif or not isinstance(notif, dict):
                continue

            # Tolerate a missing/null/non-dict author instead of crashing.
            author = notif.get('author')
            handle = author.get('handle', 'unknown') if isinstance(author, dict) else 'unknown'

            stats_data[key]['count'] += 1
            stats_data[key]['handles'].add(handle)
            handle_totals[handle] = handle_totals.get(handle, 0) + 1

    # Display stats
    table = Table(title="Queue Statistics")
    table.add_column("Location", style="cyan")
    table.add_column("Count", style="yellow")
    table.add_column("Unique Handles", style="green")

    for _directory, key, label in locations:
        table.add_row(
            label,
            str(stats_data[key]['count']),
            str(len(stats_data[key]['handles'])),
        )

    console.print(table)

    # Show top handles
    if handle_totals:
        top_handles = sorted(handle_totals.items(), key=lambda x: x[1], reverse=True)[:10]

        top_table = Table(title="Top 10 Handles by Notification Count")
        top_table.add_column("Handle", style="green")
        top_table.add_column("Count", style="yellow")

        for handle, count in top_handles:
            top_table.add_row(f"@{handle}", str(count))

        # Blank spacing between the two tables (a real newline, not a
        # literal backslash-n).
        console.print("\n")
        console.print(top_table)
306
307
def main():
    """Parse command-line arguments and run the selected queue command.

    Supported sub-commands are ``list``, ``delete``, ``stats`` and ``count``.
    When no sub-command is given, the help text is printed instead.
    """
    parser = argparse.ArgumentParser(description="Manage Void bot notification queue")
    commands = parser.add_subparsers(dest='command', help='Commands')

    # list: show queued notifications
    lister = commands.add_parser('list', help='List notifications in queue')
    lister.add_argument('--handle', help='Filter by handle (partial match)')
    lister.add_argument('--all', action='store_true', help='Include errors and no_reply folders')

    # delete: remove everything from one handle
    deleter = commands.add_parser('delete', help='Delete notifications from a specific handle')
    deleter.add_argument('handle', help='Handle to delete notifications from')
    deleter.add_argument('--dry-run', action='store_true', help='Show what would be deleted without deleting')
    deleter.add_argument('--force', action='store_true', help='Skip confirmation prompt')

    # stats / count take no arguments of their own
    commands.add_parser('stats', help='Show queue statistics')
    commands.add_parser('count', help='Show detailed count by handle')

    args = parser.parse_args()

    # Map each sub-command to the call that implements it; the lambdas defer
    # attribute access until the matching command is actually selected.
    dispatch = {
        'list': lambda: list_notifications(args.handle, args.all),
        'delete': lambda: delete_by_handle(args.handle, args.dry_run, args.force),
        'stats': lambda: stats(),
        'count': lambda: count_by_handle(),
    }

    action = dispatch.get(args.command)
    if action is None:
        parser.print_help()
        return
    action()
341
342
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()