# A digital person for Bluesky.
1#!/usr/bin/env python3
2"""Queue management utilities for Void bot."""
3import json
4import argparse
5from pathlib import Path
6from rich.console import Console
7from rich.table import Table
8from rich.prompt import Confirm
9
# Shared rich Console used by the queue commands for styled terminal output.
console = Console()
11
# On-disk locations scanned by the queue commands. Paths are relative, so they
# resolve against the current working directory.
QUEUE_DIR = Path("queue")
QUEUE_ERROR_DIR = QUEUE_DIR.joinpath("errors")
QUEUE_NO_REPLY_DIR = QUEUE_DIR.joinpath("no_reply")
16
17
def load_notification(filepath: Path) -> "dict | None":
    """Load a notification from a JSON file.

    Args:
        filepath: Path of the JSON file to read.

    Returns:
        The decoded JSON document, or ``None`` when the file cannot be read
        or parsed. Failures are reported on the console rather than raised,
        so callers can skip a bad file and keep going.
    """
    try:
        # JSON is UTF-8 by specification; do not depend on the locale default.
        with open(filepath, 'r', encoding='utf-8') as f:
            return json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        console.print(f"[red]Error loading {filepath}: {e}[/red]")
        return None
26
27
def list_notifications(handle_filter: str = None, show_all: bool = False):
    """List notifications in the queue, optionally filtered by handle.

    Args:
        handle_filter: Case-insensitive substring that the author's handle
            must contain. ``None`` or an empty string disables filtering.
        show_all: When true, also scan the ``errors`` and ``no_reply``
            directories; otherwise only the main queue directory is read.

    Returns:
        The matching notification dicts sorted by ``indexed_at`` (newest
        first), each with ``_filepath`` and ``_source`` keys added. Returns
        ``None`` when nothing matched; a message is printed instead of a
        table in that case.
    """
    # Function-scope import: notification content is untrusted text and must
    # be escaped before rich interprets "[...]" sequences as markup.
    from rich.markup import escape

    def _as_dict(value):
        # A JSON null (or any non-object) must not break chained lookups.
        return value if isinstance(value, dict) else {}

    def _as_str(value):
        # Treat null / non-string fields as empty text.
        return value if isinstance(value, str) else ''

    sources = [(QUEUE_DIR, "queue")]
    if show_all:
        sources += [(QUEUE_ERROR_DIR, "errors"), (QUEUE_NO_REPLY_DIR, "no_reply")]

    needle = handle_filter.lower() if handle_filter else None
    all_notifications = []

    for directory, source in sources:
        if not directory.exists():
            continue

        for filepath in directory.glob("*.json"):
            # A directory whose name happens to end in .json is not a notification.
            if filepath.is_dir():
                continue

            notif = load_notification(filepath)
            if not notif or not isinstance(notif, dict):
                continue

            if needle is not None:
                author_handle = _as_str(_as_dict(notif.get('author')).get('handle'))
                if needle not in author_handle.lower():
                    continue

            notif['_filepath'] = filepath
            notif['_source'] = source
            all_notifications.append(notif)

    # Newest first; a missing or null timestamp is compared as an empty string.
    all_notifications.sort(key=lambda n: _as_str(n.get('indexed_at')), reverse=True)

    if not all_notifications:
        if handle_filter:
            console.print(
                f"[yellow]No notifications found for handle containing '{escape(handle_filter)}'[/yellow]"
            )
        else:
            console.print("[yellow]No notifications found in queue[/yellow]")
        return

    table = Table(title=f"Queue Notifications ({len(all_notifications)} total)")
    table.add_column("File", style="cyan", width=20)
    table.add_column("Source", style="magenta", width=10)
    table.add_column("Handle", style="green", width=25)
    table.add_column("Display Name", width=25)
    table.add_column("Text", width=40)
    table.add_column("Time", style="dim", width=20)

    for notif in all_notifications:
        author = _as_dict(notif.get('author'))
        handle = _as_str(author.get('handle')) or 'unknown'
        display_name = _as_str(author.get('display_name'))

        full_text = _as_str(_as_dict(notif.get('record')).get('text'))
        text = full_text[:40]
        if len(full_text) > 40:
            text += "..."

        indexed_at = _as_str(notif.get('indexed_at'))[:19]  # Trim milliseconds
        filename = notif['_filepath'].name[:20]

        table.add_row(
            escape(filename),
            notif['_source'],
            escape(f"@{handle}"),
            escape(display_name),
            escape(text),
            escape(indexed_at),
        )

    console.print(table)
    return all_notifications
