for assorted things
1#!/usr/bin/env -S uv run --script --quiet
2# /// script
3# requires-python = ">=3.12"
4# dependencies = ["textual", "marvin"]
5# ///
6"""
7AI-powered TUI for killing processes.
8
9Usage:
10
11```bash
12./kill-processes
13```
14
15Details:
16- uses [`textual`](https://textual.textualize.io/) for the TUI
17- uses [`marvin`](https://github.com/prefecthq/marvin) (built on [`pydantic-ai`](https://github.com/pydantic/pydantic-ai)) to annotate processes
18"""
19
20import os
21import signal
22import subprocess
23import marvin
24import hashlib
25import json
26import asyncio
27from pathlib import Path
28from cachetools import TTLCache
29from textual.app import App, ComposeResult
30from textual.widgets import (
31 Header,
32 Footer,
33 Static,
34 Input,
35 ListView,
36 ListItem,
37 Label,
38 Button,
39)
40from textual.containers import VerticalScroll, Vertical, Container, Horizontal
41from textual.screen import ModalScreen
42from textual import work
43
44MIN_PATTERN = int(os.environ.get("PROCESS_TUI_MIN_PATTERN", 3))
45MAX_RESULTS = int(os.environ.get("PROCESS_TUI_MAX_RESULTS", 30))
46
47
48CACHE_DIR = Path(os.path.expanduser("~/.cache/process_tui"))
49CACHE_FILE = CACHE_DIR / "annotations.json"
50# In-memory cache with TTL (time to live)
51memory_cache = TTLCache(maxsize=1000, ttl=3600) # 1 hour TTL
52
53
54def get_process_info(pid: str) -> tuple[str, str]:
55 """Get command and start time for a process."""
56 try:
57 cmd = subprocess.run(
58 ["ps", "-p", pid, "-o", "args="], capture_output=True, text=True
59 ).stdout.strip()
60
61 start_time = subprocess.run(
62 ["ps", "-p", pid, "-o", "lstart="], capture_output=True, text=True
63 ).stdout.strip()
64
65 return cmd, start_time
66 except Exception:
67 return "", ""
68
69
70def get_process_key(pid: str, cmd: str, start_time: str) -> str:
71 """Generate a unique key for the process based on PID, command, and start time."""
72 content = f"{pid}:{cmd}:{start_time}"
73 return hashlib.md5(content.encode()).hexdigest()
74
75
76def load_disk_cache() -> dict:
77 """Load the cache from disk."""
78 try:
79 if CACHE_FILE.exists():
80 with open(CACHE_FILE, "r") as f:
81 return json.load(f)
82 except Exception:
83 pass
84 return {}
85
86
87def save_disk_cache(cache_data: dict) -> None:
88 """Save the cache to disk."""
89 try:
90 CACHE_DIR.mkdir(parents=True, exist_ok=True)
91 with open(CACHE_FILE, "w") as f:
92 json.dump(cache_data, f)
93 except Exception:
94 pass
95
96
97def annotate_pids(pids: list[str], cmds: dict[str, str]) -> dict[str, set[str]]:
98 """Generate meaningful annotations for processes using Marvin with caching."""
99 results = {}
100 uncached_pids = []
101 uncached_cmds = {}
102 disk_cache = load_disk_cache()
103
104 # First check memory and disk cache
105 for pid in pids:
106 cmd = cmds.get(pid, "")
107 start_time = get_process_info(pid)[1]
108 process_key = get_process_key(pid, cmd, start_time)
109
110 # Check memory cache first (faster)
111 if process_key in memory_cache:
112 results[pid] = set(memory_cache[process_key])
113 # Then check disk cache
114 elif process_key in disk_cache:
115 results[pid] = set(disk_cache[process_key])
116 # Also add to memory cache for faster access next time
117 memory_cache[process_key] = disk_cache[process_key]
118 else:
119 uncached_pids.append(pid)
120 uncached_cmds[pid] = cmd
121
122 # Only use Marvin if we have uncached processes
123 if uncached_pids:
124 try:
125 annotations = marvin.run(
126 instructions="Analyze these processes and provide 2-3 tags for each PID that describe what the process is doing. "
127 "Use consistent terminology across similar processes. "
128 "Return a dictionary where keys are PIDs and values are lists of tags.",
129 context={
130 "processes": "\n".join(
131 [
132 f"PID {pid}: {uncached_cmds.get(pid, '')}"
133 for pid in uncached_pids
134 ]
135 )
136 },
137 result_type=dict[str, set[str]],
138 )
139
140 # Update results with new annotations
141 results.update(annotations)
142
143 # Update both caches with new results
144 cache_updates = {}
145 for pid in uncached_pids:
146 if pid in annotations:
147 cmd = uncached_cmds.get(pid, "")
148 start_time = get_process_info(pid)[1]
149 process_key = get_process_key(pid, cmd, start_time)
150
151 # Convert set to list for JSON serialization
152 tags_list = list(annotations[pid])
153 memory_cache[process_key] = tags_list
154 cache_updates[process_key] = tags_list
155
156 # Update disk cache with new entries
157 if cache_updates:
158 disk_cache.update(cache_updates)
159 save_disk_cache(disk_cache)
160
161 except Exception:
162 # Fallback for errors
163 for pid in uncached_pids:
164 results[pid] = {"analysis failed"}
165
166 # Ensure all PIDs have annotations
167 for pid in pids:
168 if pid not in results:
169 results[pid] = {"no analysis available"}
170
171 return results
172
173
174class ProcessList(Static):
175 def compose(self) -> ComposeResult:
176 self.list_view = ListView()
177 yield self.list_view
178
179 def update_processes(self, pattern: str = ""):
180 self._pids = []
181 self._cmds = {}
182 if not pattern:
183 self.list_view.clear()
184 self.list_view.append(
185 ListItem(Label("Enter a pattern to search for processes."))
186 )
187 return
188 if len(pattern) < MIN_PATTERN:
189 self.list_view.clear()
190 self.list_view.append(
191 ListItem(Label(f"Pattern must be at least {MIN_PATTERN} characters."))
192 )
193 return
194 try:
195 result = subprocess.run(
196 ["pgrep", "-f", pattern], capture_output=True, text=True
197 )
198 pids = [pid for pid in result.stdout.strip().split() if pid]
199 except FileNotFoundError:
200 self.list_view.clear()
201 self.list_view.append(ListItem(Label("pgrep not found")))
202 return
203 if len(pids) > MAX_RESULTS:
204 self.list_view.clear()
205 self.list_view.append(
206 ListItem(
207 Label(
208 f"Too many results ({len(pids)}). Please refine your pattern or set PROCESS_TUI_MAX_RESULTS."
209 )
210 )
211 )
212 return
213 self.list_view.clear()
214 if not pids:
215 self.list_view.append(ListItem(Label("No processes found.")))
216 return
217 self._pids = pids
218 for pid in pids:
219 pid_id = f"pid_{pid}"
220 try:
221 cmd = subprocess.run(
222 ["ps", "-p", pid, "-o", "args="], capture_output=True, text=True
223 ).stdout.strip()
224 self._cmds[pid] = cmd
225 try:
226 self.list_view.get_child_by_id(pid_id)
227 except Exception:
228 item = ListItem(
229 Label(
230 f"[b]PID:[/b] {pid}\n[dim]{cmd}[/dim]\n[italic]Annotating...[/italic]"
231 ),
232 id=pid_id,
233 )
234 self.list_view.append(item)
235 except Exception:
236 self._cmds[pid] = "(unable to get command)"
237 try:
238 self.list_view.get_child_by_id(pid_id)
239 except Exception:
240 item = ListItem(
241 Label(
242 f"[b]PID:[/b] {pid}\n[red](unable to get command)[/red]\n[italic]Annotating...[/italic]"
243 ),
244 id=pid_id,
245 )
246 self.list_view.append(item)
247 self.annotate_async(pids, self._cmds)
248
249 @work(thread=True)
250 def annotate_async(self, pids: list[str], cmds: dict[str, str]):
251 annotations = annotate_pids(pids, cmds)
252 for pid in pids:
253 pid_id = f"pid_{pid}"
254 label = f"[b]PID:[/b] {pid}\n[dim]{self._cmds[pid]}[/dim]"
255 if annotations[pid]:
256 label += "\n[yellow]" + ", ".join(annotations[pid]) + "[/yellow]"
257 else:
258 label += "\n[italic dim]No annotation[/italic dim]"
259
260 def update_label():
261 try:
262 child = self.list_view.get_child_by_id(pid_id)
263 if isinstance(child, ListItem):
264 label_widget = child.query_one(Label)
265 label_widget.update(label)
266 except Exception:
267 pass
268
269 self.app.call_from_thread(update_label)
270
271
272class ConfirmKillScreen(ModalScreen[bool]):
273 def __init__(self, pattern: str):
274 super().__init__()
275 self.pattern = pattern
276
277 def compose(self) -> ComposeResult:
278 with Container():
279 yield Label(f"Kill all processes matching '{self.pattern}'?", id="question")
280 with Horizontal():
281 yield Button("No", id="no", variant="primary")
282 yield Button("Yes", id="yes", variant="error")
283
284 def on_button_pressed(self, event: Button.Pressed) -> None:
285 self.dismiss(event.button.id == "yes")
286
287
288class ProcessTUI(App):
289 CSS = """
290 VerticalScroll {
291 padding: 1;
292 }
293 Input {
294 margin: 1;
295 }
296 ListView {
297 margin: 1;
298 }
299 #question {
300 content-align: center middle;
301 margin-bottom: 1;
302 }
303 """
304
305 def compose(self) -> ComposeResult:
306 yield Header()
307 with Vertical():
308 self.input = Input(
309 placeholder="Enter pattern to match processes (press Enter to kill)"
310 )
311 yield self.input
312 with VerticalScroll():
313 self.proc_list = ProcessList()
314 yield self.proc_list
315 yield Footer()
316
317 def on_mount(self):
318 self.proc_list.update_processes()
319
320 def on_input_submitted(self, event: Input.Submitted):
321 pattern = event.value.strip()
322 if not pattern or len(pattern) < MIN_PATTERN:
323 return
324
325 def handle_confirm(result: bool | None):
326 if result:
327 try:
328 proc_result = subprocess.run(
329 ["pgrep", "-f", pattern], capture_output=True, text=True
330 )
331 pids = [
332 int(pid) for pid in proc_result.stdout.strip().split() if pid
333 ]
334 for pid in pids:
335 try:
336 os.kill(pid, signal.SIGTERM)
337 except Exception:
338 pass
339 except Exception:
340 pass
341 self.input.disabled = False
342 self.proc_list.update_processes()
343
344 self.input.disabled = True
345 self.push_screen(ConfirmKillScreen(pattern), handle_confirm)
346
347 def on_input_changed(self, event: Input.Changed):
348 if not self.input.disabled:
349 self.proc_list.update_processes(event.value.strip())
350
351
352if __name__ == "__main__":
353 try:
354 loop = asyncio.get_event_loop()
355 except RuntimeError:
356 loop = asyncio.new_event_loop()
357 asyncio.set_event_loop(loop)
358 ProcessTUI().run()