A digital person for Bluesky.
#!/usr/bin/env python3
"""Queue management utilities for Void bot.

Provides three sub-commands operating on JSON notification files stored
under ``queue/`` (plus its ``errors/`` and ``no_reply/`` sub-folders):

* ``list``   - tabulate queued notifications, optionally filtered by handle
* ``delete`` - remove every notification authored by one handle
* ``stats``  - per-folder counts and the most active handles
"""
import argparse
import json
from collections import Counter
from pathlib import Path
from typing import Iterator, List, Optional, Tuple

from rich.console import Console
from rich.markup import escape
from rich.prompt import Confirm
from rich.table import Table

console = Console()

# Queue directories
QUEUE_DIR = Path("queue")
QUEUE_ERROR_DIR = QUEUE_DIR / "errors"
QUEUE_NO_REPLY_DIR = QUEUE_DIR / "no_reply"


def _queue_locations(include_archived: bool = True) -> List[Tuple[Path, str]]:
    """Return ``(directory, label)`` pairs for the folders to scan.

    Built at call time (not import time) so that rebinding the module-level
    directory constants is still honoured.
    """
    locations = [(QUEUE_DIR, "queue")]
    if include_archived:
        locations.append((QUEUE_ERROR_DIR, "errors"))
        locations.append((QUEUE_NO_REPLY_DIR, "no_reply"))
    return locations


def _iter_notifications(directory: Path) -> Iterator[Tuple[Path, dict]]:
    """Yield ``(filepath, notification)`` for each loadable ``*.json`` dict in *directory*.

    Missing directories, sub-directories matching the glob, unreadable files
    and JSON documents that are not objects are silently skipped (load errors
    are reported by :func:`load_notification`).
    """
    if not directory.exists():
        return
    # Sorted so that output order is deterministic across platforms.
    for filepath in sorted(directory.glob("*.json")):
        if filepath.is_dir():
            continue
        notif = load_notification(filepath)
        if notif and isinstance(notif, dict):
            yield filepath, notif


def _author_handle(notif: dict, default: str = "") -> str:
    """Return the author's handle, or *default* when it is missing or malformed."""
    author = notif.get("author")
    if isinstance(author, dict):
        handle = author.get("handle")
        if isinstance(handle, str) and handle:
            return handle
    return default


def _text_preview(notif: dict, width: int) -> str:
    """Return the record text cut to *width* characters, with ``...`` if truncated."""
    record = notif.get("record")
    text = record.get("text") if isinstance(record, dict) else None
    if not isinstance(text, str):
        text = ""
    return text[:width] + "..." if len(text) > width else text


def _timestamp(notif: dict) -> str:
    """Return ``indexed_at`` trimmed to second precision ('' when absent)."""
    return str(notif.get("indexed_at") or "")[:19]  # Trim milliseconds


def load_notification(filepath: Path) -> Optional[dict]:
    """Load a notification from a JSON file.

    Args:
        filepath: Path of the JSON file to read.

    Returns:
        The decoded JSON value, or ``None`` if the file could not be read or
        parsed (an error line is printed to the console in that case).
    """
    try:
        # JSON is UTF-8 by specification; do not depend on the locale default.
        with open(filepath, "r", encoding="utf-8") as f:
            return json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        # The message is escaped so brackets in it are not parsed as markup.
        console.print(f"[red]Error loading {escape(str(filepath))}: {escape(str(e))}[/red]")
        return None


def list_notifications(handle_filter: Optional[str] = None, show_all: bool = False) -> Optional[List[dict]]:
    """List all notifications in the queue, optionally filtered by handle.

    Args:
        handle_filter: Case-insensitive substring the author's handle must
            contain. A leading ``@`` is ignored.
        show_all: Also scan the ``errors`` and ``no_reply`` folders.

    Returns:
        The matching notifications, newest first, each augmented with
        ``_filepath`` and ``_source`` keys; ``None`` when nothing matched.
    """
    needle = handle_filter.lstrip("@").lower() if handle_filter else ""

    all_notifications = []
    for directory, source in _queue_locations(include_archived=show_all):
        for filepath, notif in _iter_notifications(directory):
            # Apply handle filter if specified
            if needle and needle not in _author_handle(notif).lower():
                continue
            notif["_filepath"] = filepath
            notif["_source"] = source
            all_notifications.append(notif)

    # Sort by indexed_at, newest first; a null/missing timestamp sorts last.
    all_notifications.sort(key=lambda x: str(x.get("indexed_at") or ""), reverse=True)

    # Display results
    if not all_notifications:
        if handle_filter:
            console.print(f"[yellow]No notifications found for handle containing '{escape(handle_filter)}'[/yellow]")
        else:
            console.print("[yellow]No notifications found in queue[/yellow]")
        return None

    table = Table(title=f"Queue Notifications ({len(all_notifications)} total)")
    table.add_column("File", style="cyan", width=20)
    table.add_column("Source", style="magenta", width=10)
    table.add_column("Handle", style="green", width=25)
    table.add_column("Display Name", width=25)
    table.add_column("Text", width=40)
    table.add_column("Time", style="dim", width=20)

    for notif in all_notifications:
        author = notif.get("author")
        display_name = author.get("display_name") if isinstance(author, dict) else None
        # Cell contents come from remote users: escape them so that text such
        # as "[/b]" is shown verbatim instead of raising a markup error.
        table.add_row(
            escape(notif["_filepath"].name[:20]),
            notif["_source"],
            escape(f"@{_author_handle(notif, 'unknown')}"),
            escape(str(display_name or "")),
            escape(_text_preview(notif, 40)),
            escape(_timestamp(notif)),
        )

    console.print(table)
    return all_notifications


