Linux kernel mirror (for testing)
git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel
os
linux
1#!/usr/bin/env python3
2# SPDX-License-Identifier: (LGPL-2.1 OR BSD-2-Clause)
3"""Interactive perf list."""
4
5from abc import ABC, abstractmethod
6import argparse
7from dataclasses import dataclass
8import math
9from typing import Any, Dict, Optional, Tuple
10import perf
11from textual import on
12from textual.app import App, ComposeResult
13from textual.binding import Binding
14from textual.containers import Horizontal, HorizontalGroup, Vertical, VerticalScroll
15from textual.css.query import NoMatches
16from textual.command import SearchIcon
17from textual.screen import ModalScreen
18from textual.widgets import Button, Footer, Header, Input, Label, Sparkline, Static, Tree
19from textual.widgets.tree import TreeNode
20
21
def get_info(info: Dict[str, str], key: str):
    """Return info[key] with a trailing newline, or "" when key is absent."""
    if key not in info:
        return ""
    return info[key] + "\n"
24
25
class TreeValue(ABC):
    """Interface for the data attached to a selectable node of the tree.

    Concrete subclasses supply the text shown for the node, the search
    predicate, how to turn the value into a perf.evlist and how to read
    one sample of it.
    """

    @abstractmethod
    def name(self) -> str:
        """Return the name shown for this value."""

    @abstractmethod
    def description(self) -> str:
        """Return the descriptive text shown for this value."""

    @abstractmethod
    def matches(self, query: str) -> bool:
        """Return whether this value is a hit for the search string query."""

    @abstractmethod
    def parse(self) -> perf.evlist:
        """Return the perf.evlist used to count this value."""

    @abstractmethod
    def value(self, evlist: perf.evlist, evsel: perf.evsel, cpu: int, thread: int) -> float:
        """Return the current reading of this value for one cpu and thread."""
48
49
@dataclass
class Metric(TreeValue):
    """A metric in the tree."""
    metric_name: str

    def name(self) -> str:
        """Return the metric's name."""
        return self.metric_name

    def description(self) -> str:
        """Find and format metric description.

        The first entry of perf.metrics() whose "MetricName" equals this
        metric supplies the text: its brief and public descriptions,
        expression and threshold, one per line, skipping absent fields.
        Returns the placeholder "description" when no entry matches.
        """
        for metric in perf.metrics():
            if metric["MetricName"] != self.metric_name:
                continue
            keys = ("BriefDescription", "PublicDescription",
                    "MetricExpr", "MetricThreshold")
            return "".join(get_info(metric, key) for key in keys)
        return "description"

    def matches(self, query: str) -> bool:
        """Return whether query occurs in the metric name, ignoring case."""
        # The search dialog lower-cases the query but metric names keep
        # their original case, so compare both sides lower-cased or names
        # containing capitals could never be found.
        return query.lower() in self.metric_name.lower()

    def parse(self) -> perf.evlist:
        """Return the evlist perf.parse_metrics() builds for this metric."""
        return perf.parse_metrics(self.metric_name)

    def value(self, evlist: perf.evlist, evsel: perf.evsel, cpu: int, thread: int) -> float:
        """Compute the metric on evlist for cpu/thread, mapping NaN to 0.

        evsel is unused; it is part of the TreeValue signature.
        """
        val = evlist.compute_metric(self.metric_name, cpu, thread)
        return 0 if math.isnan(val) else val
