for assorted things
#!/usr/bin/env -S uv run --script --quiet
# /// script
# requires-python = ">=3.12"
# dependencies = ["textual", "marvin", "cachetools"]
# ///
"""
AI-powered TUI for killing processes.

Usage:

```bash
./kill-processes
```

Details:
- uses [`textual`](https://textual.textualize.io/) for the TUI
- uses [`marvin`](https://github.com/prefecthq/marvin) (built on [`pydantic-ai`](https://github.com/pydantic/pydantic-ai)) to annotate processes
"""

import hashlib
import json
import os
import signal
import subprocess
from pathlib import Path

import marvin
from cachetools import TTLCache
from textual import work
from textual.app import App, ComposeResult
from textual.containers import VerticalScroll, Vertical, Container, Horizontal
from textual.screen import ModalScreen
from textual.widgets import (
    Header,
    Footer,
    Static,
    Input,
    ListView,
    ListItem,
    Label,
    Button,
)

# Minimum pattern length before a search is attempted, and the maximum number
# of matches that will be listed; both can be overridden via the environment.
MIN_PATTERN = int(os.environ.get("PROCESS_TUI_MIN_PATTERN", 3))
MAX_RESULTS = int(os.environ.get("PROCESS_TUI_MAX_RESULTS", 30))


CACHE_DIR = Path(os.path.expanduser("~/.cache/process_tui"))
CACHE_FILE = CACHE_DIR / "annotations.json"
# In-memory cache with TTL (time to live)
memory_cache = TTLCache(maxsize=1000, ttl=3600)  # 1 hour TTL


def _ps_field(pid: str, field: str) -> str:
    """Return a single ``ps -o <field>`` value for *pid*, stripped.

    The result is an empty string when ``ps`` prints nothing for the PID.
    Raises ``OSError`` when ``ps`` cannot be executed.
    """
    completed = subprocess.run(
        ["ps", "-p", pid, "-o", field], capture_output=True, text=True
    )
    return completed.stdout.strip()


def get_process_info(pid: str) -> tuple[str, str]:
    """Get command and start time for a process.

    Returns a ``(command, start_time)`` tuple taken from ``ps``; both values
    are empty strings when ``ps`` could not be run.
    """
    try:
        return _ps_field(pid, "args="), _ps_field(pid, "lstart=")
    except (OSError, ValueError):
        return "", ""


def get_process_key(pid: str, cmd: str, start_time: str) -> str:
    """Generate a unique key for the process based on PID, command, and start time."""
    content = f"{pid}:{cmd}:{start_time}"
    # md5 is only a cache key here, not a security measure; flagging it as
    # such keeps the digest identical to previously stored keys.
    return hashlib.md5(content.encode(), usedforsecurity=False).hexdigest()


def load_disk_cache() -> dict:
    """Load the cache from disk.

    Returns an empty dict when the file is missing, unreadable, not valid
    JSON, or does not contain a JSON object.
    """
    try:
        with open(CACHE_FILE, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError):
        # Missing/unreadable file or corrupt JSON: start with an empty cache.
        return {}
    return data if isinstance(data, dict) else {}


def save_disk_cache(cache_data: dict) -> None:
    """Save the cache to disk (best effort; failures are ignored)."""
    try:
        CACHE_DIR.mkdir(parents=True, exist_ok=True)
        with open(CACHE_FILE, "w", encoding="utf-8") as f:
            json.dump(cache_data, f)
    except (OSError, TypeError, ValueError):
        # The cache is an optimisation only; never fail the caller over it.
        pass