104
105
def delete_by_handle(handle: str, dry_run: bool = False, force: bool = False):
    """Delete all queued notifications authored by a specific handle.

    The main queue, ``errors`` and ``no_reply`` directories are all scanned.
    Matching is an exact, case-insensitive comparison against the
    notification's ``author.handle``.

    Args:
        handle: Handle to match; a leading ``@`` is ignored.
        dry_run: When true, show what would be deleted and stop.
        force: When true, skip the interactive confirmation prompt.

    Returns:
        None. Progress and results are printed to the console.
    """
    # Function-scope import: handles, file names and post text are untrusted
    # and must be escaped before rich interprets "[...]" sequences as markup.
    from rich.markup import escape

    # Remove @ if present
    handle = handle.lstrip('@')
    target = handle.lower()
    shown_handle = escape(f"@{handle}")

    console.print(f"\n[bold]Searching for notifications from {shown_handle}...[/bold]\n")

    to_delete = []
    for directory in (QUEUE_DIR, QUEUE_ERROR_DIR, QUEUE_NO_REPLY_DIR):
        if not directory.exists():
            continue

        for filepath in directory.glob("*.json"):
            if filepath.is_dir():
                continue

            notif = load_notification(filepath)
            if not notif or not isinstance(notif, dict):
                continue

            # Tolerate a null/non-object author or a null handle.
            author = notif.get('author')
            author_handle = author.get('handle') if isinstance(author, dict) else None
            if isinstance(author_handle, str) and author_handle.lower() == target:
                to_delete.append({
                    'filepath': filepath,
                    'notif': notif,
                    'source': directory.name,
                })

    if not to_delete:
        console.print(f"[yellow]No notifications found from {shown_handle}[/yellow]")
        return

    # Display what will be deleted
    table = Table(title=f"Notifications to Delete from {shown_handle}")
    table.add_column("File", style="cyan")
    table.add_column("Location", style="magenta")
    table.add_column("Text", width=50)
    table.add_column("Time", style="dim")

    for item in to_delete:
        notif = item['notif']
        record = notif.get('record')
        full_text = record.get('text') if isinstance(record, dict) else None
        if not isinstance(full_text, str):
            full_text = ''
        text = full_text[:50]
        if len(full_text) > 50:
            text += "..."

        indexed_at = notif.get('indexed_at')
        indexed_at = indexed_at[:19] if isinstance(indexed_at, str) else ''

        table.add_row(
            escape(item['filepath'].name),
            item['source'],
            escape(text),
            escape(indexed_at),
        )

    console.print(table)
    console.print(f"\n[bold red]Found {len(to_delete)} notifications to delete[/bold red]")

    if dry_run:
        console.print("\n[yellow]DRY RUN - No files were deleted[/yellow]")
        return

    # Confirm deletion
    if not force and not Confirm.ask("\nDo you want to delete these notifications?"):
        console.print("[yellow]Deletion cancelled[/yellow]")
        return

    # Delete the files; one failure must not stop the remaining deletions.
    deleted_count = 0
    for item in to_delete:
        name = escape(item['filepath'].name)
        try:
            item['filepath'].unlink()
        except OSError as e:
            console.print(f"[red]✗[/red] Failed to delete {name}: {escape(str(e))}")
        else:
            deleted_count += 1
            console.print(f"[green]✓[/green] Deleted {name}")

    failed_count = len(to_delete) - deleted_count
    if failed_count:
        # Do not claim success when some files could not be removed.
        console.print(
            f"\n[bold yellow]Deleted {deleted_count} of {len(to_delete)} notifications "
            f"({failed_count} failed)[/bold yellow]"
        )
    else:
        console.print(f"\n[bold green]Successfully deleted {deleted_count} notifications[/bold green]")