79
80
@dataclass
class PmuEvent(TreeValue):
    """A PMU and event within the tree."""
    pmu: str
    event: str

    def name(self) -> str:
        """Return the event string handed to the perf event parser.

        The event is used as is when it already starts with the PMU name
        or contains a ':', otherwise it is wrapped as "{pmu}/{event}/".
        """
        if self.event.startswith(self.pmu) or ':' in self.event:
            return self.event
        return f"{self.pmu}/{self.event}/"

    def description(self) -> str:
        """Find and format event description for {pmu}/{event}/.

        Returns the placeholder "description" when the PMU or the event
        is not found.
        """
        # The tree stores lower-cased PMU and event names, so compare
        # case-insensitively against what perf reports; an exact compare
        # would miss any name perf reports with capitals.
        wanted_pmu = self.pmu.lower()
        wanted_event = self.event.lower()
        for p in perf.pmus():
            if p.name().lower() != wanted_pmu:
                continue
            for info in p.events():
                if "name" not in info or info["name"].lower() != wanted_event:
                    continue

                keys = ("topic", "event_type_desc", "desc",
                        "long_desc", "encoding_desc")
                return "".join(get_info(info, key) for key in keys)
        return "description"

    def matches(self, query: str) -> bool:
        """Return whether query occurs in the PMU or event name, ignoring case."""
        needle = query.lower()
        return needle in self.pmu.lower() or needle in self.event.lower()

    def parse(self) -> perf.evlist:
        """Return the evlist perf.parse_events() builds for name()."""
        return perf.parse_events(self.name())

    def value(self, evlist: perf.evlist, evsel: perf.evsel, cpu: int, thread: int) -> float:
        """Return the val field of evsel's reading for cpu/thread.

        evlist is unused; it is part of the TreeValue signature.
        """
        return evsel.read(cpu, thread).val
118
119
class ErrorScreen(ModalScreen[bool]):
    """Modal screen showing an error message on a single button.

    Pressing the button dismisses the screen with the result True.
    """

    CSS = """
        ErrorScreen {
            align: center middle;
        }
    """

    def __init__(self, error: str):
        # Remember the message; compose() turns it into the button label.
        self.error = error
        super().__init__()

    def compose(self) -> ComposeResult:
        """Yield the one button that carries the error text."""
        message = f"Error: {self.error}"
        yield Button(message, variant="primary", id="error")

    def on_button_pressed(self, event: Button.Pressed) -> None:
        """Close the dialog when its button is pressed."""
        self.dismiss(True)
138
139
class SearchScreen(ModalScreen[str]):
    """Modal screen with a single input for entering a search string.

    Submitting the input dismisses the screen with the entered text.
    """

    CSS = """
        SearchScreen Horizontal {
            align: center middle;
            margin-top: 1;
        }
        SearchScreen Input {
            width: 1fr;
        }
    """

    def compose(self) -> ComposeResult:
        """Yield one row holding the search icon and the input field."""
        icon = SearchIcon()
        query_input = Input(placeholder="Event name")
        yield Horizontal(icon, query_input)

    def on_input_submitted(self, event: Input.Submitted) -> None:
        """Dismiss the dialog with the submitted text when Enter is pressed."""
        self.dismiss(event.value)
159
160
class Counter(HorizontalGroup):
    """A row of two labels: the CPU (or "total") and its counter value."""

    # NOTE(review): confirm Textual applies a CSS attribute on a plain
    # widget (widgets usually use DEFAULT_CSS) and that "gutter" is a
    # property it accepts.
    CSS = """
        Label {
            gutter: 1;
        }
    """

    def __init__(self, cpu: int) -> None:
        # A negative cpu denotes the row for the total over all CPUs.
        self.cpu = cpu
        super().__init__()

    def compose(self) -> ComposeResult:
        """Yield the name label and the value label, which starts at "0"."""
        if self.cpu >= 0:
            label = f"cpu{self.cpu}"
        else:
            label = "total"
        yield Label(label + " ")
        # The id lets the value label be looked up as "#counter_<label>".
        yield Label("0", id=f"counter_{label}")
178
179
class CounterSparkline(HorizontalGroup):
    """A row holding a label and a Sparkline for one CPU or the total."""

    def __init__(self, cpu: int) -> None:
        # A negative cpu denotes the row for the total over all CPUs.
        self.cpu = cpu
        super().__init__()

    def compose(self) -> ComposeResult:
        """Yield the name label followed by an initially empty sparkline."""
        if self.cpu >= 0:
            label = f"cpu{self.cpu}"
        else:
            label = "total"
        yield Label(label)
        # The id lets the sparkline be looked up as "#sparkline_<label>".
        yield Sparkline([], summary_function=max, id=f"sparkline_{label}")
