a digital person for bluesky

Add queue management utility for notification cleanup

Implements a comprehensive queue management tool that allows:
- Viewing queue statistics across all directories (active, errors, no_reply)
- Listing notifications with filtering by handle
- Deleting all notifications from specific handles with dry-run support
- Force deletion option to skip confirmation prompts

Key features:
- Rich console output with formatted tables
- Safety features: dry-run mode and confirmation prompts
- Handles all queue directories (queue/, queue/errors/, queue/no_reply/)
- Proper error handling and validation
- Updated CLAUDE.md with usage examples

This tool helps manage notification queue buildup from bots and spam accounts,
complementing the new ignore_notification tool for proactive filtering.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

Changed files
+265 -236
+24
CLAUDE.md
··· 41 41 ac && python attach_user_block.py 42 42 ``` 43 43 44 + ### Queue Management 45 + ```bash 46 + # View queue statistics 47 + python queue_manager.py stats 48 + 49 + # List all notifications in queue 50 + python queue_manager.py list 51 + 52 + # List notifications including errors and no_reply folders 53 + python queue_manager.py list --all 54 + 55 + # Filter notifications by handle 56 + python queue_manager.py list --handle "example.bsky.social" 57 + 58 + # Delete all notifications from a specific handle (dry run) 59 + python queue_manager.py delete @example.bsky.social --dry-run 60 + 61 + # Delete all notifications from a specific handle (actual deletion) 62 + python queue_manager.py delete @example.bsky.social 63 + 64 + # Delete all notifications from a specific handle (skip confirmation) 65 + python queue_manager.py delete @example.bsky.social --force 66 + ``` 67 + 44 68 ## Architecture Overview 45 69 46 70 ### Core Components
+241 -236
queue_manager.py
··· 1 1 #!/usr/bin/env python3 2 - 2 + """Queue management utilities for Void bot.""" 3 3 import json 4 - import os 5 - import shutil 4 + import argparse 6 5 from pathlib import Path 7 - from typing import List, Dict, Any 8 6 from rich.console import Console 9 7 from rich.table import Table 10 - from rich.panel import Panel 11 - from rich.text import Text 8 + from rich.prompt import Confirm 12 9 13 - class QueueManager: 14 - def __init__(self, queue_dir: str = "queue"): 15 - self.queue_dir = Path(queue_dir) 16 - self.deleted_dir = self.queue_dir / "deleted" 17 - self.processed_file = self.queue_dir / "processed_notifications.json" 18 - self.console = Console() 10 + console = Console() 11 + 12 + # Queue directories 13 + QUEUE_DIR = Path("queue") 14 + QUEUE_ERROR_DIR = QUEUE_DIR / "errors" 15 + QUEUE_NO_REPLY_DIR = QUEUE_DIR / "no_reply" 16 + 17 + 18 + def load_notification(filepath: Path) -> dict: 19 + """Load a notification from a JSON file.""" 20 + try: 21 + with open(filepath, 'r') as f: 22 + return json.load(f) 23 + except Exception as e: 24 + console.print(f"[red]Error loading {filepath}: {e}[/red]") 25 + return None 26 + 27 + 28 + def list_notifications(handle_filter: str = None, show_all: bool = False): 29 + """List all notifications in the queue, optionally filtered by handle.""" 30 + # Collect notifications from all directories if show_all is True 31 + if show_all: 32 + dirs_to_check = [QUEUE_DIR, QUEUE_ERROR_DIR, QUEUE_NO_REPLY_DIR] 33 + else: 34 + dirs_to_check = [QUEUE_DIR] 35 + 36 + all_notifications = [] 37 + 38 + for directory in dirs_to_check: 39 + if not directory.exists(): 40 + continue 41 + 42 + # Get source directory name for display 43 + if directory == QUEUE_DIR: 44 + source = "queue" 45 + elif directory == QUEUE_ERROR_DIR: 46 + source = "errors" 47 + elif directory == QUEUE_NO_REPLY_DIR: 48 + source = "no_reply" 49 + else: 50 + source = "unknown" 19 51 20 - # Create deleted directory if it doesn't exist 21 - self.deleted_dir.mkdir(exist_ok=True) 52 + for filepath in directory.glob("*.json"): 53 + # Skip subdirectories 54 + if filepath.is_dir(): 55 + continue 56 + 57 + notif = load_notification(filepath) 58 + if notif and isinstance(notif, dict): 59 + notif['_filepath'] = filepath 60 + notif['_source'] = source 61 + 62 + # Apply handle filter if specified 63 + if handle_filter: 64 + author_handle = notif.get('author', {}).get('handle', '') 65 + if handle_filter.lower() not in author_handle.lower(): 66 + continue 67 + 68 + all_notifications.append(notif) 69 + 70 + # Sort by indexed_at 71 + all_notifications.sort(key=lambda x: x.get('indexed_at', ''), reverse=True) 72 + 73 + # Display results 74 + if not all_notifications: 75 + if handle_filter: 76 + console.print(f"[yellow]No notifications found for handle containing '{handle_filter}'[/yellow]") 77 + else: 78 + console.print("[yellow]No notifications found in queue[/yellow]") 79 + return 80 + 81 + table = Table(title=f"Queue Notifications ({len(all_notifications)} total)") 82 + table.add_column("File", style="cyan", width=20) 83 + table.add_column("Source", style="magenta", width=10) 84 + table.add_column("Handle", style="green", width=25) 85 + table.add_column("Display Name", width=25) 86 + table.add_column("Text", width=40) 87 + table.add_column("Time", style="dim", width=20) 88 + 89 + for notif in all_notifications: 90 + author = notif.get('author', {}) 91 + handle = author.get('handle', 'unknown') 92 + display_name = author.get('display_name', '') 93 + text = notif.get('record', {}).get('text', '')[:40] 94 + if len(notif.get('record', {}).get('text', '')) > 40: 95 + text += "..." 96 + indexed_at = notif.get('indexed_at', '')[:19] # Trim milliseconds 97 + filename = notif['_filepath'].name[:20] 98 + source = notif['_source'] 99 + 100 + table.add_row(filename, source, f"@{handle}", display_name, text, indexed_at) 101 + 102 + console.print(table) 103 + return all_notifications 104 + 105 + 106 + def delete_by_handle(handle: str, dry_run: bool = False, force: bool = False): 107 + """Delete all notifications from a specific handle.""" 108 + # Remove @ if present 109 + handle = handle.lstrip('@') 110 + 111 + # Find all notifications from this handle 112 + console.print(f"\\n[bold]Searching for notifications from @{handle}...[/bold]\\n") 113 + 114 + to_delete = [] 115 + dirs_to_check = [QUEUE_DIR, QUEUE_ERROR_DIR, QUEUE_NO_REPLY_DIR] 116 + 117 + for directory in dirs_to_check: 118 + if not directory.exists(): 119 + continue 120 + 121 + for filepath in directory.glob("*.json"): 122 + if filepath.is_dir(): 123 + continue 124 + 125 + notif = load_notification(filepath) 126 + if notif and isinstance(notif, dict): 127 + author_handle = notif.get('author', {}).get('handle', '') 128 + if author_handle.lower() == handle.lower(): 129 + to_delete.append({ 130 + 'filepath': filepath, 131 + 'notif': notif, 132 + 'source': directory.name 133 + }) 134 + 135 + if not to_delete: 136 + console.print(f"[yellow]No notifications found from @{handle}[/yellow]") 137 + return 138 + 139 + # Display what will be deleted 140 + table = Table(title=f"Notifications to Delete from @{handle}") 141 + table.add_column("File", style="cyan") 142 + table.add_column("Location", style="magenta") 143 + table.add_column("Text", width=50) 144 + table.add_column("Time", style="dim") 145 + 146 + for item in to_delete: 147 + notif = item['notif'] 148 + text = notif.get('record', {}).get('text', '')[:50] 149 + if len(notif.get('record', {}).get('text', '')) > 50: 150 + text += "..." 151 + indexed_at = notif.get('indexed_at', '')[:19] 22 152 23 - # Load existing processed notifications 24 - self.processed_notifications = self._load_processed_notifications() 153 + table.add_row( 154 + item['filepath'].name, 155 + item['source'], 156 + text, 157 + indexed_at 158 + ) 25 159 26 - def _load_processed_notifications(self) -> List[str]: 27 - """Load the list of processed notification URIs.""" 28 - if self.processed_file.exists(): 29 - try: 30 - with open(self.processed_file, 'r') as f: 31 - return json.load(f) 32 - except (json.JSONDecodeError, FileNotFoundError): 33 - return [] 34 - return [] 160 + console.print(table) 161 + console.print(f"\\n[bold red]Found {len(to_delete)} notifications to delete[/bold red]") 35 162 36 - def _save_processed_notifications(self): 37 - """Save the list of processed notification URIs.""" 38 - with open(self.processed_file, 'w') as f: 39 - json.dump(self.processed_notifications, f, indent=2) 163 + if dry_run: 164 + console.print("\\n[yellow]DRY RUN - No files were deleted[/yellow]") 165 + return 40 166 41 - def _get_queue_files(self) -> List[Path]: 42 - """Get all JSON files in the queue directory, excluding deleted/.""" 43 - return [f for f in self.queue_dir.glob("*.json") if f.name != "processed_notifications.json"] 167 + # Confirm deletion 168 + if not force and not Confirm.ask("\\nDo you want to delete these notifications?"): 169 + console.print("[yellow]Deletion cancelled[/yellow]") 170 + return 44 171 45 - def _load_notification(self, file_path: Path) -> Dict[str, Any]: 46 - """Load a notification from a JSON file.""" 172 + # Delete the files 173 + deleted_count = 0 174 + for item in to_delete: 47 175 try: 48 - with open(file_path, 'r') as f: 49 - return json.load(f) 50 - except (json.JSONDecodeError, FileNotFoundError) as e: 51 - return {"error": f"Failed to load {file_path}: {e}"} 176 + item['filepath'].unlink() 177 + deleted_count += 1 178 + console.print(f"[green]✓[/green] Deleted {item['filepath'].name}") 179 + except Exception as e: 180 + console.print(f"[red]✗[/red] Failed to delete {item['filepath'].name}: {e}") 52 181 53 - def _format_notification_summary(self, notification: Dict[str, Any], file_path: Path) -> str: 54 - """Create a short summary of the notification for display.""" 55 - if "error" in notification: 56 - return f"[red]ERROR: {notification['error']}[/red]" 57 - 58 - reason = notification.get("reason", "unknown") 59 - author = notification.get("author", {}) 60 - handle = author.get("handle", "unknown") 61 - display_name = author.get("display_name", "") 62 - record = notification.get("record", {}) 63 - text = record.get("text", "") 64 - 65 - # Truncate text if too long 66 - if len(text) > 100: 67 - text = text[:97] + "..." 68 - 69 - summary = f"[cyan]{reason}[/cyan] from [green]{handle}[/green]" 70 - if display_name: 71 - summary += f" ([yellow]{display_name}[/yellow])" 72 - 73 - if text: 74 - summary += f"\n [dim]{text}[/dim]" 75 - 76 - summary += f"\n [magenta]{file_path.name}[/magenta]" 77 - 78 - return summary 182 + console.print(f"\\n[bold green]Successfully deleted {deleted_count} notifications[/bold green]") 183 + 184 + 185 + def stats(): 186 + """Show queue statistics.""" 187 + stats_data = { 188 + 'queue': {'count': 0, 'handles': set()}, 189 + 'errors': {'count': 0, 'handles': set()}, 190 + 'no_reply': {'count': 0, 'handles': set()} 191 + } 79 192 80 - def browse_queue(self, page_size: int = 10): 81 - """Interactive queue browser with paging and deletion.""" 82 - files = self._get_queue_files() 83 - if not files: 84 - self.console.print("[yellow]No files in queue.[/yellow]") 85 - return 86 - 87 - # Sort files by modification time (newest first) 88 - files.sort(key=lambda f: f.stat().st_mtime, reverse=True) 89 - 90 - current_page = 0 91 - total_pages = (len(files) + page_size - 1) // page_size 92 - marked_for_deletion = set() 93 - 94 - while True: 95 - # Clear screen 96 - self.console.clear() 97 - 98 - # Calculate current page bounds 99 - start_idx = current_page * page_size 100 - end_idx = min(start_idx + page_size, len(files)) 101 - current_files = files[start_idx:end_idx] 102 - 103 - # Create table 104 - table = Table(title=f"Queue Browser - Page {current_page + 1}/{total_pages}") 105 - table.add_column("Index", justify="center", style="cyan") 106 - table.add_column("Status", justify="center", style="magenta") 107 - table.add_column("Notification", style="white") 193 + # Collect stats 194 + for directory, key in [(QUEUE_DIR, 'queue'), (QUEUE_ERROR_DIR, 'errors'), (QUEUE_NO_REPLY_DIR, 'no_reply')]: 195 + if not directory.exists(): 196 + continue 108 197 109 - # Add rows for current page 110 - for i, file_path in enumerate(current_files): 111 - global_index = start_idx + i 112 - notification = self._load_notification(file_path) 113 - summary = self._format_notification_summary(notification, file_path) 198 + for filepath in directory.glob("*.json"): 199 + if filepath.is_dir(): 200 + continue 114 201 115 - status = "[red]DELETE[/red]" if file_path in marked_for_deletion else "[green]KEEP[/green]" 116 - 117 - table.add_row(str(global_index), status, summary) 118 - 119 - self.console.print(table) 120 - 121 - # Show statistics 122 - stats_text = f"Total files: {len(files)} | Marked for deletion: {len(marked_for_deletion)}" 123 - self.console.print(Panel(stats_text, title="Statistics")) 124 - 125 - # Show help 126 - help_text = """ 127 - Commands: 128 - [cyan]n[/cyan] - Next page [cyan]p[/cyan] - Previous page [cyan]q[/cyan] - Quit 129 - [cyan]d <idx>[/cyan] - Toggle delete flag for item at index 130 - [cyan]v <idx>[/cyan] - View full notification at index 131 - [cyan]execute[/cyan] - Execute deletions and quit 132 - [cyan]clear[/cyan] - Clear all delete flags 133 - """ 134 - self.console.print(Panel(help_text.strip(), title="Help")) 135 - 136 - # Get user input 137 - try: 138 - command = input("\nEnter command: ").strip().lower() 139 - 140 - if command == 'q': 141 - break 142 - elif command == 'n': 143 - if current_page < total_pages - 1: 144 - current_page += 1 145 - else: 146 - self.console.print("[yellow]Already on last page.[/yellow]") 147 - input("Press Enter to continue...") 148 - elif command == 'p': 149 - if current_page > 0: 150 - current_page -= 1 151 - else: 152 - self.console.print("[yellow]Already on first page.[/yellow]") 153 - input("Press Enter to continue...") 154 - elif command.startswith('d '): 155 - try: 156 - idx = int(command.split()[1]) 157 - if 0 <= idx < len(files): 158 - file_path = files[idx] 159 - if file_path in marked_for_deletion: 160 - marked_for_deletion.remove(file_path) 161 - else: 162 - marked_for_deletion.add(file_path) 163 - else: 164 - self.console.print(f"[red]Invalid index: {idx}[/red]") 165 - input("Press Enter to continue...") 166 - except (ValueError, IndexError): 167 - self.console.print("[red]Invalid command format. Use: d <index>[/red]") 168 - input("Press Enter to continue...") 169 - elif command.startswith('v '): 170 - try: 171 - idx = int(command.split()[1]) 172 - if 0 <= idx < len(files): 173 - self._view_notification(files[idx]) 174 - else: 175 - self.console.print(f"[red]Invalid index: {idx}[/red]") 176 - input("Press Enter to continue...") 177 - except (ValueError, IndexError): 178 - self.console.print("[red]Invalid command format. Use: v <index>[/red]") 179 - input("Press Enter to continue...") 180 - elif command == 'execute': 181 - if marked_for_deletion: 182 - self._execute_deletions(marked_for_deletion) 183 - else: 184 - self.console.print("[yellow]No files marked for deletion.[/yellow]") 185 - input("Press Enter to continue...") 186 - break 187 - elif command == 'clear': 188 - marked_for_deletion.clear() 189 - self.console.print("[green]All delete flags cleared.[/green]") 190 - input("Press Enter to continue...") 191 - else: 192 - self.console.print("[red]Unknown command.[/red]") 193 - input("Press Enter to continue...") 194 - 195 - except KeyboardInterrupt: 196 - break 202 + notif = load_notification(filepath) 203 + if notif and isinstance(notif, dict): 204 + stats_data[key]['count'] += 1 205 + handle = notif.get('author', {}).get('handle', 'unknown') 206 + stats_data[key]['handles'].add(handle) 197 207 198 - def _view_notification(self, file_path: Path): 199 - """Display full notification content.""" 200 - self.console.clear() 201 - notification = self._load_notification(file_path) 202 - 203 - # Display as formatted JSON 204 - self.console.print(Panel( 205 - json.dumps(notification, indent=2), 206 - title=f"Notification: {file_path.name}", 207 - expand=False 208 - )) 209 - 210 - input("\nPress Enter to continue...") 208 + # Display stats 209 + table = Table(title="Queue Statistics") 210 + table.add_column("Location", style="cyan") 211 + table.add_column("Count", style="yellow") 212 + table.add_column("Unique Handles", style="green") 211 213 212 - def _execute_deletions(self, marked_files: set): 213 - """Move marked files to deleted/ directory and update processed_notifications.json.""" 214 - self.console.print(f"\n[yellow]Moving {len(marked_files)} files to deleted/ directory...[/yellow]") 214 + for key, label in [('queue', 'Active Queue'), ('errors', 'Errors'), ('no_reply', 'No Reply')]: 215 + table.add_row( 216 + label, 217 + str(stats_data[key]['count']), 218 + str(len(stats_data[key]['handles'])) 219 + ) 220 + 221 + console.print(table) 222 + 223 + # Show top handles 224 + all_handles = {} 225 + for location_data in stats_data.values(): 226 + for handle in location_data['handles']: 227 + all_handles[handle] = all_handles.get(handle, 0) + 1 228 + 229 + if all_handles: 230 + sorted_handles = sorted(all_handles.items(), key=lambda x: x[1], reverse=True)[:10] 215 231 216 - moved_count = 0 217 - added_to_processed = [] 232 + top_table = Table(title="Top 10 Handles by Notification Count") 233 + top_table.add_column("Handle", style="green") 234 + top_table.add_column("Count", style="yellow") 218 235 219 - for file_path in marked_files: 220 - try: 221 - # Load notification to get URI 222 - notification = self._load_notification(file_path) 223 - if "uri" in notification: 224 - uri = notification["uri"] 225 - if uri not in self.processed_notifications: 226 - self.processed_notifications.append(uri) 227 - added_to_processed.append(uri) 228 - 229 - # Move file to deleted directory 230 - deleted_path = self.deleted_dir / file_path.name 231 - shutil.move(str(file_path), str(deleted_path)) 232 - moved_count += 1 233 - 234 - self.console.print(f"[green]✓[/green] Moved {file_path.name}") 235 - 236 - except Exception as e: 237 - self.console.print(f"[red]✗[/red] Failed to move {file_path.name}: {e}") 236 + for handle, count in sorted_handles: 237 + top_table.add_row(f"@{handle}", str(count)) 238 238 239 - # Save updated processed notifications 240 - if added_to_processed: 241 - self._save_processed_notifications() 242 - self.console.print(f"\n[green]Added {len(added_to_processed)} URIs to processed_notifications.json[/green]") 243 - 244 - self.console.print(f"\n[green]Successfully moved {moved_count} files to deleted/ directory.[/green]") 245 - input("Press Enter to continue...") 239 + console.print("\\n") 240 + console.print(top_table) 246 241 247 242 248 243 def main(): 249 - """Main entry point for the queue manager.""" 250 - import argparse 244 + parser = argparse.ArgumentParser(description="Manage Void bot notification queue") 245 + subparsers = parser.add_subparsers(dest='command', help='Commands') 251 246 252 - parser = argparse.ArgumentParser(description="Interactive queue management tool") 253 - parser.add_argument("--queue-dir", default="queue", help="Queue directory path") 254 - parser.add_argument("--page-size", type=int, default=10, help="Number of items per page") 247 + # List command 248 + list_parser = subparsers.add_parser('list', help='List notifications in queue') 249 + list_parser.add_argument('--handle', help='Filter by handle (partial match)') 250 + list_parser.add_argument('--all', action='store_true', help='Include errors and no_reply folders') 255 251 256 - args = parser.parse_args() 252 + # Delete command 253 + delete_parser = subparsers.add_parser('delete', help='Delete notifications from a specific handle') 254 + delete_parser.add_argument('handle', help='Handle to delete notifications from') 255 + delete_parser.add_argument('--dry-run', action='store_true', help='Show what would be deleted without deleting') 256 + delete_parser.add_argument('--force', action='store_true', help='Skip confirmation prompt') 257 257 258 - if not os.path.exists(args.queue_dir): 259 - print(f"Error: Queue directory '{args.queue_dir}' does not exist.") 260 - return 1 258 + # Stats command 259 + stats_parser = subparsers.add_parser('stats', help='Show queue statistics') 261 260 262 - manager = QueueManager(args.queue_dir) 263 - manager.browse_queue(args.page_size) 261 + args = parser.parse_args() 264 262 265 - return 0 263 + if args.command == 'list': 264 + list_notifications(args.handle, args.all) 265 + elif args.command == 'delete': 266 + delete_by_handle(args.handle, args.dry_run, args.force) 267 + elif args.command == 'stats': 268 + stats() 269 + else: 270 + parser.print_help() 266 271 267 272 268 273 if __name__ == "__main__": 269 - exit(main()) 274 + main()