def delete_by_handle(handle: str, dry_run: bool = False, force: bool = False):
    """Delete all notifications from a specific handle.

    Args:
        handle: Exact author handle (case-insensitive); a leading ``@`` is ignored.
        dry_run: Only show what would be deleted.
        force: Skip the interactive confirmation prompt.
    """
    # Remove @ if present
    handle = handle.lstrip("@")
    wanted = handle.lower()

    # Find all notifications from this handle
    console.print(f"\n[bold]Searching for notifications from @{handle}...[/bold]\n")

    to_delete = []
    for directory, _label in _queue_locations():
        for filepath, notif in _iter_notifications(directory):
            if _author_handle(notif).lower() == wanted:
                to_delete.append({
                    "filepath": filepath,
                    "notif": notif,
                    "source": directory.name,
                })

    if not to_delete:
        console.print(f"[yellow]No notifications found from @{handle}[/yellow]")
        return

    # Display what will be deleted
    table = Table(title=f"Notifications to Delete from @{handle}")
    table.add_column("File", style="cyan")
    table.add_column("Location", style="magenta")
    table.add_column("Text", width=50)
    table.add_column("Time", style="dim")

    for item in to_delete:
        table.add_row(
            escape(item["filepath"].name),
            item["source"],
            escape(_text_preview(item["notif"], 50)),
            escape(_timestamp(item["notif"])),
        )

    console.print(table)
    console.print(f"\n[bold red]Found {len(to_delete)} notifications to delete[/bold red]")

    if dry_run:
        console.print("\n[yellow]DRY RUN - No files were deleted[/yellow]")
        return

    # Confirm deletion
    if not force and not Confirm.ask("\nDo you want to delete these notifications?"):
        console.print("[yellow]Deletion cancelled[/yellow]")
        return

    # Delete the files; one failure must not stop the remaining deletions.
    deleted_count = 0
    for item in to_delete:
        name = escape(item["filepath"].name)
        try:
            item["filepath"].unlink()
        except OSError as e:
            console.print(f"[red]✗[/red] Failed to delete {name}: {escape(str(e))}")
        else:
            deleted_count += 1
            console.print(f"[green]✓[/green] Deleted {name}")

    console.print(f"\n[bold green]Successfully deleted {deleted_count} notifications[/bold green]")


def stats():
    """Show queue statistics.

    Prints a per-folder table (notification count and number of distinct
    handles) followed by the ten handles with the most notifications across
    all folders.
    """
    locations = _queue_locations()
    stats_data = {key: {"count": 0, "handles": set()} for _directory, key in locations}
    # Counts every notification, so the "top handles" table reflects real
    # volume rather than the number of folders a handle happens to appear in.
    handle_counts = Counter()

    # Collect stats
    for directory, key in locations:
        for _filepath, notif in _iter_notifications(directory):
            handle = _author_handle(notif, "unknown")
            stats_data[key]["count"] += 1
            stats_data[key]["handles"].add(handle)
            handle_counts[handle] += 1

    # Display stats
    table = Table(title="Queue Statistics")
    table.add_column("Location", style="cyan")
    table.add_column("Count", style="yellow")
    table.add_column("Unique Handles", style="green")

    for key, label in [("queue", "Active Queue"), ("errors", "Errors"), ("no_reply", "No Reply")]:
        table.add_row(
            label,
            str(stats_data[key]["count"]),
            str(len(stats_data[key]["handles"])),
        )

    console.print(table)

    # Show top handles
    if handle_counts:
        top_table = Table(title="Top 10 Handles by Notification Count")
        top_table.add_column("Handle", style="green")
        top_table.add_column("Count", style="yellow")

        for handle, count in handle_counts.most_common(10):
            top_table.add_row(escape(f"@{handle}"), str(count))

        console.print("\n")
        console.print(top_table)


def main():
    """Parse command-line arguments and dispatch to the chosen sub-command."""
    parser = argparse.ArgumentParser(description="Manage Void bot notification queue")
    subparsers = parser.add_subparsers(dest="command", help="Commands")

    # List command
    list_parser = subparsers.add_parser("list", help="List notifications in queue")
    list_parser.add_argument("--handle", help="Filter by handle (partial match)")
    list_parser.add_argument("--all", action="store_true", help="Include errors and no_reply folders")

    # Delete command
    delete_parser = subparsers.add_parser("delete", help="Delete notifications from a specific handle")
    delete_parser.add_argument("handle", help="Handle to delete notifications from")
    delete_parser.add_argument("--dry-run", action="store_true", help="Show what would be deleted without deleting")
    delete_parser.add_argument("--force", action="store_true", help="Skip confirmation prompt")

    # Stats command (takes no arguments)
    subparsers.add_parser("stats", help="Show queue statistics")

    args = parser.parse_args()

    if args.command == "list":
        list_notifications(args.handle, args.all)
    elif args.command == "delete":
        delete_by_handle(args.handle, args.dry_run, args.force)
    elif args.command == "stats":
        stats()
    else:
        parser.print_help()


if __name__ == "__main__":
    main()