Linux kernel mirror (for testing)
git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel
os
linux
1#!/usr/bin/env python3
2# SPDX-License-Identifier: (LGPL-2.1 OR BSD-2-Clause)
3"""Interactive perf list."""
4
5from abc import ABC, abstractmethod
6import argparse
7from dataclasses import dataclass
8import math
9from typing import Any, Dict, Optional, Tuple
10import perf
11from textual import on
12from textual.app import App, ComposeResult
13from textual.binding import Binding
14from textual.containers import Horizontal, HorizontalGroup, Vertical, VerticalScroll
15from textual.css.query import NoMatches
16from textual.command import SearchIcon
17from textual.screen import ModalScreen
18from textual.widgets import Button, Footer, Header, Input, Label, Sparkline, Static, Tree
19from textual.widgets.tree import TreeNode
20
21
def get_info(info: Dict[str, str], key: str):
    """Return ``info[key]`` terminated by a newline, or "" if *key* is absent.

    Used to build multi-line descriptions out of optional dictionary fields:
    fields that are present each contribute one line, missing ones nothing.
    """
    if key not in info:
        return ""
    return info[key] + "\n"
24
25
class TreeValue(ABC):
    """Interface for the data object attached to a node of the tree.

    Concrete subclasses in this file are ``Metric`` and ``PmuEvent``.
    """

    @abstractmethod
    def name(self) -> str:
        """Return the name of the value as shown in the event name label."""

    @abstractmethod
    def description(self) -> str:
        """Return the text shown in the description panel."""

    @abstractmethod
    def matches(self, query: str) -> bool:
        """Return True if this value is a hit for the search string *query*."""

    @abstractmethod
    def parse(self) -> perf.evlist:
        """Return a ``perf.evlist`` for this value; the caller opens it."""

    @abstractmethod
    def value(self, evlist: perf.evlist, evsel: perf.evsel, cpu: int, thread: int) -> float:
        """Return the current reading for one *cpu*/*thread* pair."""
48
49
@dataclass
class Metric(TreeValue):
    """A metric in the tree."""
    # Name of the metric, compared against the "MetricName" key of perf.metrics().
    metric_name: str
    # PMU the metric belongs to; None/empty means "any PMU".
    metric_pmu: Optional[str]

    def name(self) -> str:
        """Return the metric name."""
        return self.metric_name

    def description(self) -> str:
        """Find and format metric description.

        Returns the concatenation of the brief/public descriptions, the
        expression and the threshold of the first matching metric (one line
        per field that is present), or the placeholder "description" when no
        metric matches.
        """
        for metric in perf.metrics():
            if metric["MetricName"] != self.metric_name:
                continue
            # "PMU" is optional in the metric dictionary, so don't index it.
            if self.metric_pmu and metric.get("PMU") != self.metric_pmu:
                continue
            return "".join(
                get_info(metric, key)
                for key in ("BriefDescription", "PublicDescription",
                            "MetricExpr", "MetricThreshold")
            )
        return "description"

    def matches(self, query: str) -> bool:
        """Return True if *query* occurs in the metric name, ignoring case.

        The search dialog lowercases the query while metric names keep their
        original capitalisation, so the comparison has to be case-insensitive
        for metrics containing capitals to be findable.
        """
        return query.lower() in self.metric_name.lower()

    def parse(self) -> perf.evlist:
        """Return the evlist produced by perf.parse_metrics() for this metric."""
        return perf.parse_metrics(self.metric_name, self.metric_pmu)

    def value(self, evlist: perf.evlist, evsel: perf.evsel, cpu: int, thread: int) -> float:
        """Compute the metric on *cpu*/*thread*; NaN and failures are reported as 0."""
        try:
            val = evlist.compute_metric(self.metric_name, cpu, thread)
        except Exception:
            # Be tolerant of failures to compute metrics on particular CPUs/threads.
            return 0
        return 0 if math.isnan(val) else val
