a digital person for bluesky
#!/usr/bin/env python3
"""Queue management utilities for Void bot.

Command-line helpers for inspecting and pruning the notification queue,
which is stored as one JSON file per notification in ``queue/`` and its
``errors/`` and ``no_reply/`` subdirectories.
"""
import argparse
import json
from collections import Counter
from pathlib import Path
from typing import Iterator, List, Optional, Tuple

from rich.console import Console
from rich.markup import escape
from rich.prompt import Confirm
from rich.table import Table

console = Console()

# Queue directories
QUEUE_DIR = Path("queue")
QUEUE_ERROR_DIR = QUEUE_DIR / "errors"
QUEUE_NO_REPLY_DIR = QUEUE_DIR / "no_reply"


def load_notification(filepath: Path) -> Optional[dict]:
    """Load a notification from a JSON file.

    Args:
        filepath: Path of the JSON file to read.

    Returns:
        The parsed JSON content, or ``None`` when the file cannot be read or
        parsed (the error is reported on the console instead of raised).
        The content is returned as-is, so callers that need a mapping must
        still check the type.
    """
    try:
        # JSON interchange text is UTF-8; do not depend on the locale default.
        with open(filepath, 'r', encoding='utf-8') as f:
            return json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        console.print(f"[red]Error loading {escape(str(filepath))}: {escape(str(e))}[/red]")
        return None


def _queue_locations(include_archived: bool = True) -> List[Tuple[str, Path]]:
    """Return ``(label, directory)`` pairs for the queue folders to scan.

    The module-level constants are read at call time so that overriding them
    (for example from another script) keeps working.
    """
    locations = [("queue", QUEUE_DIR)]
    if include_archived:
        locations.append(("errors", QUEUE_ERROR_DIR))
        locations.append(("no_reply", QUEUE_NO_REPLY_DIR))
    return locations


def _iter_notifications(include_archived: bool = True) -> Iterator[Tuple[str, Path, dict]]:
    """Yield ``(label, filepath, notification)`` for every loadable queue file.

    Missing directories, directories that merely end in ``.json``, unreadable
    files, and files whose content is empty or not a JSON object are skipped.
    """
    for label, directory in _queue_locations(include_archived):
        if not directory.exists():
            continue
        # Sorted so that listings are stable from one run to the next.
        for filepath in sorted(directory.glob("*.json")):
            if filepath.is_dir():
                continue
            notif = load_notification(filepath)
            if notif and isinstance(notif, dict):
                yield label, filepath, notif


def _author_handle(notif: dict, default: str = "") -> str:
    """Return the author's handle, or *default* when it is missing or malformed."""
    author = notif.get('author')
    handle = author.get('handle', default) if isinstance(author, dict) else default
    return handle if isinstance(handle, str) else default


def _indexed_at(notif: dict) -> str:
    """Return the ``indexed_at`` timestamp as a string ('' when absent or not a string)."""
    value = notif.get('indexed_at', '')
    return value if isinstance(value, str) else ''


def _text_preview(notif: dict, limit: int) -> str:
    """Return the record text cut to *limit* characters, with '...' when truncated."""
    record = notif.get('record')
    text = record.get('text', '') if isinstance(record, dict) else ''
    if not isinstance(text, str):
        text = ''
    return text[:limit] + "..." if len(text) > limit else text