def annotate_pids(pids: list[str], cmds: dict[str, str]) -> dict[str, set[str]]:
    """Generate meaningful annotations for processes using Marvin with caching.

    Args:
        pids: PIDs (as strings) to annotate.
        cmds: Mapping of PID to its command line; missing PIDs are treated as
            having an empty command.

    Returns:
        A dict with exactly one entry per requested PID mapping it to a set of
        tags. PIDs the model did not annotate get ``{"no analysis available"}``;
        if the model call fails, uncached PIDs get ``{"analysis failed"}``.
    """
    results: dict[str, set[str]] = {}
    # pid -> cache key for every process that still needs an annotation. The
    # key is computed once so the lookup and the later store always agree,
    # even if the process exits while the model is running.
    uncached: dict[str, str] = {}
    disk_cache = load_disk_cache()

    # First check memory and disk cache
    for pid in pids:
        cmd = cmds.get(pid, "")
        try:
            start_time = _ps_field(pid, "lstart=")
        except (OSError, ValueError):
            start_time = ""
        process_key = get_process_key(pid, cmd, start_time)

        # Memory cache first (faster); .get avoids a KeyError if the entry
        # expires between a membership test and the read.
        cached = memory_cache.get(process_key)
        if cached is not None:
            results[pid] = set(cached)
            continue

        disk_entry = disk_cache.get(process_key)
        if isinstance(disk_entry, list):
            results[pid] = set(disk_entry)
            # Also add to memory cache for faster access next time
            memory_cache[process_key] = disk_entry
        else:
            uncached[pid] = process_key

    # Only use Marvin if we have uncached processes
    if uncached:
        try:
            annotations = marvin.run(
                instructions="Analyze these processes and provide 2-3 tags for each PID that describe what the process is doing. "
                "Use consistent terminology across similar processes. "
                "Return a dictionary where keys are PIDs and values are lists of tags.",
                context={
                    "processes": "\n".join(
                        f"PID {pid}: {cmds.get(pid, '')}" for pid in uncached
                    )
                },
                result_type=dict[str, set[str]],
            )
        except Exception:
            # Any failure of the model call degrades to a placeholder tag
            # instead of propagating to the caller.
            for pid in uncached:
                results[pid] = {"analysis failed"}
        else:
            cache_updates = {}
            for pid, process_key in uncached.items():
                tags = annotations.get(pid)
                if tags is None:
                    continue
                results[pid] = set(tags)
                # Convert set to list for JSON serialization
                tags_list = sorted(tags)
                memory_cache[process_key] = tags_list
                cache_updates[process_key] = tags_list

            # Update disk cache with new entries
            if cache_updates:
                disk_cache.update(cache_updates)
                save_disk_cache(disk_cache)

    # Ensure all PIDs have annotations
    for pid in pids:
        results.setdefault(pid, {"no analysis available"})

    return results


class ProcessList(Static):
    """List of processes matching the current pattern, with tag annotations."""

    def compose(self) -> ComposeResult:
        self.list_view = ListView()
        yield self.list_view

    def _show_message(self, text: str) -> None:
        """Replace the list contents with a single informational row."""
        self.list_view.clear()
        self.list_view.append(ListItem(Label(text)))

    def _has_item(self, item_id: str) -> bool:
        """Return True if the list view currently has a child with *item_id*."""
        try:
            self.list_view.get_child_by_id(item_id)
        except Exception:
            return False
        return True

    def update_processes(self, pattern: str = ""):
        """Search for processes matching *pattern* and show them in the list.

        Shows a hint instead of results when the pattern is empty or shorter
        than ``MIN_PATTERN``, when ``pgrep`` is unavailable, when more than
        ``MAX_RESULTS`` processes match, or when nothing matches. Otherwise one
        row per PID is added and annotation is started in the background.
        """
        # Rebind (rather than clear) so a dict already handed to a running
        # annotation worker is left untouched.
        self._pids = []
        self._cmds = {}
        if not pattern:
            self._show_message("Enter a pattern to search for processes.")
            return
        if len(pattern) < MIN_PATTERN:
            self._show_message(f"Pattern must be at least {MIN_PATTERN} characters.")
            return
        try:
            # "--" stops option parsing so a pattern starting with "-" is
            # treated as a pattern, not as a pgrep flag.
            result = subprocess.run(
                ["pgrep", "-f", "--", pattern], capture_output=True, text=True
            )
        except FileNotFoundError:
            self._show_message("pgrep not found")
            return
        pids = result.stdout.split()
        if len(pids) > MAX_RESULTS:
            self._show_message(
                f"Too many results ({len(pids)}). Please refine your pattern or set PROCESS_TUI_MAX_RESULTS."
            )
            return
        self.list_view.clear()
        if not pids:
            self.list_view.append(ListItem(Label("No processes found.")))
            return
        self._pids = pids
        for pid in pids:
            pid_id = f"pid_{pid}"
            try:
                cmd = _ps_field(pid, "args=")
                label = Label(
                    f"[b]PID:[/b] {pid}\n[dim]{cmd}[/dim]\n[italic]Annotating...[/italic]"
                )
            except Exception:
                # Deliberately broad: besides ps failing, building the label
                # from an arbitrary command line may fail; fall back to a
                # fixed, known-good row in either case.
                cmd = "(unable to get command)"
                label = Label(
                    f"[b]PID:[/b] {pid}\n[red](unable to get command)[/red]\n[italic]Annotating...[/italic]"
                )
            self._cmds[pid] = cmd
            if not self._has_item(pid_id):
                self.list_view.append(ListItem(label, id=pid_id))
        self.annotate_async(pids, self._cmds)

    def _set_item_label(self, item_id: str, text: str) -> None:
        """Update the label of the row *item_id*, if that row still exists."""
        try:
            child = self.list_view.get_child_by_id(item_id)
            if isinstance(child, ListItem):
                child.query_one(Label).update(text)
        except Exception:
            # Best effort: the row may have been replaced by a newer search.
            pass

    @work(thread=True)
    def annotate_async(self, pids: list[str], cmds: dict[str, str]):
        """Annotate *pids* in a worker thread and update their rows.

        Uses the *cmds* mapping it was given rather than ``self._cmds``, which
        ``update_processes`` replaces on every new search; reading the
        attribute here could raise ``KeyError`` for PIDs of an older search.
        """
        annotations = annotate_pids(pids, cmds)
        for pid in pids:
            text = f"[b]PID:[/b] {pid}\n[dim]{cmds.get(pid, '')}[/dim]"
            tags = annotations.get(pid)
            if tags:
                # Sorted so the tag order is stable between refreshes.
                text += "\n[yellow]" + ", ".join(sorted(tags)) + "[/yellow]"
            else:
                text += "\n[italic dim]No annotation[/italic dim]"

            # Widgets are updated via the app from this worker thread; the
            # values are passed as arguments instead of captured in a closure.
            self.app.call_from_thread(self._set_item_label, f"pid_{pid}", text)


