# a digital person for bluesky
# at x 17 kB view raw
#!/usr/bin/env python3
"""Queue management utilities for Void bot.

Notifications are JSON files stored below ``queue/`` in two layouts:

* per-agent: ``queue/<agent-id>/*.json`` with ``errors/`` and ``no_reply/``
  subfolders (a directory counts as an agent directory when its name
  contains a hyphen);
* legacy flat: ``queue/*.json``, ``queue/errors/`` and ``queue/no_reply/``,
  reported under the pseudo agent id ``legacy``.
"""
import argparse
import json
from pathlib import Path
from typing import Iterator, List, Optional, Tuple

from rich.console import Console
from rich.prompt import Confirm
from rich.table import Table

console = Console()

# Queue directories
QUEUE_BASE_DIR = Path("queue")

# JSON file that sits next to notifications but is never treated as one.
PROCESSED_FILENAME = "processed_notifications.json"

# Agent id reported for files found in the old flat layout.
LEGACY_AGENT = "legacy"

# Location keys, in display order: the main queue and its two side folders.
_LOCATIONS = ("queue", "errors", "no_reply")


def load_notification(filepath: Path) -> Optional[dict]:
    """Load a notification from a JSON file.

    Args:
        filepath: Path of the JSON file to read.

    Returns:
        The decoded JSON value, or ``None`` when the file cannot be read or
        parsed (the error is printed to the console).  The value is whatever
        the file contains, so callers still check that it is a ``dict``.
    """
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            return json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        console.print(f"[red]Error loading {filepath}: {e}[/red]")
        return None


def _agent_dirs() -> List[Path]:
    """Return the per-agent queue directories, sorted by name."""
    return sorted(
        path for path in QUEUE_BASE_DIR.glob("*")
        if path.is_dir() and "-" in path.name  # Agent IDs have hyphens
    )


def _iter_queue_dirs(agent_id: Optional[str] = None,
                     include_side_dirs: bool = True) -> Iterator[Tuple[Path, str, str]]:
    """Yield ``(directory, location, owner)`` for every directory to scan.

    With ``agent_id`` only that agent's directories are produced; otherwise
    all agent directories followed by the legacy flat layout.  ``location``
    is one of ``queue``/``errors``/``no_reply`` and ``owner`` is the agent id
    (``legacy`` for the flat layout).  Directories are not checked for
    existence here.
    """
    if agent_id:
        roots = [(QUEUE_BASE_DIR / agent_id, agent_id)]
    else:
        roots = [(path, path.name) for path in _agent_dirs()]
        # Also check old flat structure for backward compatibility
        roots.append((QUEUE_BASE_DIR, LEGACY_AGENT))

    for root, owner in roots:
        yield root, "queue", owner
        if include_side_dirs:
            yield root / "errors", "errors", owner
            yield root / "no_reply", "no_reply", owner


def _iter_notifications(agent_id: Optional[str] = None,
                        include_side_dirs: bool = True) -> Iterator[Tuple[Path, str, str, dict]]:
    """Yield ``(filepath, location, owner, notification)`` for each loadable file."""
    for directory, location, owner in _iter_queue_dirs(agent_id, include_side_dirs):
        if not directory.is_dir():
            continue

        for filepath in sorted(directory.glob("*.json")):
            # Skip subdirectories and the processed-notifications file
            if filepath.is_dir() or filepath.name == PROCESSED_FILENAME:
                continue

            notif = load_notification(filepath)
            if notif and isinstance(notif, dict):
                yield filepath, location, owner, notif


def _source_label(location: str, owner: str) -> str:
    """Return the display label for a location, marking legacy ones as old."""
    return f"{location} (old)" if owner == LEGACY_AGENT else location


def _author(notif: dict) -> dict:
    """Return the notification's author mapping, or ``{}`` if absent/invalid."""
    author = notif.get('author')
    return author if isinstance(author, dict) else {}


def _author_handle(notif: dict, default: str = '') -> str:
    """Return the author's handle, or ``default`` when it is missing or empty."""
    return _author(notif).get('handle') or default


def _record_text(notif: dict) -> str:
    """Return the notification's record text, or ``''`` when unavailable."""
    record = notif.get('record')
    if not isinstance(record, dict):
        return ''
    return str(record.get('text') or '')


def _truncate(text: str, limit: int) -> str:
    """Cut ``text`` to ``limit`` characters, appending ``...`` if shortened."""
    return text[:limit] + "..." if len(text) > limit else text


def _timestamp(notif: dict) -> str:
    """Return ``indexed_at`` trimmed to 19 characters (drops milliseconds)."""
    return str(notif.get('indexed_at') or '')[:19]


def _short_agent_id(agent_id: str) -> str:
    """Abbreviate agent ids longer than 15 characters for table display."""
    return agent_id[:12] + "..." if len(agent_id) > 15 else agent_id