def list_notifications(handle_filter: Optional[str] = None, show_all: bool = False):
    """List all notifications in the queue, optionally filtered by handle.

    Args:
        handle_filter: Case-insensitive substring the author's handle must
            contain. A leading ``@`` is ignored. ``None`` or '' disables the
            filter.
        show_all: When true, also scan the ``errors`` and ``no_reply``
            folders; otherwise only the active queue is scanned.

    Returns:
        The matching notifications, newest first, each annotated with
        ``_filepath`` and ``_source`` keys; ``None`` when nothing matched.
    """
    # Handles never contain '@', so a leading one is only how the user typed it.
    needle = handle_filter.lstrip('@').lower() if handle_filter else ''

    all_notifications = []
    for source, filepath, notif in _iter_notifications(include_archived=show_all):
        if needle and needle not in _author_handle(notif).lower():
            continue
        notif['_filepath'] = filepath
        notif['_source'] = source
        all_notifications.append(notif)

    # Sort by indexed_at, newest first (ISO timestamps sort lexicographically).
    all_notifications.sort(key=_indexed_at, reverse=True)

    # Display results
    if not all_notifications:
        if handle_filter:
            console.print(
                f"[yellow]No notifications found for handle containing '{escape(handle_filter)}'[/yellow]"
            )
        else:
            console.print("[yellow]No notifications found in queue[/yellow]")
        return None

    table = Table(title=f"Queue Notifications ({len(all_notifications)} total)")
    table.add_column("File", style="cyan", width=20)
    table.add_column("Source", style="magenta", width=10)
    table.add_column("Handle", style="green", width=25)
    table.add_column("Display Name", width=25)
    table.add_column("Text", width=40)
    table.add_column("Time", style="dim", width=20)

    for notif in all_notifications:
        author = notif.get('author')
        display_name = author.get('display_name', '') if isinstance(author, dict) else ''
        if not isinstance(display_name, str):
            display_name = ''

        # Everything read from a notification is untrusted text: escape it so
        # that square brackets are shown literally instead of being parsed as
        # Rich markup (an unmatched closing tag would raise MarkupError).
        table.add_row(
            escape(notif['_filepath'].name[:20]),
            notif['_source'],
            escape(f"@{_author_handle(notif, 'unknown')}"),
            escape(display_name),
            escape(_text_preview(notif, 40)),
            escape(_indexed_at(notif)[:19]),  # Trim milliseconds
        )

    console.print(table)
    return all_notifications


def delete_by_handle(handle: str, dry_run: bool = False, force: bool = False):
    """Delete all notifications from a specific handle.

    All three queue folders are searched. The match is exact and
    case-insensitive; a leading ``@`` on *handle* is ignored.

    Args:
        handle: Author handle whose notifications should be removed.
        dry_run: When true, only show what would be deleted.
        force: When true, skip the interactive confirmation prompt.
    """
    # Remove @ if present
    handle = handle.lstrip('@')
    shown_handle = escape(handle)
    wanted = handle.lower()

    # Find all notifications from this handle
    console.print(f"\n[bold]Searching for notifications from @{shown_handle}...[/bold]\n")

    to_delete = [
        {'filepath': filepath, 'notif': notif, 'source': source}
        for source, filepath, notif in _iter_notifications()
        if _author_handle(notif).lower() == wanted
    ]

    if not to_delete:
        console.print(f"[yellow]No notifications found from @{shown_handle}[/yellow]")
        return

    # Display what will be deleted
    table = Table(title=f"Notifications to Delete from @{shown_handle}")
    table.add_column("File", style="cyan")
    table.add_column("Location", style="magenta")
    table.add_column("Text", width=50)
    table.add_column("Time", style="dim")

    for item in to_delete:
        notif = item['notif']
        table.add_row(
            escape(item['filepath'].name),
            item['source'],
            escape(_text_preview(notif, 50)),
            escape(_indexed_at(notif)[:19]),
        )

    console.print(table)
    console.print(f"\n[bold red]Found {len(to_delete)} notifications to delete[/bold red]")

    if dry_run:
        console.print("\n[yellow]DRY RUN - No files were deleted[/yellow]")
        return

    # Confirm deletion
    if not force and not Confirm.ask("\nDo you want to delete these notifications?"):
        console.print("[yellow]Deletion cancelled[/yellow]")
        return

    # Delete the files; one failure must not stop the remaining deletions.
    deleted_count = 0
    for item in to_delete:
        name = escape(item['filepath'].name)
        try:
            item['filepath'].unlink()
        except OSError as e:
            console.print(f"[red]✗[/red] Failed to delete {name}: {escape(str(e))}")
        else:
            deleted_count += 1
            console.print(f"[green]✓[/green] Deleted {name}")

    console.print(f"\n[bold green]Successfully deleted {deleted_count} notifications[/bold green]")
    failed_count = len(to_delete) - deleted_count
    if failed_count:
        console.print(f"[bold red]Failed to delete {failed_count} notifications[/bold red]")