class ConfirmKillScreen(ModalScreen[bool]):
    """Modal yes/no dialog; dismisses with True only when "Yes" is pressed."""

    def __init__(self, pattern: str):
        super().__init__()
        self.pattern = pattern

    def compose(self) -> ComposeResult:
        with Container():
            yield Label(f"Kill all processes matching '{self.pattern}'?", id="question")
            with Horizontal():
                yield Button("No", id="no", variant="primary")
                yield Button("Yes", id="yes", variant="error")

    def on_button_pressed(self, event: Button.Pressed) -> None:
        self.dismiss(event.button.id == "yes")


class ProcessTUI(App):
    """Application: type a pattern to list processes, press Enter to kill them."""

    CSS = """
    VerticalScroll {
        padding: 1;
    }
    Input {
        margin: 1;
    }
    ListView {
        margin: 1;
    }
    #question {
        content-align: center middle;
        margin-bottom: 1;
    }
    """

    def compose(self) -> ComposeResult:
        yield Header()
        with Vertical():
            self.input = Input(
                placeholder="Enter pattern to match processes (press Enter to kill)"
            )
            yield self.input
            with VerticalScroll():
                self.proc_list = ProcessList()
                yield self.proc_list
        yield Footer()

    def on_mount(self):
        self.proc_list.update_processes()

    def _kill_matching(self, pattern: str) -> None:
        """Send SIGTERM to every process whose command line matches *pattern*.

        This process itself is skipped, and PIDs that cannot be signalled
        (already gone, not permitted) are ignored.
        """
        try:
            found = subprocess.run(
                ["pgrep", "-f", "--", pattern], capture_output=True, text=True
            )
        except OSError:
            return
        own_pid = os.getpid()
        for token in found.stdout.split():
            try:
                pid = int(token)
            except ValueError:
                continue
            if pid == own_pid:
                # Terminating ourselves would abandon the remaining PIDs.
                continue
            try:
                os.kill(pid, signal.SIGTERM)
            except OSError:
                pass

    def on_input_submitted(self, event: Input.Submitted):
        """Ask for confirmation, then kill the processes matching the input."""
        pattern = event.value.strip()
        if not pattern or len(pattern) < MIN_PATTERN:
            return

        def handle_confirm(result: bool | None):
            if result:
                self._kill_matching(pattern)
            self.input.disabled = False
            self.proc_list.update_processes()

        # Disabled while the dialog is open so on_input_changed does not
        # trigger new searches.
        self.input.disabled = True
        self.push_screen(ConfirmKillScreen(pattern), handle_confirm)

    def on_input_changed(self, event: Input.Changed):
        if not self.input.disabled:
            self.proc_list.update_processes(event.value.strip())


if __name__ == "__main__":
    ProcessTUI().run()