def list_notifications(handle_filter: str = None, show_all: bool = False, agent_id: str = None):
    """List all notifications in the queue, optionally filtered by handle and/or agent.

    Args:
        handle_filter: Case-insensitive substring the author handle must contain.
        show_all: Also include the ``errors`` and ``no_reply`` folders.
        agent_id: Restrict the listing to this agent's directories; when
            omitted, every agent directory and the legacy layout are scanned.

    Returns:
        The matching notifications, newest ``indexed_at`` first, each
        augmented with ``_filepath``, ``_source`` and ``_agent_id`` keys;
        ``None`` when nothing matched.
    """
    needle = handle_filter.lower() if handle_filter else None
    all_notifications = []

    for filepath, location, owner, notif in _iter_notifications(agent_id, include_side_dirs=show_all):
        # Apply handle filter if specified
        if needle and needle not in _author_handle(notif).lower():
            continue

        notif['_filepath'] = filepath
        notif['_source'] = _source_label(location, owner)
        notif['_agent_id'] = owner
        all_notifications.append(notif)

    # Sort by indexed_at
    all_notifications.sort(key=lambda n: str(n.get('indexed_at') or ''), reverse=True)

    # Display results
    if not all_notifications:
        if handle_filter:
            console.print(f"[yellow]No notifications found for handle containing '{handle_filter}'[/yellow]")
        else:
            console.print("[yellow]No notifications found in queue[/yellow]")
        return

    table = Table(title=f"Queue Notifications ({len(all_notifications)} total)")
    table.add_column("Agent", style="blue", width=15)
    table.add_column("File", style="cyan", width=20)
    table.add_column("Source", style="magenta", width=10)
    table.add_column("Handle", style="green", width=25)
    table.add_column("Display Name", width=20)
    table.add_column("Text", width=35)
    table.add_column("Time", style="dim", width=20)

    for notif in all_notifications:
        table.add_row(
            _short_agent_id(notif['_agent_id']),
            notif['_filepath'].name[:20],
            notif['_source'],
            f"@{_author_handle(notif, 'unknown')}",
            str(_author(notif).get('display_name') or ''),
            _truncate(_record_text(notif), 40),
            _timestamp(notif),
        )

    console.print(table)
    return all_notifications


def delete_by_handle(handle: str, dry_run: bool = False, force: bool = False, agent_id: str = None):
    """Delete all notifications from a specific handle.

    Args:
        handle: Author handle to match exactly (case-insensitive); a leading
            ``@`` is ignored.
        dry_run: Show what would be deleted without deleting anything.
        force: Skip the confirmation prompt.
        agent_id: Only delete from this agent's directories; when omitted,
            every agent directory and the legacy layout are searched.
    """
    # Remove @ if present
    handle = handle.lstrip('@')
    wanted = handle.lower()

    console.print(f"\n[bold]Searching for notifications from @{handle}...[/bold]\n")

    # Find all notifications from this handle
    to_delete = [
        {'filepath': filepath, 'notif': notif, 'source': _source_label(location, owner)}
        for filepath, location, owner, notif in _iter_notifications(agent_id)
        if _author_handle(notif).lower() == wanted
    ]

    if not to_delete:
        console.print(f"[yellow]No notifications found from @{handle}[/yellow]")
        return

    # Display what will be deleted
    table = Table(title=f"Notifications to Delete from @{handle}")
    table.add_column("File", style="cyan")
    table.add_column("Location", style="magenta")
    table.add_column("Text", width=50)
    table.add_column("Time", style="dim")

    for item in to_delete:
        table.add_row(
            item['filepath'].name,
            item['source'],
            _truncate(_record_text(item['notif']), 50),
            _timestamp(item['notif']),
        )

    console.print(table)
    console.print(f"\n[bold red]Found {len(to_delete)} notifications to delete[/bold red]")

    if dry_run:
        console.print("\n[yellow]DRY RUN - No files were deleted[/yellow]")
        return

    # Confirm deletion
    if not force and not Confirm.ask("\nDo you want to delete these notifications?"):
        console.print("[yellow]Deletion cancelled[/yellow]")
        return

    # Delete the files; one failure must not stop the remaining deletions
    deleted_count = 0
    for item in to_delete:
        try:
            item['filepath'].unlink()
        except OSError as e:
            console.print(f"[red]✗[/red] Failed to delete {item['filepath'].name}: {e}")
        else:
            deleted_count += 1
            console.print(f"[green]✓[/green] Deleted {item['filepath'].name}")

    console.print(f"\n[bold green]Successfully deleted {deleted_count} notifications[/bold green]")


