#!/usr/bin/env -S uv run --script --quiet
# /// script
# requires-python = ">=3.12"
# dependencies = ["textual", "marvin", "cachetools"]
# ///
"""
AI-powered TUI for killing processes.

Usage:

```bash
./kill-processes
```

Details:
- uses [`textual`](https://textual.textualize.io/) for the TUI
- uses [`marvin`](https://github.com/prefecthq/marvin) (built on
  [`pydantic-ai`](https://github.com/pydantic/pydantic-ai)) to annotate processes
"""

import asyncio
import hashlib
import json
import os
import signal
import subprocess
from pathlib import Path

import marvin
from cachetools import TTLCache
from textual import work
from textual.app import App, ComposeResult
from textual.containers import Container, Horizontal, Vertical, VerticalScroll
from textual.screen import ModalScreen
from textual.widgets import (
    Button,
    Footer,
    Header,
    Input,
    Label,
    ListItem,
    ListView,
    Static,
)

# Minimum pattern length before a search (or kill) is attempted.
MIN_PATTERN = int(os.environ.get("PROCESS_TUI_MIN_PATTERN", 3))
# Maximum number of matching PIDs that will be listed and annotated.
MAX_RESULTS = int(os.environ.get("PROCESS_TUI_MAX_RESULTS", 30))
CACHE_DIR = Path(os.path.expanduser("~/.cache/process_tui"))
CACHE_FILE = CACHE_DIR / "annotations.json"

# In-memory cache with TTL (time to live)
memory_cache = TTLCache(maxsize=1000, ttl=3600)  # 1 hour TTL


def get_process_info(pid: str) -> tuple[str, str]:
    """Return ``(command line, start time)`` for *pid* as reported by ``ps``.

    Both values are empty strings when ``ps`` prints nothing for the PID, and
    ``("", "")`` is returned when ``ps`` cannot be run or its output cannot be
    decoded.
    """
    try:
        cmd = subprocess.run(
            ["ps", "-p", pid, "-o", "args="], capture_output=True, text=True
        ).stdout.strip()
        start_time = subprocess.run(
            ["ps", "-p", pid, "-o", "lstart="], capture_output=True, text=True
        ).stdout.strip()
    except (OSError, ValueError):
        # OSError: `ps` missing / not executable; ValueError: undecodable output.
        return "", ""
    return cmd, start_time


def get_process_key(pid: str, cmd: str, start_time: str) -> str:
    """Return a cache key derived from the PID, command line and start time.

    MD5 is used purely as a compact fingerprint here, not for security.
    """
    content = f"{pid}:{cmd}:{start_time}"
    return hashlib.md5(content.encode()).hexdigest()


def load_disk_cache() -> dict:
    """Load the annotation cache from ``CACHE_FILE``.

    Returns an empty dict when the file is missing, unreadable, not valid
    JSON, or does not contain a JSON object.
    """
    try:
        if CACHE_FILE.exists():
            with open(CACHE_FILE, "r", encoding="utf-8") as f:
                data = json.load(f)
            # Callers treat the cache as a mapping; ignore anything else.
            if isinstance(data, dict):
                return data
    except (OSError, ValueError):
        # Best effort: a broken cache must never prevent the TUI from working.
        pass
    return {}


def save_disk_cache(cache_data: dict) -> None:
    """Write *cache_data* to ``CACHE_FILE`` as JSON, creating the directory.

    Failures are ignored on purpose: the cache is only an optimisation.
    """
    try:
        CACHE_DIR.mkdir(parents=True, exist_ok=True)
        with open(CACHE_FILE, "w", encoding="utf-8") as f:
            json.dump(cache_data, f)
    except (OSError, TypeError, ValueError):
        pass