191
192
class IListApp(App):
    """Interactive browser for perf PMU events and metrics.

    A tree lists PMUs with their events and metric groups with their
    metrics. Selecting a leaf shows its description, opens it for
    counting and, every `interval` seconds, refreshes a counter label and
    a sparkline per CPU plus a total.
    """

    TITLE = "Interactive Perf List"

    BINDINGS = [
        Binding(key="s", action="search", description="Search",
                tooltip="Search events and PMUs"),
        Binding(key="n", action="next", description="Next",
                tooltip="Next search result or item"),
        Binding(key="p", action="prev", description="Previous",
                tooltip="Previous search result or item"),
        Binding(key="c", action="collapse", description="Collapse",
                tooltip="Collapse the current PMU"),
        # NOTE(review): Textual key names usually spell this "ctrl+q";
        # confirm "^q" is intended (it may only act as a footer hint).
        Binding(key="^q", action="quit", description="Quit",
                tooltip="Quit the app"),
    ]

    CSS = """
        /* Make the 'total' sparkline a different color. */
        #sparkline_total > .sparkline--min-color {
            color: $accent;
        }
        #sparkline_total > .sparkline--max-color {
            color: $accent 30%;
        }
        /*
         * Make the active_search initially not displayed with the text in
         * the middle of the line.
         */
        #active_search {
            display: none;
            width: 100%;
            text-align: center;
        }
    """

    def __init__(self, interval: float) -> None:
        """Create the app; interval is the counter refresh period in seconds."""
        self.interval = interval
        # The evlist currently being counted, None when nothing is open.
        self.evlist: Optional[perf.evlist] = None
        # The tree value currently displayed and counted.
        self.selected: Optional[TreeValue] = None
        # Nodes matching the last search, in tree order.
        self.search_results: list[TreeNode[TreeValue]] = []
        # The search result the cursor was last moved to.
        self.cur_search_result: TreeNode[TreeValue] | None = None
        super().__init__()

    def expand_and_select(self, node: TreeNode[Any]) -> None:
        """Expand a node and all of its ancestors, then select and show it."""
        # Metric groups may nest to any depth, so walk the whole parent
        # chain rather than a fixed number of levels; otherwise a deep
        # match could stay hidden under a collapsed ancestor.
        ancestor = node.parent
        while ancestor is not None:
            ancestor.expand()
            ancestor = ancestor.parent
        node.expand()
        node.tree.select_node(node)
        node.tree.scroll_to_node(node)

    def set_searched_tree_node(self, previous: bool) -> None:
        """Set the cur_search_result node to either the next or previous.

        Without search results this just moves the tree cursor up or down.
        With results it steps through them, wrapping around at both ends.
        """
        count = len(self.search_results)

        if count < 1:
            tree: Tree[TreeValue] = self.query_one("#root", Tree)
            if previous:
                tree.action_cursor_up()
            else:
                tree.action_cursor_down()
            return

        if self.cur_search_result:
            idx = self.search_results.index(self.cur_search_result)
            if previous:
                idx = idx - 1 if idx > 0 else count - 1
            else:
                idx = idx + 1 if idx < count - 1 else 0
        else:
            idx = count - 1 if previous else 0

        node = self.search_results[idx]
        if node == self.cur_search_result:
            # A single result wraps onto itself; nothing to move to.
            return

        self.cur_search_result = node
        self.expand_and_select(node)

    def action_search(self) -> None:
        """Search was chosen: show the search dialog and act on its result."""
        def set_initial_focus(event: str | None) -> None:
            """Sets the focus after the SearchScreen is dismissed."""

            search_label = self.query_one("#active_search", Label)
            search_label.display = bool(event)
            if not event:
                return
            event = event.lower()
            search_label.update(f'Searching for events matching "{event}"')

            tree: Tree[TreeValue] = self.query_one("#root", Tree)

            def find_search_results(event: str, node: TreeNode[TreeValue],
                                    cursor_seen: bool = False,
                                    match_after_cursor: Optional[TreeNode[TreeValue]] = None
                                    ) -> Tuple[bool, Optional[TreeNode[TreeValue]]]:
                """Find nodes that match the search remembering the one after the cursor."""
                if not cursor_seen and node == tree.cursor_node:
                    cursor_seen = True
                if node.data and node.data.matches(event):
                    # The first match at or after the cursor is where the
                    # selection should jump to.
                    if cursor_seen and not match_after_cursor:
                        match_after_cursor = node
                    self.search_results.append(node)

                for child in node.children:
                    (cursor_seen, match_after_cursor) = \
                        find_search_results(event, child, cursor_seen, match_after_cursor)
                return (cursor_seen, match_after_cursor)

            self.search_results.clear()
            (_, self.cur_search_result) = find_search_results(event, tree.root)
            if not self.search_results:
                self.push_screen(ErrorScreen(f"Failed to find pmu/event or metric {event}"))
                search_label.display = False
            elif self.cur_search_result:
                self.expand_and_select(self.cur_search_result)
            else:
                # Matches exist but all precede the cursor: take the first.
                self.set_searched_tree_node(previous=False)

        self.push_screen(SearchScreen(), set_initial_focus)

    def action_next(self) -> None:
        """Next was chosen."""
        self.set_searched_tree_node(previous=False)

    def action_prev(self) -> None:
        """Previous was chosen."""
        self.set_searched_tree_node(previous=True)

    def action_collapse(self) -> None:
        """Collapse the part of the tree the cursor is currently in."""
        tree: Tree[TreeValue] = self.query_one("#root", Tree)
        node = tree.cursor_node
        if node and node.parent:
            node.parent.collapse_all()
            node.tree.scroll_to_node(node.parent)

    def update_counts(self) -> None:
        """Called every interval to update counts."""
        if not self.selected or not self.evlist:
            return

        def update_count(cpu: int, count: float) -> None:
            """Show count for cpu (negative means total) in its label and sparkline."""
            suffix = f"cpu{cpu}" if cpu >= 0 else "total"

            # Update the raw count display.
            counters = self.query(f"#counter_{suffix}")
            if not counters:
                return
            counters.first(Label).update(str(count))

            # Update the sparkline.
            sparklines = self.query(f"#sparkline_{suffix}")
            if not sparklines:
                return
            line = sparklines.first(Sparkline)
            # If there are more events than the width, remove the front event.
            if len(line.data) > line.size.width:
                line.data.pop(0)
            line.data.append(count)
            # The list was changed in place, so tell Textual explicitly.
            line.mutate_reactive(Sparkline.data)

        # Update the total and each CPU counts, assume there's just 1 evsel.
        total = 0
        self.evlist.disable()
        try:
            for evsel in self.evlist:
                for cpu in evsel.cpus():
                    aggr = 0
                    for thread in evsel.threads():
                        aggr += self.selected.value(self.evlist, evsel, cpu, thread)
                    update_count(cpu, aggr)
                    total += aggr
            update_count(-1, total)
        finally:
            # Re-enable even when a read fails so counting is not left off.
            self.evlist.enable()

    def on_mount(self) -> None:
        """When App starts set up periodic event updating."""
        self.update_counts()
        self.set_interval(self.interval, self.update_counts)

    def set_selected(self, value: TreeValue) -> None:
        """Updates the event/description and starts the counters.

        Closes the previously opened evlist and removes its widgets, then
        parses, opens and enables the new value. An error dialog is shown
        and nothing is mounted when the value cannot be opened.
        """
        try:
            label_name = self.query_one("#event_name", Label)
            event_description = self.query_one("#event_description", Static)
            lines = self.query_one("#lines")
        except NoMatches:
            # A race with rendering, ignore the update as we can't
            # mount the assumed output widgets.
            return

        self.selected = value

        # Remove previous event information.
        if self.evlist:
            self.evlist.disable()
            self.evlist.close()
            for old_line in self.query(CounterSparkline):
                old_line.remove()
            for old_counter in self.query(Counter):
                old_counter.remove()

        # Update event/metric text and description.
        label_name.update(value.name())
        event_description.update(value.description())

        # Open the event.
        try:
            self.evlist = value.parse()
            if self.evlist:
                self.evlist.open()
                self.evlist.enable()
        except Exception:
            # Any failure is reported through the dialog below. Exception,
            # not a bare except, so Ctrl-C and SystemExit still propagate.
            self.evlist = None

        if not self.evlist:
            self.push_screen(ErrorScreen(f"Failed to open {value.name()}"))
            return

        # Add spark lines for all the CPUs. Note, must be done after
        # open so that the evlist CPUs have been computed by propagate
        # maps.
        lines.mount(CounterSparkline(cpu=-1))
        for cpu in self.evlist.all_cpus():
            lines.mount(CounterSparkline(cpu))
        lines.mount(Counter(cpu=-1))
        for cpu in self.evlist.all_cpus():
            lines.mount(Counter(cpu))

    def compose(self) -> ComposeResult:
        """Draws the app."""
        def metric_event_tree() -> Tree:
            """Create tree of PMUs and metricgroups with events or metrics under."""
            tree: Tree[TreeValue] = Tree("Root", id="root")
            pmus = tree.root.add("PMUs")
            for pmu in perf.pmus():
                pmu_name = pmu.name().lower()
                pmu_node = pmus.add(pmu_name)
                try:
                    # Drop nameless events before sorting: sorting on the
                    # "name" key first would raise KeyError and lose every
                    # event of this PMU.
                    named = [ev for ev in pmu.events() if "name" in ev]
                    for event in sorted(named, key=lambda x: x["name"]):
                        e = event["name"].lower()
                        if "alias" in event:
                            pmu_node.add_leaf(f'{e} ({event["alias"]})',
                                              data=PmuEvent(pmu_name, e))
                        else:
                            pmu_node.add_leaf(e, data=PmuEvent(pmu_name, e))
                except Exception:
                    # Reading events may fail with EPERM, ignore.
                    pass

            metrics = tree.root.add("Metrics")
            # Fetch and sort the metrics once rather than once per group.
            all_metrics = sorted(perf.metrics(), key=lambda x: x["MetricName"])
            groups = set()
            for metric in all_metrics:
                # Presumably "MetricGroup" is a collection of group names;
                # verify against the perf python module.
                groups.update(metric["MetricGroup"])

            def add_metrics_to_tree(node: TreeNode[TreeValue], parent: str) -> None:
                """Add the metrics of group parent under node, recursing into sub-groups."""
                for metric in all_metrics:
                    if parent in metric["MetricGroup"]:
                        name = metric["MetricName"]
                        node.add_leaf(name, data=Metric(name))
                        # A group named "<metric>_group" is shown nested
                        # below the group that holds <metric>.
                        child_group_name = f'{name}_group'
                        if child_group_name in groups:
                            add_metrics_to_tree(node.add(child_group_name), child_group_name)

            for group in sorted(groups):
                # "_group" groups are only added nested, see above.
                if group.endswith('_group'):
                    continue
                add_metrics_to_tree(metrics.add(group), group)

            tree.root.expand()
            return tree

        yield Header(id="header")
        yield Horizontal(Vertical(metric_event_tree(), id="events"),
                         Vertical(Label("event name", id="event_name"),
                                  Static("description", markup=False, id="event_description"),
                                  ))
        yield Label(id="active_search")
        yield VerticalScroll(id="lines")
        yield Footer(id="footer")

    @on(Tree.NodeSelected)
    def on_tree_node_selected(self, event: Tree.NodeSelected[TreeValue]) -> None:
        """Called when a tree node is selected, selecting the event."""
        # Only leaves carry data; PMU and group nodes are ignored.
        if event.node.data:
            self.set_selected(event.node.data)
488
489
if __name__ == "__main__":
    # Script entry point: parse the refresh interval and run the app.
    ap = argparse.ArgumentParser()
    # type=float makes argparse report a malformed value as a usage error
    # instead of a ValueError traceback.
    ap.add_argument('-I', '--interval', type=float, default=0.1,
                    help="Counter update interval in seconds")
    args = ap.parse_args()
    # A periodic timer needs a positive period; this also rejects NaN.
    if not args.interval > 0:
        ap.error("interval must be a positive number of seconds")
    app = IListApp(args.interval)
    app.run()