for assorted things
at main 12 kB view raw
#!/usr/bin/env -S uv run --script --quiet
# /// script
# requires-python = ">=3.12"
# dependencies = ["textual", "marvin", "cachetools"]
# ///
"""
AI-powered TUI for killing processes.

Usage:

```bash
./kill-processes
```

Details:
- uses [`textual`](https://textual.textualize.io/) for the TUI
- uses [`marvin`](https://github.com/prefecthq/marvin) (built on [`pydantic-ai`](https://github.com/pydantic/pydantic-ai)) to annotate processes
"""

import os
import signal
import subprocess
import marvin
import hashlib
import json
import asyncio
from pathlib import Path
from cachetools import TTLCache
from textual.app import App, ComposeResult
from textual.widgets import (
    Header,
    Footer,
    Static,
    Input,
    ListView,
    ListItem,
    Label,
    Button,
)
from textual.containers import VerticalScroll, Vertical, Container, Horizontal
from textual.screen import ModalScreen
from textual import work

# Minimum pattern length before any search is run, and the maximum number of
# matches that will be listed; both can be overridden from the environment.
MIN_PATTERN = int(os.environ.get("PROCESS_TUI_MIN_PATTERN", 3))
MAX_RESULTS = int(os.environ.get("PROCESS_TUI_MAX_RESULTS", 30))


CACHE_DIR = Path(os.path.expanduser("~/.cache/process_tui"))
CACHE_FILE = CACHE_DIR / "annotations.json"
# In-memory cache with TTL (time to live)
memory_cache = TTLCache(maxsize=1000, ttl=3600)  # 1 hour TTL


def _ps_field(pid: str, output_format: str) -> str:
    """Run ``ps -p <pid> -o <output_format>`` and return its stripped stdout."""
    completed = subprocess.run(
        ["ps", "-p", pid, "-o", output_format], capture_output=True, text=True
    )
    return completed.stdout.strip()


def get_process_info(pid: str) -> tuple[str, str]:
    """Get command and start time for a process.

    Args:
        pid: Process id, as a string, passed to ``ps -p``.

    Returns:
        ``(command, start_time)`` as printed by ``ps`` (``args=`` and
        ``lstart=``). Both are ``""`` when ``ps`` cannot be executed; each is
        whatever ``ps`` wrote to stdout otherwise (no exit-code check is made).
    """
    try:
        return _ps_field(pid, "args="), _ps_field(pid, "lstart=")
    except (OSError, ValueError, subprocess.SubprocessError):
        # Best effort: a missing/unrunnable `ps` must not break the caller.
        return "", ""


def get_process_key(pid: str, cmd: str, start_time: str) -> str:
    """Generate a cache key for the process based on PID, command, and start time.

    Returns:
        The hex MD5 digest of ``"{pid}:{cmd}:{start_time}"``. MD5 is used only
        as a compact key, not for security.
    """
    content = f"{pid}:{cmd}:{start_time}"
    return hashlib.md5(content.encode()).hexdigest()


def load_disk_cache() -> dict:
    """Load the annotation cache from ``CACHE_FILE``.

    Returns:
        The cached mapping, or ``{}`` when the file is missing, unreadable,
        not valid JSON, or does not contain a JSON object.
    """
    try:
        if CACHE_FILE.exists():
            with open(CACHE_FILE, "r") as f:
                data = json.load(f)
            # A corrupted/hand-edited file may hold a list or scalar; callers
            # index the result by key, so only a dict is usable.
            if isinstance(data, dict):
                return data
    except (OSError, ValueError):
        # JSONDecodeError and UnicodeDecodeError are ValueError subclasses.
        pass
    return {}


def save_disk_cache(cache_data: dict) -> None:
    """Write ``cache_data`` to ``CACHE_FILE`` as JSON, creating ``CACHE_DIR``.

    Persistence is best effort: I/O and serialization errors are ignored so a
    read-only or full disk never breaks annotation.
    """
    try:
        CACHE_DIR.mkdir(parents=True, exist_ok=True)
        with open(CACHE_FILE, "w") as f:
            json.dump(cache_data, f)
    except (OSError, TypeError, ValueError):
        pass