86
87
@dataclass
class PmuEvent(TreeValue):
    """A PMU and event within the tree."""
    # PMU name as stored in the tree.
    pmu: str
    # Event name as stored in the tree.
    event: str

    def name(self) -> str:
        """Return the event string handed to perf.parse_events().

        The event is used as-is when it already starts with the PMU name or
        contains a ':'; otherwise it is wrapped as "{pmu}/{event}/".
        """
        if self.event.startswith(self.pmu) or ':' in self.event:
            return self.event
        return f"{self.pmu}/{self.event}/"

    def description(self) -> str:
        """Find and format event description for {pmu}/{event}/.

        Returns one line per available field (topic, event type, short and
        long description, encoding), or the placeholder "description" when
        the event cannot be found.
        """
        # The tree is populated with lowercased PMU and event names, so the
        # lookup against the names reported by perf must ignore case.
        wanted_pmu = self.pmu.lower()
        wanted_event = self.event.lower()
        for p in perf.pmus():
            if p.name().lower() != wanted_pmu:
                continue
            for info in p.events():
                if "name" not in info or info["name"].lower() != wanted_event:
                    continue
                return "".join(
                    get_info(info, key)
                    for key in ("topic", "event_type_desc", "desc",
                                "long_desc", "encoding_desc")
                )
        return "description"

    def matches(self, query: str) -> bool:
        """Return True if *query* occurs in the PMU name or the event name."""
        return query in self.pmu or query in self.event

    def parse(self) -> perf.evlist:
        """Return the evlist produced by perf.parse_events() for name()."""
        return perf.parse_events(self.name())

    def value(self, evlist: perf.evlist, evsel: perf.evsel, cpu: int, thread: int) -> float:
        """Return the ``val`` field of the counter read for *cpu*/*thread*."""
        return evsel.read(cpu, thread).val
125
126
class ErrorScreen(ModalScreen[bool]):
    """Modal dialog that shows an error message on a single button."""

    CSS = """
    ErrorScreen {
        align: center middle;
    }
    """

    def __init__(self, error: str):
        """Remember the *error* text to display."""
        self.error = error
        super().__init__()

    def compose(self) -> ComposeResult:
        """Yield the button carrying the error text."""
        message = f"Error: {self.error}"
        yield Button(message, variant="primary", id="error")

    def on_button_pressed(self, event: Button.Pressed) -> None:
        """Close the dialog with a result of True when the button is pressed."""
        self.dismiss(True)
145
146
class SearchScreen(ModalScreen[str]):
    """Modal dialog with a search icon and a text input for the query."""

    CSS = """
    SearchScreen Horizontal {
        align: center middle;
        margin-top: 1;
    }
    SearchScreen Input {
        width: 1fr;
    }
    """

    def compose(self) -> ComposeResult:
        """Yield a single row holding the icon followed by the input field."""
        icon = SearchIcon()
        query_input = Input(placeholder="Event name")
        yield Horizontal(icon, query_input)

    def on_input_submitted(self, event: Input.Submitted) -> None:
        """Dismiss the dialog with the submitted text as its result."""
        self.dismiss(event.value)
166
167
class Counter(HorizontalGroup):
    """A row of two labels: the CPU name and its counter value."""

    # NOTE(review): Textual documents DEFAULT_CSS for widget-level styles;
    # confirm that this CSS attribute is actually applied to the widget.
    CSS = """
    Label {
        gutter: 1;
    }
    """

    def __init__(self, cpu: int) -> None:
        """Remember *cpu*; a negative value stands for the total row."""
        self.cpu = cpu
        super().__init__()

    def compose(self) -> ComposeResult:
        """Yield the name label and the value label (id "counter_<name>")."""
        label = "total" if self.cpu < 0 else f"cpu{self.cpu}"
        yield Label(label + " ")
        # The value starts at "0" and is looked up by id when counts update.
        yield Label("0", id=f"counter_{label}")
185
186
class CounterSparkline(HorizontalGroup):
    """A row holding a name label and a Sparkline for one counter."""

    def __init__(self, cpu: int) -> None:
        """Remember *cpu*; a negative value stands for the total row."""
        self.cpu = cpu
        super().__init__()

    def compose(self) -> ComposeResult:
        """Yield the name label and an empty Sparkline (id "sparkline_<name>")."""
        label = "total" if self.cpu < 0 else f"cpu{self.cpu}"
        yield Label(label)
        # Each displayed bucket shows the maximum of the samples it covers.
        yield Sparkline([], summary_function=max, id=f"sparkline_{label}")
