for assorted things
#!/usr/bin/env -S uv run --script --quiet
# /// script
# requires-python = ">=3.12"
# dependencies = ["textual", "marvin", "cachetools"]
# ///
6"""
7AI-powered TUI for killing processes.
8
9Usage:
10
11```bash
12./kill-processes
13```
14
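Details:
- uses [`textual`](https://textual.textualize.io/) for the TUI
- uses [`marvin`](https://github.com/prefecthq/marvin) (built on [`pydantic-ai`](https://github.com/pydantic/pydantic-ai)) to annotate processes
- uses `cachetools` for an in-memory TTL cache of annotations, backed by a JSON file at `~/.cache/process_tui/annotations.json`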
18"""
19
20import os
21import signal
22import subprocess
23import marvin
24import hashlib
25import json
26from pathlib import Path
27from cachetools import TTLCache
28from textual.app import App, ComposeResult
29from textual.widgets import (
30 Header,
31 Footer,
32 Static,
33 Input,
34 ListView,
35 ListItem,
36 Label,
37 Button,
38)
39from textual.containers import VerticalScroll, Vertical, Container, Horizontal
40from textual.screen import ModalScreen
41from textual import work
42
# Search limits; both can be overridden through environment variables.
# MIN_PATTERN: shortest pattern that triggers a search or a kill.
MIN_PATTERN = int(os.getenv("PROCESS_TUI_MIN_PATTERN", 3))
# MAX_RESULTS: largest number of matches that will be listed.
MAX_RESULTS = int(os.getenv("PROCESS_TUI_MAX_RESULTS", 30))
45
46
# On-disk location of persisted annotations (under the user's home directory).
CACHE_DIR = Path(os.path.expanduser("~")) / ".cache" / "process_tui"
CACHE_FILE = CACHE_DIR.joinpath("annotations.json")
# In-process cache: at most 1000 entries, each kept for one hour.
memory_cache = TTLCache(maxsize=1000, ttl=60 * 60)
51
52
def get_process_info(pid: str) -> tuple[str, str]:
    """Return ``(command line, start time)`` for *pid* as reported by ``ps``.

    ``ps -p <pid>`` is invoked twice, first with ``-o args=`` and then with
    ``-o lstart=``; each element is the stripped standard output of its call.
    If anything raises along the way, ``("", "")`` is returned instead.
    """

    def query(column: str) -> str:
        # One `ps` invocation selecting a single header-less output column.
        completed = subprocess.run(
            ["ps", "-p", pid, "-o", column],
            capture_output=True,
            text=True,
        )
        return completed.stdout.strip()

    try:
        return query("args="), query("lstart=")
    except Exception:
        return "", ""
67
68
def get_process_key(pid: str, cmd: str, start_time: str) -> str:
    """Return the cache key for a process: an MD5 hex digest of its identity.

    The digest is computed over ``"<pid>:<cmd>:<start_time>"`` (UTF-8
    encoded), so a reused PID with a different command or start time maps to
    a different key.

    Args:
        pid: Process id, as a string.
        cmd: Command line of the process.
        start_time: Start time of the process as text.

    Returns:
        A 32-character lowercase hexadecimal string.
    """
    content = f"{pid}:{cmd}:{start_time}"
    # The hash is only a cache key, never a security control. Saying so keeps
    # this working on FIPS-restricted Python builds, where plain md5() raises.
    return hashlib.md5(content.encode(), usedforsecurity=False).hexdigest()
73
74
def load_disk_cache() -> dict:
    """Load the persisted annotation cache from ``CACHE_FILE``.

    Returns:
        The decoded JSON object, or an empty dict when the file is missing,
        unreadable, not valid JSON, or does not contain a JSON object.
    """
    try:
        with open(CACHE_FILE, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError, RecursionError):
        # OSError: missing or unreadable file.
        # ValueError: invalid JSON or undecodable bytes.
        # RecursionError: pathologically nested JSON.
        # The cache is best-effort, so all of these mean "start empty".
        return {}
    # Callers index and .update() the result, so anything but a dict
    # (e.g. a JSON list left by a corrupted write) is treated as empty.
    return data if isinstance(data, dict) else {}
84
85
def save_disk_cache(cache_data: dict) -> None:
    """Persist *cache_data* as JSON to ``CACHE_FILE``, best-effort and atomically.

    The data is first written to a temporary file in the same directory and
    then moved over ``CACHE_FILE`` with ``os.replace``. A failed or
    interrupted write therefore never leaves a truncated cache file behind.

    Args:
        cache_data: JSON-serialisable mapping to store.

    Failures (filesystem errors, non-serialisable values) are swallowed on
    purpose: losing the cache only costs a re-annotation later.
    """
    import tempfile  # local import: only this function needs it

    tmp_name = None
    try:
        CACHE_DIR.mkdir(parents=True, exist_ok=True)
        with tempfile.NamedTemporaryFile(
            "w",
            encoding="utf-8",
            dir=CACHE_FILE.parent,  # same directory so the rename stays atomic
            prefix=f"{CACHE_FILE.name}.",
            suffix=".tmp",
            delete=False,
        ) as f:
            tmp_name = f.name
            json.dump(cache_data, f)
        os.replace(tmp_name, CACHE_FILE)
    except (OSError, TypeError, ValueError):
        # TypeError/ValueError come from json.dump on unserialisable data.
        if tmp_name is not None:
            try:
                os.unlink(tmp_name)
            except OSError:
                pass  # temp file already gone or not removable; nothing to do
94
95
def annotate_pids(pids: list[str], cmds: dict[str, str]) -> dict[str, set[str]]:
    """Return descriptive tags for each PID, consulting caches before Marvin.

    For every PID a cache key is derived from the PID, its command line (from
    *cmds*) and its start time (from ``get_process_info``). The in-memory
    cache is checked first, then the on-disk cache. Only PIDs found in
    neither are sent to ``marvin.run`` in a single request, and the answers
    are stored in both caches.

    Args:
        pids: PIDs (as strings) to annotate.
        cmds: Mapping of PID to command line; missing PIDs are treated as "".

    Returns:
        A dict with exactly one entry per requested PID. PIDs the model call
        failed for get ``{"analysis failed"}``; PIDs the model did not answer
        for get ``{"no analysis available"}``.
    """
    if not pids:
        # Nothing to annotate: skip the disk read and the model call entirely.
        return {}

    results: dict[str, set[str]] = {}
    # pid -> cache key for everything that still needs the model. Computing
    # the key once avoids a second `ps` round-trip per PID, and stops the key
    # changing between lookup and store if the process exits meanwhile.
    pending: dict[str, str] = {}

    disk_cache = load_disk_cache()
    if not isinstance(disk_cache, dict):
        disk_cache = {}

    for pid in pids:
        process_key = get_process_key(
            pid, cmds.get(pid, ""), get_process_info(pid)[1]
        )

        # .get() instead of `in` + indexing: an entry may expire in between.
        cached = memory_cache.get(process_key)
        if cached is None and process_key in disk_cache:
            cached = disk_cache[process_key]
            # Promote to the memory cache for faster access next time.
            memory_cache[process_key] = cached

        if cached is None:
            pending[pid] = process_key
        else:
            results[pid] = set(cached)

    if pending:
        try:
            annotations = marvin.run(
                instructions="Analyze these processes and provide 2-3 tags for each PID that describe what the process is doing. "
                "Use consistent terminology across similar processes. "
                "Return a dictionary where keys are PIDs and values are lists of tags.",
                context={
                    "processes": "\n".join(
                        f"PID {pid}: {cmds.get(pid, '')}" for pid in pending
                    )
                },
                result_type=dict[str, set[str]],
            )

            cache_updates = {}
            for pid, process_key in pending.items():
                # Only accept answers for PIDs that were actually asked about.
                if pid not in annotations:
                    continue
                tags = set(annotations[pid])
                results[pid] = tags

                # Sets are not JSON-serialisable; store a stable sorted list.
                tags_list = sorted(tags)
                memory_cache[process_key] = tags_list
                cache_updates[process_key] = tags_list

            if cache_updates:
                # Re-read before writing so entries saved by other workers
                # during the (slow) model call are not overwritten.
                merged = load_disk_cache()
                merged.update(cache_updates)
                save_disk_cache(merged)

        except Exception:
            # Best-effort: this runs in a UI worker, so never propagate.
            # Keep any annotations that were already merged successfully.
            for pid in pending:
                results.setdefault(pid, {"analysis failed"})

    # Guarantee one entry per requested PID.
    for pid in pids:
        results.setdefault(pid, {"no analysis available"})

    return results
171
172
class ProcessList(Static):
    """List of processes matching a search pattern, annotated in the background.

    ``update_processes`` fills the list from ``pgrep -f <pattern>`` and then
    starts ``annotate_async``, a thread worker that calls ``annotate_pids``
    and rewrites each row's label with the resulting tags.
    """

    def compose(self) -> ComposeResult:
        self.list_view = ListView()
        yield self.list_view

    def _show_message(self, text: str) -> None:
        """Replace the list contents with a single informational row."""
        self.list_view.clear()
        self.list_view.append(ListItem(Label(text)))

    def _has_item(self, item_id: str) -> bool:
        """Return True if the list view currently has a child with *item_id*."""
        try:
            self.list_view.get_child_by_id(item_id)
        except Exception:
            return False
        return True

    def update_processes(self, pattern: str = ""):
        """Refresh the list with the processes matching *pattern*.

        An empty pattern, a pattern shorter than ``MIN_PATTERN``, a missing
        ``pgrep`` binary, more than ``MAX_RESULTS`` matches, or no matches at
        all each produce a single explanatory row instead of process rows.
        """
        self._pids = []
        # Rebound (not cleared) on purpose: workers started earlier keep the
        # dict they were handed.
        self._cmds = {}

        if not pattern:
            self._show_message("Enter a pattern to search for processes.")
            return
        if len(pattern) < MIN_PATTERN:
            self._show_message(
                f"Pattern must be at least {MIN_PATTERN} characters."
            )
            return

        try:
            result = subprocess.run(
                ["pgrep", "-f", pattern], capture_output=True, text=True
            )
        except FileNotFoundError:
            self._show_message("pgrep not found")
            return

        pids = result.stdout.split()
        if len(pids) > MAX_RESULTS:
            self._show_message(
                f"Too many results ({len(pids)}). Please refine your pattern or set PROCESS_TUI_MAX_RESULTS."
            )
            return
        if not pids:
            self._show_message("No processes found.")
            return

        self.list_view.clear()
        self._pids = pids
        for pid in pids:
            try:
                cmd = subprocess.run(
                    ["ps", "-p", pid, "-o", "args="],
                    capture_output=True,
                    text=True,
                ).stdout.strip()
                # NOTE(review): cmd is interpolated into markup unescaped; a
                # command line containing bracketed text may be read as
                # markup tags. Confirm and escape if so.
                detail = f"[dim]{cmd}[/dim]"
            except (OSError, ValueError):
                # OSError: `ps` could not be run. ValueError: its output
                # could not be decoded as text (UnicodeDecodeError).
                cmd = "(unable to get command)"
                detail = "[red](unable to get command)[/red]"
            self._cmds[pid] = cmd

            pid_id = f"pid_{pid}"
            # Presumably guards against ids that are still mounted while the
            # clear() above completes. TODO confirm.
            if not self._has_item(pid_id):
                self.list_view.append(
                    ListItem(
                        Label(
                            f"[b]PID:[/b] {pid}\n{detail}\n[italic]Annotating...[/italic]"
                        ),
                        id=pid_id,
                    )
                )
        self.annotate_async(pids, self._cmds)

    @work(thread=True)
    def annotate_async(self, pids: list[str], cmds: dict[str, str]):
        """Annotate *pids* in a worker thread and update their rows.

        Labels are built from the *cmds* snapshot passed in, not from
        ``self._cmds``: the latter is replaced by every later
        ``update_processes`` call, which would make lookups from a still
        running worker fail with ``KeyError``.
        """
        annotations = annotate_pids(pids, cmds)
        for pid in pids:
            label = f"[b]PID:[/b] {pid}\n[dim]{cmds.get(pid, '')}[/dim]"
            tags = annotations.get(pid)
            if tags:
                # Sorted so the display order is stable across refreshes.
                label += "\n[yellow]" + ", ".join(sorted(tags)) + "[/yellow]"
            else:
                label += "\n[italic dim]No annotation[/italic dim]"
            # Widgets are updated via the app rather than from this thread.
            self.app.call_from_thread(self._set_item_label, f"pid_{pid}", label)

    def _set_item_label(self, item_id: str, markup: str) -> None:
        """Set the label of row *item_id*; do nothing if the row is gone."""
        try:
            child = self.list_view.get_child_by_id(item_id)
            if isinstance(child, ListItem):
                child.query_one(Label).update(markup)
        except Exception:
            # The row may have been removed by a newer search; ignore.
            pass
269
270
class ConfirmKillScreen(ModalScreen[bool]):
    """Modal yes/no prompt shown before killing processes.

    The screen is dismissed with ``True`` when the "Yes" button is pressed
    and ``False`` for any other button.
    """

    def __init__(self, pattern: str):
        super().__init__()
        # Pattern echoed back to the user in the question text.
        self.pattern = pattern

    def compose(self) -> ComposeResult:
        with Container():
            # markup=False: the pattern is user input (often a regex such as
            # "[a-z]+") and must be shown literally, not parsed as markup.
            yield Label(
                f"Kill all processes matching '{self.pattern}'?",
                id="question",
                markup=False,
            )
            with Horizontal():
                yield Button("No", id="no", variant="primary")
                yield Button("Yes", id="yes", variant="error")

    def on_button_pressed(self, event: Button.Pressed) -> None:
        self.dismiss(event.button.id == "yes")
285
286
class ProcessTUI(App):
    """Textual application: type a pattern to list processes, Enter to kill.

    Changing the input refreshes the process list. Submitting the input opens
    a ``ConfirmKillScreen``; on confirmation every process matched by
    ``pgrep -f <pattern>`` is sent ``SIGTERM``.
    """

    CSS = """
    VerticalScroll {
        padding: 1;
    }
    Input {
        margin: 1;
    }
    ListView {
        margin: 1;
    }
    #question {
        content-align: center middle;
        margin-bottom: 1;
    }
    """

    def compose(self) -> ComposeResult:
        yield Header()
        with Vertical():
            self.input = Input(
                placeholder="Enter pattern to match processes (press Enter to kill)"
            )
            yield self.input
            with VerticalScroll():
                self.proc_list = ProcessList()
                yield self.proc_list
        yield Footer()

    def on_mount(self):
        self.proc_list.update_processes()

    def _terminate_matching(self, pattern: str) -> tuple[int, int]:
        """Send SIGTERM to every process matching *pattern*.

        This process and its parent are never signalled, so a pattern that
        happens to match the TUI's own command line (or its launcher's)
        cannot kill the TUI itself.

        Returns:
            ``(signalled, failed)`` counts. Both are 0 if ``pgrep`` could not
            be run.
        """
        try:
            found = subprocess.run(
                ["pgrep", "-f", pattern], capture_output=True, text=True
            )
        except OSError:
            return 0, 0

        protected = {os.getpid(), os.getppid()}
        signalled = failed = 0
        for token in found.stdout.split():
            try:
                pid = int(token)
            except ValueError:
                continue  # not a PID; ignore unexpected pgrep output
            if pid in protected:
                continue
            try:
                os.kill(pid, signal.SIGTERM)
            except OSError:
                # Includes ProcessLookupError (already exited) and
                # PermissionError (not ours to signal).
                failed += 1
            else:
                signalled += 1
        return signalled, failed

    def on_input_submitted(self, event: Input.Submitted):
        """Ask for confirmation, then kill the processes matching the input."""
        pattern = event.value.strip()
        if not pattern or len(pattern) < MIN_PATTERN:
            return

        def handle_confirm(result: bool | None):
            if result:
                signalled, failed = self._terminate_matching(pattern)
                if failed:
                    self.notify(
                        f"Sent SIGTERM to {signalled} process(es); "
                        f"{failed} could not be signalled.",
                        severity="warning",
                    )
            # Runs for both answers so the input never stays disabled.
            self.input.disabled = False
            # Hand focus back so the user can keep typing straight away.
            self.input.focus()
            self.proc_list.update_processes()

        # Disabled while the modal is open so on_input_changed ignores events.
        self.input.disabled = True
        self.push_screen(ConfirmKillScreen(pattern), handle_confirm)

    def on_input_changed(self, event: Input.Changed):
        if not self.input.disabled:
            self.proc_list.update_processes(event.value.strip())
349
350
if __name__ == "__main__":
    # Start the TUI only when this file is executed directly, not on import.
    app = ProcessTUI()
    app.run()