def count_by_handle(agent_id: str = None):
    """Show detailed count of notifications by handle.

    Args:
        agent_id: Count only this agent's directories; when omitted, every
            agent directory and the legacy layout are counted together.
    """
    handle_counts = {}

    # Collect counts from all directories
    for _filepath, location, _owner, notif in _iter_notifications(agent_id):
        handle = _author_handle(notif, 'unknown')
        counts = handle_counts.setdefault(
            handle, {'queue': 0, 'errors': 0, 'no_reply': 0, 'total': 0}
        )
        counts[location] += 1
        counts['total'] += 1

    if not handle_counts:
        console.print("[yellow]No notifications found in any queue[/yellow]")
        return

    # Sort by total count
    sorted_handles = sorted(handle_counts.items(), key=lambda x: x[1]['total'], reverse=True)

    # Display results
    table = Table(title=f"Notification Count by Handle ({len(handle_counts)} unique handles)")
    table.add_column("Handle", style="green", width=30)
    table.add_column("Queue", style="cyan", justify="right")
    table.add_column("Errors", style="red", justify="right")
    table.add_column("No Reply", style="yellow", justify="right")
    table.add_column("Total", style="bold magenta", justify="right")

    for handle, counts in sorted_handles:
        table.add_row(
            f"@{handle}",
            *(str(counts[loc]) if counts[loc] > 0 else "-" for loc in _LOCATIONS),
            str(counts['total']),
        )

    console.print(table)

    # Summary statistics (handle_counts is non-empty here, so no division by zero)
    total_notifications = sum(h['total'] for h in handle_counts.values())
    avg_per_handle = total_notifications / len(handle_counts)

    console.print("\n[bold]Summary:[/bold]")
    console.print(f" Total notifications: {total_notifications}")
    console.print(f" Unique handles: {len(handle_counts)}")
    console.print(f" Average per handle: {avg_per_handle:.1f}")

    # Top user info
    top_handle, top_counts = sorted_handles[0]
    percentage = (top_counts['total'] / total_notifications) * 100
    console.print(f" Most active: @{top_handle} ({top_counts['total']} notifications, {percentage:.1f}% of total)")


def _empty_stats() -> dict:
    """Return a zeroed per-location statistics record."""
    return {loc: {'count': 0, 'handles': set()} for loc in _LOCATIONS}


def stats():
    """Show queue statistics.

    Prints one table per agent directory (even when it holds no
    notifications) and one for the legacy layout if it holds any.
    """
    # Seed every agent so that empty agent directories still get a table
    agent_stats = {agent_dir.name: _empty_stats() for agent_dir in _agent_dirs()}

    for _filepath, location, owner, notif in _iter_notifications():
        bucket = agent_stats.setdefault(owner, _empty_stats())[location]
        bucket['count'] += 1
        bucket['handles'].add(_author_handle(notif, 'unknown'))

    if not agent_stats:
        console.print("[yellow]No notifications found in any queue[/yellow]")
        return

    # Display stats for each agent
    for agent_id, stats_data in agent_stats.items():
        table = Table(title=f"Queue Statistics - {_short_agent_id(agent_id)}")
        table.add_column("Location", style="cyan")
        table.add_column("Count", style="yellow")
        table.add_column("Unique Handles", style="green")

        for key, label in [('queue', 'Active Queue'), ('errors', 'Errors'), ('no_reply', 'No Reply')]:
            table.add_row(
                label,
                str(stats_data[key]['count']),
                str(len(stats_data[key]['handles']))
            )

        console.print(table)
        console.print()  # Add spacing between agents


def main():
    """Parse the command line and dispatch to the selected sub-command."""
    parser = argparse.ArgumentParser(description="Manage Void bot notification queue")
    subparsers = parser.add_subparsers(dest='command', help='Commands')

    # List command
    list_parser = subparsers.add_parser('list', help='List notifications in queue')
    list_parser.add_argument('--handle', help='Filter by handle (partial match)')
    list_parser.add_argument('--all', action='store_true', help='Include errors and no_reply folders')
    list_parser.add_argument('--agent-id', help='Filter by specific agent ID')

    # Delete command
    delete_parser = subparsers.add_parser('delete', help='Delete notifications from a specific handle')
    delete_parser.add_argument('handle', help='Handle to delete notifications from')
    delete_parser.add_argument('--dry-run', action='store_true', help='Show what would be deleted without deleting')
    delete_parser.add_argument('--force', action='store_true', help='Skip confirmation prompt')
    delete_parser.add_argument('--agent-id', help='Only delete from specific agent ID')

    # Stats command
    subparsers.add_parser('stats', help='Show queue statistics')

    # Count command
    count_parser = subparsers.add_parser('count', help='Show detailed count by handle')
    count_parser.add_argument('--agent-id', help='Count for specific agent ID only')

    args = parser.parse_args()

    if args.command == 'list':
        list_notifications(args.handle, args.all, args.agent_id)
    elif args.command == 'delete':
        delete_by_handle(args.handle, args.dry_run, args.force, args.agent_id)
    elif args.command == 'stats':
        stats()
    elif args.command == 'count':
        count_by_handle(args.agent_id)
    else:
        parser.print_help()


if __name__ == "__main__":
    main()