a digital person for bluesky
1#!/usr/bin/env python3
2"""Queue management utilities for Void bot."""
3import json
4import argparse
5from pathlib import Path
6from rich.console import Console
7from rich.table import Table
8from rich.prompt import Confirm
9
10console = Console()
11
12# Queue directories
13QUEUE_BASE_DIR = Path("queue")
14
15
16def load_notification(filepath: Path) -> dict:
17 """Load a notification from a JSON file."""
18 try:
19 with open(filepath, 'r') as f:
20 return json.load(f)
21 except Exception as e:
22 console.print(f"[red]Error loading {filepath}: {e}[/red]")
23 return None
24
25
26def list_notifications(handle_filter: str = None, show_all: bool = False, agent_id: str = None):
27 """List all notifications in the queue, optionally filtered by handle and/or agent."""
28 # Build list of directories to check
29 dirs_to_check = []
30
31 if agent_id:
32 # Check specific agent's directories
33 agent_queue_dir = QUEUE_BASE_DIR / agent_id
34 if agent_queue_dir.exists():
35 dirs_to_check.append((agent_queue_dir, "queue", agent_id))
36 if show_all:
37 dirs_to_check.append((agent_queue_dir / "errors", "errors", agent_id))
38 dirs_to_check.append((agent_queue_dir / "no_reply", "no_reply", agent_id))
39 else:
40 # Check all agent directories
41 for agent_dir in QUEUE_BASE_DIR.glob("*"):
42 if agent_dir.is_dir() and agent_dir.name != "errors" and agent_dir.name != "no_reply":
43 # Skip old flat structure directories
44 if "-" in agent_dir.name: # Agent IDs have hyphens
45 agent_id_str = agent_dir.name
46 dirs_to_check.append((agent_dir, "queue", agent_id_str))
47 if show_all:
48 dirs_to_check.append((agent_dir / "errors", "errors", agent_id_str))
49 dirs_to_check.append((agent_dir / "no_reply", "no_reply", agent_id_str))
50
51 # Also check old flat structure for backward compatibility
52 if QUEUE_BASE_DIR.exists():
53 old_queue_files = list(QUEUE_BASE_DIR.glob("*.json"))
54 if old_queue_files and any(f.name != "processed_notifications.json" for f in old_queue_files):
55 dirs_to_check.append((QUEUE_BASE_DIR, "queue (old)", "legacy"))
56 if show_all:
57 old_errors = QUEUE_BASE_DIR / "errors"
58 old_no_reply = QUEUE_BASE_DIR / "no_reply"
59 if old_errors.exists():
60 dirs_to_check.append((old_errors, "errors (old)", "legacy"))
61 if old_no_reply.exists():
62 dirs_to_check.append((old_no_reply, "no_reply (old)", "legacy"))
63
64 all_notifications = []
65
66 for directory, source, dir_agent_id in dirs_to_check:
67 if not directory.exists():
68 continue
69
70 for filepath in directory.glob("*.json"):
71 # Skip subdirectories
72 if filepath.is_dir():
73 continue
74
75 notif = load_notification(filepath)
76 if notif and isinstance(notif, dict):
77 notif['_filepath'] = filepath
78 notif['_source'] = source
79 notif['_agent_id'] = dir_agent_id
80
81 # Apply handle filter if specified
82 if handle_filter:
83 author_handle = notif.get('author', {}).get('handle', '')
84 if handle_filter.lower() not in author_handle.lower():
85 continue
86
87 all_notifications.append(notif)
88
89 # Sort by indexed_at
90 all_notifications.sort(key=lambda x: x.get('indexed_at', ''), reverse=True)
91
92 # Display results
93 if not all_notifications:
94 if handle_filter:
95 console.print(f"[yellow]No notifications found for handle containing '{handle_filter}'[/yellow]")
96 else:
97 console.print("[yellow]No notifications found in queue[/yellow]")
98 return
99
100 table = Table(title=f"Queue Notifications ({len(all_notifications)} total)")
101 table.add_column("Agent", style="blue", width=15)
102 table.add_column("File", style="cyan", width=20)
103 table.add_column("Source", style="magenta", width=10)
104 table.add_column("Handle", style="green", width=25)
105 table.add_column("Display Name", width=20)
106 table.add_column("Text", width=35)
107 table.add_column("Time", style="dim", width=20)
108
109 for notif in all_notifications:
110 author = notif.get('author', {})
111 handle = author.get('handle', 'unknown')
112 display_name = author.get('display_name', '')
113 text = notif.get('record', {}).get('text', '')[:40]
114 if len(notif.get('record', {}).get('text', '')) > 40:
115 text += "..."
116 indexed_at = notif.get('indexed_at', '')[:19] # Trim milliseconds
117 filename = notif['_filepath'].name[:20]
118 source = notif['_source']
119 agent_display = notif['_agent_id'][:12] + "..." if len(notif['_agent_id']) > 15 else notif['_agent_id']
120
121 table.add_row(agent_display, filename, source, f"@{handle}", display_name, text, indexed_at)
122
123 console.print(table)
124 return all_notifications
125
126
127def delete_by_handle(handle: str, dry_run: bool = False, force: bool = False):
128 """Delete all notifications from a specific handle."""
129 # Remove @ if present
130 handle = handle.lstrip('@')
131
132 # Find all notifications from this handle
133 console.print(f"\\n[bold]Searching for notifications from @{handle}...[/bold]\\n")
134
135 to_delete = []
136 # Build list of directories to check
137 dirs_to_check = []
138
139 # Check all agent directories
140 for agent_dir in QUEUE_BASE_DIR.glob("*"):
141 if agent_dir.is_dir() and "-" in agent_dir.name:
142 dirs_to_check.extend([
143 (agent_dir, "queue"),
144 (agent_dir / "errors", "errors"),
145 (agent_dir / "no_reply", "no_reply")
146 ])
147 # Also check old flat structure
148 dirs_to_check.extend([
149 (QUEUE_BASE_DIR, "queue (old)"),
150 (QUEUE_BASE_DIR / "errors", "errors (old)"),
151 (QUEUE_BASE_DIR / "no_reply", "no_reply (old)")
152 ])
153
154 for directory, source_name in dirs_to_check:
155 if not directory.exists():
156 continue
157
158 for filepath in directory.glob("*.json"):
159 if filepath.is_dir() or filepath.name == "processed_notifications.json":
160 continue
161
162 notif = load_notification(filepath)
163 if notif and isinstance(notif, dict):
164 author_handle = notif.get('author', {}).get('handle', '')
165 if author_handle.lower() == handle.lower():
166 to_delete.append({
167 'filepath': filepath,
168 'notif': notif,
169 'source': source_name
170 })
171
172 if not to_delete:
173 console.print(f"[yellow]No notifications found from @{handle}[/yellow]")
174 return
175
176 # Display what will be deleted
177 table = Table(title=f"Notifications to Delete from @{handle}")
178 table.add_column("File", style="cyan")
179 table.add_column("Location", style="magenta")
180 table.add_column("Text", width=50)
181 table.add_column("Time", style="dim")
182
183 for item in to_delete:
184 notif = item['notif']
185 text = notif.get('record', {}).get('text', '')[:50]
186 if len(notif.get('record', {}).get('text', '')) > 50:
187 text += "..."
188 indexed_at = notif.get('indexed_at', '')[:19]
189
190 table.add_row(
191 item['filepath'].name,
192 item['source'],
193 text,
194 indexed_at
195 )
196
197 console.print(table)
198 console.print(f"\\n[bold red]Found {len(to_delete)} notifications to delete[/bold red]")
199
200 if dry_run:
201 console.print("\\n[yellow]DRY RUN - No files were deleted[/yellow]")
202 return
203
204 # Confirm deletion
205 if not force and not Confirm.ask("\\nDo you want to delete these notifications?"):
206 console.print("[yellow]Deletion cancelled[/yellow]")
207 return
208
209 # Delete the files
210 deleted_count = 0
211 for item in to_delete:
212 try:
213 item['filepath'].unlink()
214 deleted_count += 1
215 console.print(f"[green]✓[/green] Deleted {item['filepath'].name}")
216 except Exception as e:
217 console.print(f"[red]✗[/red] Failed to delete {item['filepath'].name}: {e}")
218
219 console.print(f"\\n[bold green]Successfully deleted {deleted_count} notifications[/bold green]")
220
221
222def count_by_handle():
223 """Show detailed count of notifications by handle."""
224 handle_counts = {}
225
226 # Build list of directories to check
227 dirs_to_check = []
228
229 # Check all agent directories
230 for agent_dir in QUEUE_BASE_DIR.glob("*"):
231 if agent_dir.is_dir() and "-" in agent_dir.name:
232 dirs_to_check.extend([
233 (agent_dir, 'queue'),
234 (agent_dir / "errors", 'errors'),
235 (agent_dir / "no_reply", 'no_reply')
236 ])
237 # Also check old flat structure
238 dirs_to_check.extend([
239 (QUEUE_BASE_DIR, 'queue'),
240 (QUEUE_BASE_DIR / "errors", 'errors'),
241 (QUEUE_BASE_DIR / "no_reply", 'no_reply')
242 ])
243
244 # Collect counts from all directories
245 for directory, location in dirs_to_check:
246 if not directory.exists():
247 continue
248
249 for filepath in directory.glob("*.json"):
250 if filepath.is_dir() or filepath.name == "processed_notifications.json":
251 continue
252
253 notif = load_notification(filepath)
254 if notif and isinstance(notif, dict):
255 handle = notif.get('author', {}).get('handle', 'unknown')
256
257 if handle not in handle_counts:
258 handle_counts[handle] = {'queue': 0, 'errors': 0, 'no_reply': 0, 'total': 0}
259
260 handle_counts[handle][location] += 1
261 handle_counts[handle]['total'] += 1
262
263 if not handle_counts:
264 console.print("[yellow]No notifications found in any queue[/yellow]")
265 return
266
267 # Sort by total count
268 sorted_handles = sorted(handle_counts.items(), key=lambda x: x[1]['total'], reverse=True)
269
270 # Display results
271 table = Table(title=f"Notification Count by Handle ({len(handle_counts)} unique handles)")
272 table.add_column("Handle", style="green", width=30)
273 table.add_column("Queue", style="cyan", justify="right")
274 table.add_column("Errors", style="red", justify="right")
275 table.add_column("No Reply", style="yellow", justify="right")
276 table.add_column("Total", style="bold magenta", justify="right")
277
278 for handle, counts in sorted_handles:
279 table.add_row(
280 f"@{handle}",
281 str(counts['queue']) if counts['queue'] > 0 else "-",
282 str(counts['errors']) if counts['errors'] > 0 else "-",
283 str(counts['no_reply']) if counts['no_reply'] > 0 else "-",
284 str(counts['total'])
285 )
286
287 console.print(table)
288
289 # Summary statistics
290 total_notifications = sum(h['total'] for h in handle_counts.values())
291 avg_per_handle = total_notifications / len(handle_counts)
292
293 console.print(f"\n[bold]Summary:[/bold]")
294 console.print(f" Total notifications: {total_notifications}")
295 console.print(f" Unique handles: {len(handle_counts)}")
296 console.print(f" Average per handle: {avg_per_handle:.1f}")
297
298 # Top user info
299 if sorted_handles:
300 top_handle, top_counts = sorted_handles[0]
301 percentage = (top_counts['total'] / total_notifications) * 100
302 console.print(f" Most active: @{top_handle} ({top_counts['total']} notifications, {percentage:.1f}% of total)")
303
304
305def stats():
306 """Show queue statistics."""
307 # Dictionary to store stats per agent
308 agent_stats = {}
309
310 # Check all agent directories
311 for agent_dir in QUEUE_BASE_DIR.glob("*"):
312 if agent_dir.is_dir() and "-" in agent_dir.name:
313 agent_id = agent_dir.name
314 stats_data = {
315 'queue': {'count': 0, 'handles': set()},
316 'errors': {'count': 0, 'handles': set()},
317 'no_reply': {'count': 0, 'handles': set()}
318 }
319
320 # Collect stats for this agent
321 for subdir, key in [(agent_dir, 'queue'), (agent_dir / 'errors', 'errors'), (agent_dir / 'no_reply', 'no_reply')]:
322 if not subdir.exists():
323 continue
324
325 for filepath in subdir.glob("*.json"):
326 if filepath.is_dir():
327 continue
328
329 notif = load_notification(filepath)
330 if notif and isinstance(notif, dict):
331 stats_data[key]['count'] += 1
332 handle = notif.get('author', {}).get('handle', 'unknown')
333 stats_data[key]['handles'].add(handle)
334
335 agent_stats[agent_id] = stats_data
336
337 # Also check old flat structure
338 if QUEUE_BASE_DIR.exists():
339 legacy_stats = {
340 'queue': {'count': 0, 'handles': set()},
341 'errors': {'count': 0, 'handles': set()},
342 'no_reply': {'count': 0, 'handles': set()}
343 }
344
345 # Check for legacy queue files
346 for filepath in QUEUE_BASE_DIR.glob("*.json"):
347 if filepath.name != "processed_notifications.json":
348 notif = load_notification(filepath)
349 if notif and isinstance(notif, dict):
350 legacy_stats['queue']['count'] += 1
351 handle = notif.get('author', {}).get('handle', 'unknown')
352 legacy_stats['queue']['handles'].add(handle)
353
354 # Check legacy subdirectories
355 for subdir, key in [(QUEUE_BASE_DIR / 'errors', 'errors'), (QUEUE_BASE_DIR / 'no_reply', 'no_reply')]:
356 if subdir.exists():
357 for filepath in subdir.glob("*.json"):
358 notif = load_notification(filepath)
359 if notif and isinstance(notif, dict):
360 legacy_stats[key]['count'] += 1
361 handle = notif.get('author', {}).get('handle', 'unknown')
362 legacy_stats[key]['handles'].add(handle)
363
364 if any(legacy_stats[k]['count'] > 0 for k in legacy_stats):
365 agent_stats['legacy'] = legacy_stats
366
367 # Display stats for each agent
368 for agent_id, stats_data in agent_stats.items():
369 agent_name = agent_id[:12] + "..." if len(agent_id) > 15 else agent_id
370 table = Table(title=f"Queue Statistics - {agent_name}")
371 table.add_column("Location", style="cyan")
372 table.add_column("Count", style="yellow")
373 table.add_column("Unique Handles", style="green")
374
375 for key, label in [('queue', 'Active Queue'), ('errors', 'Errors'), ('no_reply', 'No Reply')]:
376 table.add_row(
377 label,
378 str(stats_data[key]['count']),
379 str(len(stats_data[key]['handles']))
380 )
381
382 console.print(table)
383 console.print() # Add spacing between agents
384
385
386def main():
387 parser = argparse.ArgumentParser(description="Manage Void bot notification queue")
388 subparsers = parser.add_subparsers(dest='command', help='Commands')
389
390 # List command
391 list_parser = subparsers.add_parser('list', help='List notifications in queue')
392 list_parser.add_argument('--handle', help='Filter by handle (partial match)')
393 list_parser.add_argument('--all', action='store_true', help='Include errors and no_reply folders')
394 list_parser.add_argument('--agent-id', help='Filter by specific agent ID')
395
396 # Delete command
397 delete_parser = subparsers.add_parser('delete', help='Delete notifications from a specific handle')
398 delete_parser.add_argument('handle', help='Handle to delete notifications from')
399 delete_parser.add_argument('--dry-run', action='store_true', help='Show what would be deleted without deleting')
400 delete_parser.add_argument('--force', action='store_true', help='Skip confirmation prompt')
401 delete_parser.add_argument('--agent-id', help='Only delete from specific agent ID')
402
403 # Stats command
404 stats_parser = subparsers.add_parser('stats', help='Show queue statistics')
405
406 # Count command
407 count_parser = subparsers.add_parser('count', help='Show detailed count by handle')
408 count_parser.add_argument('--agent-id', help='Count for specific agent ID only')
409
410 args = parser.parse_args()
411
412 if args.command == 'list':
413 list_notifications(args.handle, args.all, getattr(args, 'agent_id', None))
414 elif args.command == 'delete':
415 delete_by_handle(args.handle, args.dry_run, args.force, getattr(args, 'agent_id', None))
416 elif args.command == 'stats':
417 stats()
418 elif args.command == 'count':
419 count_by_handle(getattr(args, 'agent_id', None))
420 else:
421 parser.print_help()
422
423
424if __name__ == "__main__":
425 main()