def count_by_handle():
    """Show detailed count of notifications by handle.

    Prints one row per handle with its count in each queue folder, followed
    by summary statistics. Notifications without a handle are grouped under
    ``unknown``.
    """
    handle_counts = {}

    # Collect counts from all directories
    for location, _filepath, notif in _iter_notifications():
        handle = _author_handle(notif, 'unknown')
        counts = handle_counts.setdefault(
            handle, {'queue': 0, 'errors': 0, 'no_reply': 0, 'total': 0}
        )
        counts[location] += 1
        counts['total'] += 1

    if not handle_counts:
        console.print("[yellow]No notifications found in any queue[/yellow]")
        return

    # Sort by total count
    sorted_handles = sorted(handle_counts.items(), key=lambda x: x[1]['total'], reverse=True)

    # Display results
    table = Table(title=f"Notification Count by Handle ({len(handle_counts)} unique handles)")
    table.add_column("Handle", style="green", width=30)
    table.add_column("Queue", style="cyan", justify="right")
    table.add_column("Errors", style="red", justify="right")
    table.add_column("No Reply", style="yellow", justify="right")
    table.add_column("Total", style="bold magenta", justify="right")

    for handle, counts in sorted_handles:
        table.add_row(
            escape(f"@{handle}"),
            str(counts['queue']) if counts['queue'] > 0 else "-",
            str(counts['errors']) if counts['errors'] > 0 else "-",
            str(counts['no_reply']) if counts['no_reply'] > 0 else "-",
            str(counts['total']),
        )

    console.print(table)

    # Summary statistics (handle_counts is non-empty here, so no division by zero)
    total_notifications = sum(h['total'] for h in handle_counts.values())
    avg_per_handle = total_notifications / len(handle_counts)

    console.print("\n[bold]Summary:[/bold]")
    console.print(f"  Total notifications: {total_notifications}")
    console.print(f"  Unique handles: {len(handle_counts)}")
    console.print(f"  Average per handle: {avg_per_handle:.1f}")

    # Top user info
    top_handle, top_counts = sorted_handles[0]
    percentage = (top_counts['total'] / total_notifications) * 100
    console.print(
        f"  Most active: {escape('@' + top_handle)} "
        f"({top_counts['total']} notifications, {percentage:.1f}% of total)"
    )


def stats():
    """Show queue statistics.

    Prints the number of notifications and of unique handles per queue
    folder, then the ten handles with the most notifications overall.
    """
    locations = [('queue', 'Active Queue'), ('errors', 'Errors'), ('no_reply', 'No Reply')]
    stats_data = {key: {'count': 0, 'handles': set()} for key, _label in locations}
    # Counted per notification: deriving it from the per-location sets would
    # only give the number of folders a handle appears in (at most three).
    notifications_per_handle = Counter()

    # Collect stats
    for key, _filepath, notif in _iter_notifications():
        handle = _author_handle(notif, 'unknown')
        stats_data[key]['count'] += 1
        stats_data[key]['handles'].add(handle)
        notifications_per_handle[handle] += 1

    # Display stats
    table = Table(title="Queue Statistics")
    table.add_column("Location", style="cyan")
    table.add_column("Count", style="yellow")
    table.add_column("Unique Handles", style="green")

    for key, label in locations:
        table.add_row(
            label,
            str(stats_data[key]['count']),
            str(len(stats_data[key]['handles'])),
        )

    console.print(table)

    # Show top handles
    if notifications_per_handle:
        top_table = Table(title="Top 10 Handles by Notification Count")
        top_table.add_column("Handle", style="green")
        top_table.add_column("Count", style="yellow")

        for handle, count in notifications_per_handle.most_common(10):
            top_table.add_row(escape(f"@{handle}"), str(count))

        console.print()
        console.print(top_table)


def main():
    """Parse the command line and run the selected queue command.

    Prints the help text when no command is given.
    """
    parser = argparse.ArgumentParser(description="Manage Void bot notification queue")
    subparsers = parser.add_subparsers(dest='command', help='Commands')

    # List command
    list_parser = subparsers.add_parser('list', help='List notifications in queue')
    list_parser.add_argument('--handle', help='Filter by handle (partial match)')
    list_parser.add_argument('--all', action='store_true', help='Include errors and no_reply folders')

    # Delete command
    delete_parser = subparsers.add_parser('delete', help='Delete notifications from a specific handle')
    delete_parser.add_argument('handle', help='Handle to delete notifications from')
    delete_parser.add_argument('--dry-run', action='store_true', help='Show what would be deleted without deleting')
    delete_parser.add_argument('--force', action='store_true', help='Skip confirmation prompt')

    # Stats command
    subparsers.add_parser('stats', help='Show queue statistics')

    # Count command
    subparsers.add_parser('count', help='Show detailed count by handle')

    args = parser.parse_args()

    if args.command == 'list':
        list_notifications(args.handle, args.all)
    elif args.command == 'delete':
        delete_by_handle(args.handle, args.dry_run, args.force)
    elif args.command == 'stats':
        stats()
    elif args.command == 'count':
        count_by_handle()
    else:
        parser.print_help()


if __name__ == "__main__":
    main()