def annotate_pids(pids: list[str], cmds: dict[str, str]) -> dict[str, set[str]]:
    """Generate meaningful annotations for processes using Marvin with caching.

    Args:
        pids: Process ids to annotate.
        cmds: Mapping of pid -> command line; missing pids are treated as "".

    Returns:
        A mapping containing every pid in ``pids``. Values are the tag set
        from the memory cache, the disk cache, or a fresh ``marvin.run`` call;
        ``{"analysis failed"}`` when the call raised before that pid was
        annotated, or ``{"no analysis available"}`` when the call succeeded
        but returned nothing for that pid.
    """
    results: dict[str, set[str]] = {}
    disk_cache = load_disk_cache()
    # pid -> cache key for everything that needs a fresh annotation. The key is
    # computed once so the value is stored under the same key it was looked up
    # with, even if the process exits while the model is running.
    uncached_keys: dict[str, str] = {}

    for pid in pids:
        cmd = cmds.get(pid, "")
        start_time = get_process_info(pid)[1]
        process_key = get_process_key(pid, cmd, start_time)

        # Memory cache first (faster). A single .get() avoids the window
        # between a membership test and the read in which a TTL entry expires.
        cached_tags = memory_cache.get(process_key)
        if cached_tags is not None:
            results[pid] = set(cached_tags)
        elif process_key in disk_cache:
            results[pid] = set(disk_cache[process_key])
            # Also add to memory cache for faster access next time
            memory_cache[process_key] = disk_cache[process_key]
        else:
            uncached_keys[pid] = process_key

    # Only use Marvin if we have uncached processes
    if uncached_keys:
        try:
            annotations = marvin.run(
                instructions="Analyze these processes and provide 2-3 tags for each PID that describe what the process is doing. "
                "Use consistent terminology across similar processes. "
                "Return a dictionary where keys are PIDs and values are lists of tags.",
                context={
                    "processes": "\n".join(
                        f"PID {pid}: {cmds.get(pid, '')}" for pid in uncached_keys
                    )
                },
                result_type=dict[str, set[str]],
            )

            cache_updates = {}
            for pid, process_key in uncached_keys.items():
                # Only accept entries for pids that were actually asked about.
                if pid not in annotations:
                    continue
                tags = set(annotations[pid])
                results[pid] = tags

                # Sets are not JSON serializable; sorting keeps the stored
                # order stable between runs.
                tags_list = sorted(tags)
                memory_cache[process_key] = tags_list
                cache_updates[process_key] = tags_list

            if cache_updates:
                disk_cache.update(cache_updates)
                save_disk_cache(disk_cache)

        except Exception:
            # Boundary around the model call: any failure degrades to a
            # placeholder, without discarding annotations already obtained.
            for pid in uncached_keys:
                results.setdefault(pid, {"analysis failed"})

    # Ensure all PIDs have annotations
    for pid in pids:
        results.setdefault(pid, {"no analysis available"})

    return results


class ProcessList(Static):
    """Widget listing the processes that match the current search pattern."""

    def compose(self) -> ComposeResult:
        self.list_view = ListView()
        yield self.list_view

    def _show_message(self, text: str) -> None:
        """Replace the list contents with a single informational row."""
        self.list_view.clear()
        self.list_view.append(ListItem(Label(text)))

    def _add_pending_item(self, pid: str, detail: str) -> None:
        """Append a row for ``pid`` (marked "Annotating...") unless one exists.

        ``detail`` is the already-marked-up middle line of the row.
        """
        pid_id = f"pid_{pid}"
        try:
            # Presumably guards against adding a second widget with the same
            # id — TODO confirm against ListView.clear() semantics.
            self.list_view.get_child_by_id(pid_id)
        except Exception:
            self.list_view.append(
                ListItem(
                    Label(
                        f"[b]PID:[/b] {pid}\n{detail}\n[italic]Annotating...[/italic]"
                    ),
                    id=pid_id,
                )
            )

    def update_processes(self, pattern: str = ""):
        """Search for processes matching ``pattern`` and rebuild the list.

        Shows a hint row instead of results when the pattern is empty, shorter
        than ``MIN_PATTERN``, matches nothing, matches more than
        ``MAX_RESULTS`` processes, or when ``pgrep`` is not installed.
        Otherwise adds one row per pid and starts background annotation.
        """
        self._pids: list[str] = []
        self._cmds: dict[str, str] = {}
        if not pattern:
            self._show_message("Enter a pattern to search for processes.")
            return
        if len(pattern) < MIN_PATTERN:
            self._show_message(f"Pattern must be at least {MIN_PATTERN} characters.")
            return
        try:
            result = subprocess.run(
                ["pgrep", "-f", pattern], capture_output=True, text=True
            )
        except FileNotFoundError:
            self._show_message("pgrep not found")
            return
        pids = [pid for pid in result.stdout.strip().split() if pid]
        if len(pids) > MAX_RESULTS:
            self._show_message(
                f"Too many results ({len(pids)}). Please refine your pattern or set PROCESS_TUI_MAX_RESULTS."
            )
            return
        if not pids:
            self._show_message("No processes found.")
            return

        self.list_view.clear()
        self._pids = pids
        for pid in pids:
            try:
                cmd = _ps_field(pid, "args=")
            except (OSError, ValueError, subprocess.SubprocessError):
                self._cmds[pid] = "(unable to get command)"
                self._add_pending_item(pid, "[red](unable to get command)[/red]")
            else:
                self._cmds[pid] = cmd
                # NOTE(review): cmd is interpolated into markup unescaped; a
                # command line containing "[" may render incorrectly.
                self._add_pending_item(pid, f"[dim]{cmd}[/dim]")
        self.annotate_async(pids, self._cmds)

    @work(thread=True)
    def annotate_async(self, pids: list[str], cmds: dict[str, str]):
        """Annotate ``pids`` in a worker thread and update their rows.

        Labels are built from the ``cmds`` snapshot passed in, not from
        ``self._cmds``: ``update_processes`` rebinds ``self._cmds`` on every
        keystroke, so a worker started for an earlier pattern would otherwise
        hit a ``KeyError`` for pids that are no longer listed.
        """
        annotations = annotate_pids(pids, cmds)
        for pid in pids:
            pid_id = f"pid_{pid}"
            label = f"[b]PID:[/b] {pid}\n[dim]{cmds.get(pid, '')}[/dim]"
            tags = annotations.get(pid)
            if tags:
                # Sorted so the tag order does not change between refreshes.
                label += "\n[yellow]" + ", ".join(sorted(tags)) + "[/yellow]"
            else:
                label += "\n[italic dim]No annotation[/italic dim]"

            # Bind the per-iteration values as defaults so the callback never
            # depends on the loop variables' later values.
            def update_label(pid_id: str = pid_id, label: str = label) -> None:
                try:
                    child = self.list_view.get_child_by_id(pid_id)
                    if isinstance(child, ListItem):
                        child.query_one(Label).update(label)
                except Exception:
                    # The row may be gone by now (list rebuilt for a new
                    # pattern); a stale update is simply dropped.
                    pass

            self.app.call_from_thread(update_label)