def annotate_pids(pids: list[str], cmds: dict[str, str]) -> dict[str, set[str]]:
    """Generate descriptive tags for processes using Marvin, with caching.

    Args:
        pids: PIDs (as strings) to annotate.
        cmds: Mapping of PID to its command line; missing PIDs are treated as
            having an empty command.

    Returns:
        A mapping that contains an entry for every PID in *pids*. Values are
        the cached or freshly generated tags, ``{"analysis failed"}`` when the
        Marvin call raised, or ``{"no analysis available"}`` when Marvin
        returned nothing for that PID.
    """
    results: dict[str, set[str]] = {}
    uncached_pids: list[str] = []
    disk_cache = load_disk_cache()

    # Compute each key exactly once. Looking the start time up a second time
    # when storing the result would spawn extra `ps` processes and, if the
    # process exited in between, store the tags under a key that the lookup
    # path would never produce.
    process_keys = {
        pid: get_process_key(pid, cmds.get(pid, ""), get_process_info(pid)[1])
        for pid in pids
    }

    # First check memory and disk cache
    for pid in pids:
        process_key = process_keys[pid]
        # `.get` avoids a KeyError if the TTL expires between test and read.
        cached = memory_cache.get(process_key)
        if cached is not None:
            results[pid] = set(cached)
        elif process_key in disk_cache:
            results[pid] = set(disk_cache[process_key])
            # Also add to memory cache for faster access next time
            memory_cache[process_key] = disk_cache[process_key]
        else:
            uncached_pids.append(pid)

    # Only use Marvin if we have uncached processes
    if uncached_pids:
        try:
            annotations = marvin.run(
                instructions="Analyze these processes and provide 2-3 tags for each PID that describe what the process is doing. "
                "Use consistent terminology across similar processes. "
                "Return a dictionary where keys are PIDs and values are lists of tags.",
                context={
                    "processes": "\n".join(
                        f"PID {pid}: {cmds.get(pid, '')}" for pid in uncached_pids
                    )
                },
                result_type=dict[str, set[str]],
            )

            # Update results with new annotations
            results.update(annotations)

            # Update both caches with new results
            cache_updates = {}
            for pid in uncached_pids:
                if pid in annotations:
                    # Convert set to list for JSON serialization
                    tags_list = list(annotations[pid])
                    memory_cache[process_keys[pid]] = tags_list
                    cache_updates[process_keys[pid]] = tags_list

            # Update disk cache with new entries
            if cache_updates:
                disk_cache.update(cache_updates)
                save_disk_cache(disk_cache)
        except Exception:
            # Deliberately broad: any failure of the annotation call must
            # degrade to a placeholder tag rather than break the UI worker.
            for pid in uncached_pids:
                results[pid] = {"analysis failed"}

    # Ensure all PIDs have annotations
    for pid in pids:
        if pid not in results:
            results[pid] = {"no analysis available"}

    return results


class ProcessList(Static):
    """List of processes matching the current pattern, with async annotations."""

    def compose(self) -> ComposeResult:
        self.list_view = ListView()
        yield self.list_view

    def _show_message(self, text: str) -> None:
        """Replace the list contents with a single informational row."""
        self.list_view.clear()
        self.list_view.append(ListItem(Label(text)))

    def _add_placeholder(self, pid: str, detail_markup: str) -> None:
        """Append the "Annotating..." row for *pid* unless one already exists."""
        pid_id = f"pid_{pid}"
        try:
            self.list_view.get_child_by_id(pid_id)
        except Exception:
            # get_child_by_id raises when there is no child with that id,
            # which is the signal that the row still has to be created.
            self.list_view.append(
                ListItem(
                    Label(
                        f"[b]PID:[/b] {pid}\n{detail_markup}\n[italic]Annotating...[/italic]"
                    ),
                    id=pid_id,
                )
            )

    def update_processes(self, pattern: str = ""):
        """Refresh the list with processes whose command line matches *pattern*.

        Shows a hint instead of results when the pattern is empty, shorter
        than ``MIN_PATTERN``, matches more than ``MAX_RESULTS`` processes, or
        matches nothing. Otherwise lists every match and starts a background
        worker that fills in the annotations.
        """
        self._pids = []
        self._cmds = {}

        if not pattern:
            self._show_message("Enter a pattern to search for processes.")
            return
        if len(pattern) < MIN_PATTERN:
            self._show_message(f"Pattern must be at least {MIN_PATTERN} characters.")
            return

        try:
            result = subprocess.run(
                ["pgrep", "-f", pattern], capture_output=True, text=True
            )
        except FileNotFoundError:
            self._show_message("pgrep not found")
            return
        pids = [pid for pid in result.stdout.strip().split() if pid]

        if len(pids) > MAX_RESULTS:
            self._show_message(
                f"Too many results ({len(pids)}). Please refine your pattern or set PROCESS_TUI_MAX_RESULTS."
            )
            return

        self.list_view.clear()
        if not pids:
            self.list_view.append(ListItem(Label("No processes found.")))
            return

        self._pids = pids
        for pid in pids:
            try:
                cmd = subprocess.run(
                    ["ps", "-p", pid, "-o", "args="], capture_output=True, text=True
                ).stdout.strip()
                detail = f"[dim]{cmd}[/dim]"
            except (OSError, ValueError):
                cmd = "(unable to get command)"
                detail = "[red](unable to get command)[/red]"
            self._cmds[pid] = cmd
            self._add_placeholder(pid, detail)

        self.annotate_async(pids, self._cmds)

    @work(thread=True)
    def annotate_async(self, pids: list[str], cmds: dict[str, str]):
        """Annotate *pids* in a worker thread and update their rows.

        Only the *cmds* snapshot passed in is read. ``self._cmds`` is rebound
        by every call to ``update_processes``, so a worker started for an
        earlier search must not index into it.
        """
        annotations = annotate_pids(pids, cmds)
        for pid in pids:
            pid_id = f"pid_{pid}"
            label = f"[b]PID:[/b] {pid}\n[dim]{cmds.get(pid, '')}[/dim]"
            tags = annotations.get(pid)
            if tags:
                label += "\n[yellow]" + ", ".join(sorted(tags)) + "[/yellow]"
            else:
                label += "\n[italic dim]No annotation[/italic dim]"

            # Bind the per-iteration values as defaults so the callback never
            # depends on the loop variables at the time it is executed.
            def update_label(pid_id: str = pid_id, label: str = label) -> None:
                try:
                    child = self.list_view.get_child_by_id(pid_id)
                    if isinstance(child, ListItem):
                        child.query_one(Label).update(label)
                except Exception:
                    # The row may be gone because the user changed the
                    # pattern in the meantime; nothing to update then.
                    pass

            self.app.call_from_thread(update_label)