183
184
def stats():
    """Show queue statistics.

    Prints one table with the notification count and number of unique author
    handles for each queue directory, then (when any notifications exist) a
    table of the ten handles with the most notifications across all
    directories.

    Returns:
        None. All output goes to the console.
    """
    from collections import Counter
    # Handles come from notification files and are untrusted; escape them so
    # rich does not interpret "[...]" sequences as markup.
    from rich.markup import escape

    locations = [
        (QUEUE_DIR, 'Active Queue'),
        (QUEUE_ERROR_DIR, 'Errors'),
        (QUEUE_NO_REPLY_DIR, 'No Reply'),
    ]

    table = Table(title="Queue Statistics")
    table.add_column("Location", style="cyan")
    table.add_column("Count", style="yellow")
    table.add_column("Unique Handles", style="green")

    # Notifications per handle across every location. Counting per
    # notification (rather than per directory) is what makes the
    # "by Notification Count" ranking below accurate.
    totals = Counter()

    for directory, label in locations:
        per_handle = Counter()

        if directory.exists():
            for filepath in directory.glob("*.json"):
                if filepath.is_dir():
                    continue

                notif = load_notification(filepath)
                if not notif or not isinstance(notif, dict):
                    continue

                # Tolerate a null/non-object author or a null/empty handle.
                author = notif.get('author')
                handle = author.get('handle') if isinstance(author, dict) else None
                if not isinstance(handle, str) or not handle:
                    handle = 'unknown'
                per_handle[handle] += 1

        table.add_row(label, str(sum(per_handle.values())), str(len(per_handle)))
        totals.update(per_handle)

    console.print(table)

    # Show top handles
    if totals:
        top_table = Table(title="Top 10 Handles by Notification Count")
        top_table.add_column("Handle", style="green")
        top_table.add_column("Count", style="yellow")

        for handle, count in totals.most_common(10):
            top_table.add_row(escape(f"@{handle}"), str(count))

        console.print("\n")
        console.print(top_table)
241
242
def main(argv=None):
    """Parse command-line arguments and run the selected queue command.

    Args:
        argv: Argument strings to parse. ``None`` (the default) makes
            argparse read ``sys.argv[1:]``, so existing ``main()`` callers
            are unaffected.

    Returns:
        None. With no sub-command given, the help text is printed.
    """
    parser = argparse.ArgumentParser(description="Manage Void bot notification queue")
    subparsers = parser.add_subparsers(dest='command', help='Commands')

    # List command
    list_parser = subparsers.add_parser('list', help='List notifications in queue')
    list_parser.add_argument('--handle', help='Filter by handle (partial match)')
    list_parser.add_argument('--all', action='store_true', help='Include errors and no_reply folders')

    # Delete command
    delete_parser = subparsers.add_parser('delete', help='Delete notifications from a specific handle')
    delete_parser.add_argument('handle', help='Handle to delete notifications from')
    delete_parser.add_argument('--dry-run', action='store_true', help='Show what would be deleted without deleting')
    delete_parser.add_argument('--force', action='store_true', help='Skip confirmation prompt')

    # Stats command (takes no arguments, so the sub-parser object is not kept)
    subparsers.add_parser('stats', help='Show queue statistics')

    args = parser.parse_args(argv)

    if args.command == 'list':
        list_notifications(args.handle, args.all)
    elif args.command == 'delete':
        delete_by_handle(args.handle, args.dry_run, args.force)
    elif args.command == 'stats':
        stats()
    else:
        parser.print_help()
271
272
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()