class ConfirmKillScreen(ModalScreen[bool]):
    """Modal yes/no prompt; dismisses with ``True`` only when "Yes" is pressed."""

    def __init__(self, pattern: str):
        super().__init__()
        self.pattern = pattern

    def compose(self) -> ComposeResult:
        with Container():
            yield Label(f"Kill all processes matching '{self.pattern}'?", id="question")
            with Horizontal():
                yield Button("No", id="no", variant="primary")
                yield Button("Yes", id="yes", variant="error")

    def on_button_pressed(self, event: Button.Pressed) -> None:
        self.dismiss(event.button.id == "yes")


def _terminate_matching(pattern: str) -> None:
    """Send SIGTERM to every process ``pgrep -f pattern`` currently reports.

    Best effort: a missing ``pgrep``, unparsable output, and processes that
    have already exited or cannot be signalled are all ignored.
    """
    try:
        proc_result = subprocess.run(
            ["pgrep", "-f", pattern], capture_output=True, text=True
        )
        pids = [int(pid) for pid in proc_result.stdout.strip().split() if pid]
    except (OSError, ValueError, subprocess.SubprocessError):
        return
    for pid in pids:
        try:
            os.kill(pid, signal.SIGTERM)
        except OSError:
            # Already gone, or not ours to signal; keep going with the rest.
            pass


class ProcessTUI(App):
    """Application: a pattern input above a live list of matching processes."""

    CSS = """
    VerticalScroll {
        padding: 1;
    }
    Input {
        margin: 1;
    }
    ListView {
        margin: 1;
    }
    #question {
        content-align: center middle;
        margin-bottom: 1;
    }
    """

    def compose(self) -> ComposeResult:
        yield Header()
        with Vertical():
            self.input = Input(
                placeholder="Enter pattern to match processes (press Enter to kill)"
            )
            yield self.input
            with VerticalScroll():
                self.proc_list = ProcessList()
                yield self.proc_list
        yield Footer()

    def on_mount(self):
        self.proc_list.update_processes()

    def on_input_submitted(self, event: Input.Submitted):
        """Ask for confirmation, then SIGTERM everything matching the pattern."""
        pattern = event.value.strip()
        if not pattern or len(pattern) < MIN_PATTERN:
            return

        def handle_confirm(result: bool | None) -> None:
            try:
                if result:
                    _terminate_matching(pattern)
            finally:
                # Always give the input back, whatever happened above.
                self.input.disabled = False
            self.proc_list.update_processes()

        # Disabling the input also stops on_input_changed from refreshing the
        # list while the confirmation dialog is open.
        self.input.disabled = True
        self.push_screen(ConfirmKillScreen(pattern), handle_confirm)

    def on_input_changed(self, event: Input.Changed):
        if not self.input.disabled:
            self.proc_list.update_processes(event.value.strip())


if __name__ == "__main__":
    # Make sure the main thread has a current event loop before the app is
    # started: reuse the existing one if there is one, otherwise install a
    # freshly created loop.
    try:
        asyncio.get_event_loop()
    except RuntimeError:
        asyncio.set_event_loop(asyncio.new_event_loop())

    ProcessTUI().run()