198
199
class IListApp(App):
    """Textual application to browse perf events and metrics with live counts.

    The left side shows a tree of PMUs/events and metric groups/metrics, the
    right side the name and description of the selected item. Below them,
    sparklines and raw counters are refreshed every ``interval`` seconds for
    the selected item, per CPU and in total.
    """

    TITLE = "Interactive Perf List"

    BINDINGS = [
        Binding(key="s", action="search", description="Search",
                tooltip="Search events and PMUs"),
        Binding(key="n", action="next", description="Next",
                tooltip="Next search result or item"),
        Binding(key="p", action="prev", description="Previous",
                tooltip="Previous search result or item"),
        Binding(key="c", action="collapse", description="Collapse",
                tooltip="Collapse the current PMU"),
        Binding(key="^q", action="quit", description="Quit",
                tooltip="Quit the app"),
    ]

    CSS = """
        /* Make the 'total' sparkline a different color. */
        #sparkline_total > .sparkline--min-color {
            color: $accent;
        }
        #sparkline_total > .sparkline--max-color {
            color: $accent 30%;
        }
        /*
         * Make the active_search initially not displayed with the text in
         * the middle of the line.
         */
        #active_search {
            display: none;
            width: 100%;
            text-align: center;
        }
    """

    def __init__(self, interval: float) -> None:
        """Create the app; *interval* is the counter refresh period in seconds."""
        self.interval = interval
        # evlist of the currently selected value, None when nothing is open.
        self.evlist: Optional[perf.evlist] = None
        self.selected: Optional[TreeValue] = None
        # Nodes matching the last search and the one currently highlighted.
        self.search_results: list[TreeNode[TreeValue]] = []
        self.cur_search_result: TreeNode[TreeValue] | None = None
        super().__init__()

    def expand_and_select(self, node: TreeNode[Any]) -> None:
        """Expand, select and scroll to a node in the tree.

        Every ancestor is expanded, not only the parent and grandparent,
        because metric groups can be nested to arbitrary depth and a node
        below a collapsed ancestor would otherwise stay hidden.
        """
        ancestor = node.parent
        while ancestor is not None:
            ancestor.expand()
            ancestor = ancestor.parent
        node.expand()
        node.tree.select_node(node)
        node.tree.scroll_to_node(node)

    def set_searched_tree_node(self, previous: bool) -> None:
        """Set the cur_search_result node to either the next or previous.

        Without search results this just moves the tree cursor up or down.
        With results the selection wraps around at both ends of the list.
        """
        count = len(self.search_results)

        if count < 1:
            tree: Tree[TreeValue] = self.query_one("#root", Tree)
            if previous:
                tree.action_cursor_up()
            else:
                tree.action_cursor_down()
            return

        if self.cur_search_result:
            step = -1 if previous else 1
            idx = (self.search_results.index(self.cur_search_result) + step) % count
        else:
            idx = count - 1 if previous else 0

        node = self.search_results[idx]
        if node == self.cur_search_result:
            # Single result that is already selected, nothing to do.
            return

        self.cur_search_result = node
        self.expand_and_select(node)

    def action_search(self) -> None:
        """Search was chosen."""
        def set_initial_focus(event: str | None) -> None:
            """Sets the focus after the SearchScreen is dismissed."""

            search_label = self.query_one("#active_search", Label)
            search_label.display = bool(event)
            if not event:
                # Empty query: hide the label, earlier results stay untouched.
                return
            event = event.lower()
            search_label.update(f'Searching for events matching "{event}"')

            tree: Tree[TreeValue] = self.query_one("#root", Tree)

            def find_search_results(event: str, node: TreeNode[TreeValue],
                                    cursor_seen: bool = False,
                                    match_after_cursor: Optional[TreeNode[TreeValue]] = None
                                    ) -> Tuple[bool, Optional[TreeNode[TreeValue]]]:
                """Find nodes that match the search remembering the one after the cursor."""
                if not cursor_seen and node == tree.cursor_node:
                    cursor_seen = True
                if node.data and node.data.matches(event):
                    if cursor_seen and not match_after_cursor:
                        match_after_cursor = node
                    self.search_results.append(node)

                # Depth-first walk, threading the cursor state through the children.
                for child in node.children:
                    (cursor_seen, match_after_cursor) = \
                        find_search_results(event, child, cursor_seen, match_after_cursor)
                return (cursor_seen, match_after_cursor)

            self.search_results.clear()
            (_, self.cur_search_result) = find_search_results(event, tree.root)
            if len(self.search_results) < 1:
                self.push_screen(ErrorScreen(f"Failed to find pmu/event or metric {event}"))
                search_label.display = False
            elif self.cur_search_result:
                self.expand_and_select(self.cur_search_result)
            else:
                # All matches are before the cursor, wrap around to the first.
                self.set_searched_tree_node(previous=False)

        self.push_screen(SearchScreen(), set_initial_focus)

    def action_next(self) -> None:
        """Next was chosen."""
        self.set_searched_tree_node(previous=False)

    def action_prev(self) -> None:
        """Previous was chosen."""
        self.set_searched_tree_node(previous=True)

    def action_collapse(self) -> None:
        """Collapse the part of the tree currently on."""
        tree: Tree[TreeValue] = self.query_one("#root", Tree)
        node = tree.cursor_node
        if node and node.parent:
            node.parent.collapse_all()
            node.tree.scroll_to_node(node.parent)

    def update_counts(self) -> None:
        """Called every interval to update counts."""
        if not self.selected or not self.evlist:
            return

        def update_count(cpu: int, count: float) -> None:
            """Show *count* for *cpu* (negative means total) in label and sparkline."""
            suffix = f"cpu{cpu}" if cpu >= 0 else "total"

            # Update the raw count display.
            counters = self.query(f"#counter_{suffix}")
            if not counters:
                return
            counters.first(Label).update(str(count))

            # Update the sparkline.
            sparklines = self.query(f"#sparkline_{suffix}")
            if not sparklines:
                return
            line = sparklines.first(Sparkline)
            # If there are more events than the width, remove the front event.
            if len(line.data) > line.size.width:
                line.data.pop(0)
            line.data.append(count)
            # The list was changed in place, tell Textual to refresh.
            line.mutate_reactive(Sparkline.data)

        # Update the total and each CPU counts, assume there's just 1 evsel.
        total = 0
        self.evlist.disable()
        try:
            for evsel in self.evlist:
                for cpu in evsel.cpus():
                    aggr = 0
                    for thread in evsel.threads():
                        aggr += self.selected.value(self.evlist, evsel, cpu, thread)
                    update_count(cpu, aggr)
                    total += aggr
            update_count(-1, total)
        finally:
            # Never leave the counters disabled, even if a read failed.
            self.evlist.enable()

    def on_mount(self) -> None:
        """When App starts set up periodic event updating."""
        self.update_counts()
        self.set_interval(self.interval, self.update_counts)

    def set_selected(self, value: TreeValue) -> None:
        """Updates the event/description and starts the counters."""
        try:
            label_name = self.query_one("#event_name", Label)
            event_description = self.query_one("#event_description", Static)
            lines = self.query_one("#lines")
        except NoMatches:
            # A race with rendering, ignore the update as we can't
            # mount the assumed output widgets.
            return

        self.selected = value

        # Remove previous event information.
        if self.evlist:
            self.evlist.disable()
            self.evlist.close()
            for old_line in self.query(CounterSparkline):
                old_line.remove()
            for old_counter in self.query(Counter):
                old_counter.remove()

        # Update event/metric text and description.
        label_name.update(value.name())
        event_description.update(value.description())

        # Open the event.
        try:
            self.evlist = value.parse()
            if self.evlist:
                self.evlist.open()
                self.evlist.enable()
        except Exception:
            # Parsing or opening failed, reported through the error dialog below.
            self.evlist = None

        if not self.evlist:
            self.push_screen(ErrorScreen(f"Failed to open {value.name()}"))
            return

        # Add spark lines for all the CPUs. Note, must be done after
        # open so that the evlist CPUs have been computed by propagate
        # maps. All sparklines are mounted first, then all raw counters,
        # each group led by its 'total' row.
        for widget_type in (CounterSparkline, Counter):
            lines.mount(widget_type(cpu=-1))
            for cpu in self.evlist.all_cpus():
                lines.mount(widget_type(cpu))

    def compose(self) -> ComposeResult:
        """Draws the app."""
        def metric_event_tree() -> Tree:
            """Create tree of PMUs and metricgroups with events or metrics under."""
            tree: Tree[TreeValue] = Tree("Root", id="root")
            pmus = tree.root.add("PMUs")
            for pmu in perf.pmus():
                pmu_name = pmu.name().lower()
                pmu_node = pmus.add(pmu_name)
                try:
                    # Drop unnamed events before sorting so that one of them
                    # can't make the sort key fail and hide the whole PMU.
                    named_events = [ev for ev in pmu.events() if "name" in ev]
                    for event in sorted(named_events, key=lambda ev: ev["name"]):
                        if "deprecated" in event:
                            continue
                        e = event["name"].lower()
                        if "alias" in event:
                            pmu_node.add_leaf(f'{e} ({event["alias"]})',
                                              data=PmuEvent(pmu_name, e))
                        else:
                            pmu_node.add_leaf(e, data=PmuEvent(pmu_name, e))
                except Exception:
                    # Reading events may fail with EPERM, ignore.
                    pass

            metrics = tree.root.add("Metrics")
            # Query and sort the metrics once rather than on every recursion.
            all_metrics = sorted(perf.metrics(), key=lambda m: m["MetricName"])
            groups = set()
            for metric in all_metrics:
                groups.update(metric["MetricGroup"])

            def add_metrics_to_tree(node: TreeNode[TreeValue], parent: str,
                                    pmu: Optional[str] = None) -> None:
                """Add the metrics of group *parent* under *node*.

                When a group named "<metric>_group" exists for an added
                metric, it is added as a sub-tree restricted to the PMU of
                that metric.
                """
                for metric in all_metrics:
                    metric_pmu = metric.get('PMU')
                    if pmu and metric_pmu and metric_pmu != pmu:
                        continue
                    if parent in metric["MetricGroup"]:
                        name = metric["MetricName"]
                        display_name = name
                        if metric_pmu:
                            display_name += f" ({metric_pmu})"
                        node.add_leaf(display_name, data=Metric(name, metric_pmu))
                        child_group_name = f'{name}_group'
                        if child_group_name in groups:
                            display_child_group_name = child_group_name
                            if metric_pmu:
                                display_child_group_name += f" ({metric_pmu})"
                            add_metrics_to_tree(node.add(display_child_group_name),
                                                child_group_name,
                                                metric_pmu)

            for group in sorted(groups):
                # "_group" groups are only shown nested below their metric.
                if group.endswith('_group'):
                    continue
                add_metrics_to_tree(metrics.add(group), group)

            tree.root.expand()
            return tree

        yield Header(id="header")
        yield Horizontal(Vertical(metric_event_tree(), id="events"),
                         Vertical(Label("event name", id="event_name"),
                                  Static("description", markup=False, id="event_description"),
                                  ))
        yield Label(id="active_search")
        yield VerticalScroll(id="lines")
        yield Footer(id="footer")

    @on(Tree.NodeSelected)
    def on_tree_node_selected(self, event: Tree.NodeSelected[TreeValue]) -> None:
        """Called when a tree node is selected, selecting the event."""
        if event.node.data:
            self.set_selected(event.node.data)
508
509
if __name__ == "__main__":
    # Script entry point: parse the refresh interval and run the TUI.
    ap = argparse.ArgumentParser()
    # type=float makes argparse reject a non-numeric interval with a usage
    # error instead of a ValueError traceback from a later float() call.
    ap.add_argument('-I', '--interval', type=float, default=0.1,
                    help="Counter update interval in seconds")
    args = ap.parse_args()
    app = IListApp(args.interval)
    app.run()
515 app.run()