class ConfirmKillScreen(ModalScreen[bool]):
    """Modal yes/no dialog; dismisses with ``True`` only when "Yes" is pressed."""

    def __init__(self, pattern: str):
        super().__init__()
        self.pattern = pattern

    def compose(self) -> ComposeResult:
        with Container():
            yield Label(f"Kill all processes matching '{self.pattern}'?", id="question")
            with Horizontal():
                yield Button("No", id="no", variant="primary")
                yield Button("Yes", id="yes", variant="error")

    def on_button_pressed(self, event: Button.Pressed) -> None:
        self.dismiss(event.button.id == "yes")


class ProcessTUI(App):
    """Main application: a pattern input above a live list of matching processes."""

    CSS = """
    VerticalScroll {
        padding: 1;
    }
    Input {
        margin: 1;
    }
    ListView {
        margin: 1;
    }
    #question {
        content-align: center middle;
        margin-bottom: 1;
    }
    """

    def compose(self) -> ComposeResult:
        yield Header()
        with Vertical():
            self.input = Input(
                placeholder="Enter pattern to match processes (press Enter to kill)"
            )
            yield self.input
            with VerticalScroll():
                self.proc_list = ProcessList()
                yield self.proc_list
        yield Footer()

    def on_mount(self):
        self.proc_list.update_processes()

    def on_input_submitted(self, event: Input.Submitted):
        """Ask for confirmation, then SIGTERM every process matching the pattern."""
        pattern = event.value.strip()
        if not pattern or len(pattern) < MIN_PATTERN:
            return

        def handle_confirm(result: bool | None):
            if result:
                try:
                    proc_result = subprocess.run(
                        ["pgrep", "-f", pattern], capture_output=True, text=True
                    )
                    pids = [
                        int(pid) for pid in proc_result.stdout.strip().split() if pid
                    ]
                except (OSError, ValueError):
                    # pgrep unavailable or produced unexpected output.
                    pids = []
                for pid in pids:
                    try:
                        os.kill(pid, signal.SIGTERM)
                    except OSError:
                        # Already exited, or not permitted: skip and go on.
                        pass
            self.input.disabled = False
            self.proc_list.update_processes()

        # Disable the input while the dialog is open so on_input_changed
        # does not refresh the list underneath it.
        self.input.disabled = True
        self.push_screen(ConfirmKillScreen(pattern), handle_confirm)

    def on_input_changed(self, event: Input.Changed):
        if not self.input.disabled:
            self.proc_list.update_processes(event.value.strip())


if __name__ == "__main__":
    # Make sure the main thread has an event loop set before the app starts.
    # NOTE(review): presumably needed by a dependency that looks up the
    # current loop; confirm before removing.
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    ProcessTUI().run()