#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0
#
# Run a perf script command multiple times in parallel, using perf script
# options --cpu and --time so that each job processes a different chunk
# of the data.
#
# Copyright (c) 2024, Intel Corporation.
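#
# Example (see the --help epilog below for more detail):
#
#	$ perf record -a -- sleep 10
#	$ parallel-perf.py --nr=4 -- perf script --ns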

import subprocess
import argparse
import pathlib
import shlex
import time
import copy
import sys
import os
import re

glb_prog_name = "parallel-perf.py"
glb_min_interval = 10.0
glb_min_samples = 64

class Verbosity():

	def __init__(self, quiet=False, verbose=False, debug=False):
		self.normal = True
		self.verbose = verbose
		self.debug = debug
		self.self_test = True
		if self.debug:
			self.verbose = True
		if self.verbose:
			quiet = False
		if quiet:
			self.normal = False

# Manage work (Start/Wait/Kill), as represented by a subprocess.Popen command
class Work():

	def __init__(self, cmd, pipe_to, output_dir="."):
		self.popen = None
		self.consumer = None
		self.cmd = cmd
		self.pipe_to = pipe_to
		self.output_dir = output_dir
		self.cmdout_name = f"{output_dir}/cmd.txt"
		self.stdout_name = f"{output_dir}/out.txt"
		self.stderr_name = f"{output_dir}/err.txt"

	def Command(self):
		sh_cmd = [ shlex.quote(x) for x in self.cmd ]
		return " ".join(sh_cmd)

	def Stdout(self):
		return open(self.stdout_name, "w")

	def Stderr(self):
		return open(self.stderr_name, "w")

	def CreateOutputDir(self):
		pathlib.Path(self.output_dir).mkdir(parents=True, exist_ok=True)

	def Start(self):
		if self.popen:
			return
		self.CreateOutputDir()
		with open(self.cmdout_name, "w") as f:
			f.write(self.Command())
			f.write("\n")
		stdout = self.Stdout()
		stderr = self.Stderr()
		if self.pipe_to:
			self.popen = subprocess.Popen(self.cmd, stdout=subprocess.PIPE, stderr=stderr)
			args = shlex.split(self.pipe_to)
			self.consumer = subprocess.Popen(args, stdin=self.popen.stdout, stdout=stdout, stderr=stderr)
		else:
			self.popen = subprocess.Popen(self.cmd, stdout=stdout, stderr=stderr)

	def RemoveEmptyErrFile(self):
		if os.path.exists(self.stderr_name):
			if os.path.getsize(self.stderr_name) == 0:
				os.unlink(self.stderr_name)

	def Errors(self):
		if os.path.exists(self.stderr_name):
			if os.path.getsize(self.stderr_name) != 0:
				return [ f"Non-empty error file {self.stderr_name}" ]
		return []

	def TidyUp(self):
		self.RemoveEmptyErrFile()

	def RawPollWait(self, p, wait):
		if wait:
			return p.wait()
		return p.poll()

	def Poll(self, wait=False):
		if not self.popen:
			return None
		result = self.RawPollWait(self.popen, wait)
		if self.consumer:
			res = result
			result = self.RawPollWait(self.consumer, wait)
			if result != None and res == None:
				self.popen.kill()
				result = None
			elif result == 0 and res != None and res != 0:
				result = res
		if result != None:
			self.TidyUp()
		return result

	def Wait(self):
		return self.Poll(wait=True)

	def Kill(self):
		if not self.popen:
			return
		self.popen.kill()
		if self.consumer:
			self.consumer.kill()

def KillWork(worklist, verbosity):
	for w in worklist:
		w.Kill()
	for w in worklist:
		w.Wait()

def NumberOfCPUs():
	return os.sysconf("SC_NPROCESSORS_ONLN")

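# Convert integer nanoseconds to the seconds string format used by the perf
# --time option e.g. NanoSecsToSecsStr(9466504461499) -> "9466.504461499",
# NanoSecsToSecsStr(None) -> ""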
def NanoSecsToSecsStr(x):
	if x == None:
		return ""
	x = str(x)
	if len(x) < 10:
		x = "0" * (10 - len(x)) + x
	return x[:len(x) - 9] + "." + x[-9:]

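# Insert an option immediately after a given word if it is present, otherwise
# append it e.g. InsertOptionAfter(["perf", "script", "--ns"], "--cpu=1", "script")
# makes the command ["perf", "script", "--cpu=1", "--ns"]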
def InsertOptionAfter(cmd, option, after):
	try:
		pos = cmd.index(after)
		cmd.insert(pos + 1, option)
	except:
		cmd.append(option)

def CreateWorkList(cmd, pipe_to, output_dir, cpus, time_ranges_by_cpu):
	max_len = len(str(cpus[-1]))
	cpu_dir_fmt = f"cpu-%.{max_len}u"
	worklist = []
	pos = 0
	for cpu in cpus:
		if cpu >= 0:
			cpu_dir = os.path.join(output_dir, cpu_dir_fmt % cpu)
			cpu_option = f"--cpu={cpu}"
		else:
			cpu_dir = output_dir
			cpu_option = None

		tr_dir_fmt = "time-range"

		if len(time_ranges_by_cpu) > 1:
			time_ranges = time_ranges_by_cpu[pos]
			tr_dir_fmt += f"-{pos}"
			pos += 1
		else:
			time_ranges = time_ranges_by_cpu[0]

		max_len = len(str(len(time_ranges)))
		tr_dir_fmt += f"-%.{max_len}u"

		i = 0
		for r in time_ranges:
			if r == [None, None]:
				time_option = None
				work_output_dir = cpu_dir
			else:
				time_option = "--time=" + NanoSecsToSecsStr(r[0]) + "," + NanoSecsToSecsStr(r[1])
				work_output_dir = os.path.join(cpu_dir, tr_dir_fmt % i)
			i += 1
			work_cmd = list(cmd)
			if time_option != None:
				InsertOptionAfter(work_cmd, time_option, "script")
			if cpu_option != None:
				InsertOptionAfter(work_cmd, cpu_option, "script")
			w = Work(work_cmd, pipe_to, work_output_dir)
			worklist.append(w)
	return worklist

def DoRunWork(worklist, nr_jobs, verbosity):
	nr_to_do = len(worklist)
	not_started = list(worklist)
	running = []
	done = []
	chg = False
	while True:
		nr_done = len(done)
		if chg and verbosity.normal:
			nr_run = len(running)
			print(f"\rThere are {nr_to_do} jobs: {nr_done} completed, {nr_run} running", flush=True, end=" ")
			if verbosity.verbose:
				print()
			chg = False
		if nr_done == nr_to_do:
			break
		while len(running) < nr_jobs and len(not_started):
			w = not_started.pop(0)
			running.append(w)
			if verbosity.verbose:
				print("Starting:", w.Command())
			w.Start()
			chg = True
		if len(running):
			time.sleep(0.1)
		finished = []
		not_finished = []
		while len(running):
			w = running.pop(0)
			r = w.Poll()
			if r == None:
				not_finished.append(w)
				continue
			if r == 0:
				if verbosity.verbose:
					print("Finished:", w.Command())
				finished.append(w)
				chg = True
				continue
			if verbosity.normal and not verbosity.verbose:
				print()
			print("Job failed!\n    return code:", r, "\n    command:    ", w.Command())
			if w.pipe_to:
				print("    piped to:   ", w.pipe_to)
			print("Killing outstanding jobs")
			KillWork(not_finished, verbosity)
			KillWork(running, verbosity)
			return False
		running = not_finished
		done += finished
	errorlist = []
	for w in worklist:
		errorlist += w.Errors()
	if len(errorlist):
		print("Errors:")
		for e in errorlist:
			print(e)
	elif verbosity.normal:
		print("\r", " " * 50, "\rAll jobs finished successfully", flush=True)
	return True

def RunWork(worklist, nr_jobs=NumberOfCPUs(), verbosity=Verbosity()):
	try:
		return DoRunWork(worklist, nr_jobs, verbosity)
	except:
		for w in worklist:
			w.Kill()
		raise
	return True

def ReadHeader(perf, file_name):
	return subprocess.Popen([perf, "script", "--header-only", "--input", file_name], stdout=subprocess.PIPE).stdout.read().decode("utf-8")

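# Parse lines like "# nrcpus avail : 8" from the perf script --header-only
# output into a dict e.g. {"nrcpus avail": "8"}. Duplicate names get a numeric
# suffix ("name 2", "name 3", ...)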
def ParseHeader(hdr):
	result = {}
	lines = hdr.split("\n")
	for line in lines:
		if ":" in line and line[0] == "#":
			pos = line.index(":")
			name = line[1:pos-1].strip()
			value = line[pos+1:].strip()
			if name in result:
				orig_name = name
				nr = 2
				while True:
					name = f"{orig_name} {nr}"
					if name not in result:
						break
					nr += 1
			result[name] = value
	return result

def HeaderField(hdr_dict, hdr_fld):
	if hdr_fld not in hdr_dict:
		raise Exception(f"'{hdr_fld}' missing from header information")
	return hdr_dict[hdr_fld]

# Represent the position of an option within a command string
# and provide the option value and/or remove the option
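# e.g. for args = ["perf", "script", "--time", "10,20"], OptPos(args, "", "time")
# gives Value() == "10,20", and Remove(args) leaves args == ["perf", "script"]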
class OptPos():

	def Init(self, opt_element=-1, value_element=-1, opt_pos=-1, value_pos=-1, error=None):
		self.opt_element = opt_element		# list element that contains option
		self.value_element = value_element	# list element that contains option value
		self.opt_pos = opt_pos			# string position of option
		self.value_pos = value_pos		# string position of value
		self.error = error			# error message string

	def __init__(self, args, short_name, long_name, default=None):
		self.args = list(args)
		self.default = default
		n = 2 + len(long_name)
		m = len(short_name)
		pos = -1
		for opt in args:
			pos += 1
			if m and opt[:2] == f"-{short_name}":
				if len(opt) == 2:
					if pos + 1 < len(args):
						self.Init(pos, pos + 1, 0, 0)
					else:
						self.Init(error = f"-{short_name} option missing value")
				else:
					self.Init(pos, pos, 0, 2)
				return
			if opt[:n] == f"--{long_name}":
				if len(opt) == n:
					if pos + 1 < len(args):
						self.Init(pos, pos + 1, 0, 0)
					else:
						self.Init(error = f"--{long_name} option missing value")
				elif opt[n] == "=":
					self.Init(pos, pos, 0, n + 1)
				else:
					self.Init(error = f"--{long_name} option expected '='")
				return
			if m and opt[:1] == "-" and opt[:2] != "--" and short_name in opt:
				ipos = opt.index(short_name)
				if "-" in opt[1:]:
					hpos = opt[1:].index("-")
					if hpos < ipos:
						continue
				if ipos + 1 == len(opt):
					if pos + 1 < len(args):
						self.Init(pos, pos + 1, ipos, 0)
					else:
						self.Init(error = f"-{short_name} option missing value")
				else:
					self.Init(pos, pos, ipos, ipos + 1)
				return
		self.Init()

	def Value(self):
		if self.opt_element >= 0:
			if self.opt_element != self.value_element:
				return self.args[self.value_element]
			else:
				return self.args[self.value_element][self.value_pos:]
		return self.default

	def Remove(self, args):
		if self.opt_element == -1:
			return
		if self.opt_element != self.value_element:
			del args[self.value_element]
		if self.opt_pos:
			args[self.opt_element] = args[self.opt_element][:self.opt_pos]
		else:
			del args[self.opt_element]

def DetermineInputFileName(cmd):
	p = OptPos(cmd, "i", "input", "perf.data")
	if p.error:
		raise Exception(f"perf command {p.error}")
	file_name = p.Value()
	if not os.path.exists(file_name):
		raise Exception(f"perf command input file '{file_name}' not found")
	return file_name

def ReadOption(args, short_name, long_name, err_prefix, remove=False):
	p = OptPos(args, short_name, long_name)
	if p.error:
		raise Exception(f"{err_prefix}{p.error}")
	value = p.Value()
	if remove:
		p.Remove(args)
	return value

def ExtractOption(args, short_name, long_name, err_prefix):
	return ReadOption(args, short_name, long_name, err_prefix, True)

def ReadPerfOption(args, short_name, long_name):
	return ReadOption(args, short_name, long_name, "perf command ")

def ExtractPerfOption(args, short_name, long_name):
	return ExtractOption(args, short_name, long_name, "perf command ")

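# Build the "double quick" perf script commands used to estimate trace data
# density e.g. (illustratively) for a perf.data file with no --cpu or --time
# options:
#	cnts_cmd  = perf script --ns --input perf.data --itrace=qqi -Fcpu
#	times_cmd = perf script --ns --input perf.data --itrace=qqi -Fcpu,time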
def PerfDoubleQuickCommands(cmd, file_name):
	cpu_str = ReadPerfOption(cmd, "C", "cpu")
	time_str = ReadPerfOption(cmd, "", "time")
	# Use double-quick sampling to determine trace data density
	times_cmd = ["perf", "script", "--ns", "--input", file_name, "--itrace=qqi"]
	if cpu_str != None and cpu_str != "":
		times_cmd.append(f"--cpu={cpu_str}")
	if time_str != None and time_str != "":
		times_cmd.append(f"--time={time_str}")
	cnts_cmd = list(times_cmd)
	cnts_cmd.append("-Fcpu")
	times_cmd.append("-Fcpu,time")
	return cnts_cmd, times_cmd

class CPUTimeRange():
	def __init__(self, cpu):
		self.cpu = cpu
		self.sample_cnt = 0
		self.time_ranges = None
		self.interval = 0
		self.interval_remaining = 0
		self.remaining = 0
		self.tr_pos = 0

def CalcTimeRangesByCPU(line, cpu, cpu_time_ranges, max_time):
	cpu_time_range = cpu_time_ranges[cpu]
	cpu_time_range.remaining -= 1
	cpu_time_range.interval_remaining -= 1
	if cpu_time_range.remaining == 0:
		cpu_time_range.time_ranges[cpu_time_range.tr_pos][1] = max_time
		return
	if cpu_time_range.interval_remaining == 0:
		time = TimeVal(line[1][:-1], 0)
		time_ranges = cpu_time_range.time_ranges
		time_ranges[cpu_time_range.tr_pos][1] = time - 1
		time_ranges.append([time, max_time])
		cpu_time_range.tr_pos += 1
		cpu_time_range.interval_remaining = cpu_time_range.interval

def CountSamplesByCPU(line, cpu, cpu_time_ranges):
	try:
		cpu_time_ranges[cpu].sample_cnt += 1
	except:
		print("exception")
		print("cpu", cpu)
		print("len(cpu_time_ranges)", len(cpu_time_ranges))
		raise

def ProcessCommandOutputLines(cmd, per_cpu, fn, *x):
	# Assume CPU number is at beginning of line and enclosed by []
	pat = re.compile(r"\s*\[[0-9]+\]")
	p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
	while True:
		if line := p.stdout.readline():
			line = line.decode("utf-8")
			if pat.match(line):
				line = line.split()
				if per_cpu:
					# Assumes CPU number is enclosed by []
					cpu = int(line[0][1:-1])
				else:
					cpu = 0
				fn(line, cpu, *x)
		else:
			break
	p.wait()

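# Intersect new_time_ranges with time_ranges, modifying new_time_ranges in
# place e.g. new_time_ranges [[0, 50]] intersected with time_ranges
# [[10, 20], [30, 40]] leaves new_time_ranges as [[10, 20], [30, 40]]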
def IntersectTimeRanges(new_time_ranges, time_ranges):
	pos = 0
	new_pos = 0
	# Can assume len(time_ranges) != 0 and len(new_time_ranges) != 0
	# Note also, there *must* be at least one intersection.
	while pos < len(time_ranges) and new_pos < len(new_time_ranges):
		# new end < old start => no intersection, remove new
		if new_time_ranges[new_pos][1] < time_ranges[pos][0]:
			del new_time_ranges[new_pos]
			continue
		# new start > old end => no intersection, check next
		if new_time_ranges[new_pos][0] > time_ranges[pos][1]:
			pos += 1
			if pos < len(time_ranges):
				continue
			# no next, so remove remaining
			while new_pos < len(new_time_ranges):
				del new_time_ranges[new_pos]
			return
		# Found an intersection
		# new start < old start => adjust new start = old start
		if new_time_ranges[new_pos][0] < time_ranges[pos][0]:
			new_time_ranges[new_pos][0] = time_ranges[pos][0]
		# new end > old end => keep the overlap, insert the remainder
		if new_time_ranges[new_pos][1] > time_ranges[pos][1]:
			r = [ time_ranges[pos][1] + 1, new_time_ranges[new_pos][1] ]
			new_time_ranges[new_pos][1] = time_ranges[pos][1]
			new_pos += 1
			new_time_ranges.insert(new_pos, r)
			continue
		# new [start, end] is within old [start, end]
		new_pos += 1

def SplitTimeRangesByTraceDataDensity(time_ranges, cpus, nr, cmd, file_name, per_cpu, min_size, min_interval, verbosity):
	if verbosity.normal:
		print("\rAnalyzing...", flush=True, end=" ")
		if verbosity.verbose:
			print()
	cnts_cmd, times_cmd = PerfDoubleQuickCommands(cmd, file_name)

	if per_cpu:
		nr_cpus = cpus[-1] + 1
		cpu_time_ranges = [ CPUTimeRange(cpu) for cpu in range(nr_cpus) ]
	else:
		nr_cpus = 1
		cpu_time_ranges = [ CPUTimeRange(-1) ]

	if verbosity.debug:
		print("nr_cpus", nr_cpus)
		print("cnts_cmd", cnts_cmd)
		print("times_cmd", times_cmd)

	# Count the number of "double quick" samples per CPU
	ProcessCommandOutputLines(cnts_cmd, per_cpu, CountSamplesByCPU, cpu_time_ranges)

	tot = 0
	mx = 0
	for cpu_time_range in cpu_time_ranges:
		cnt = cpu_time_range.sample_cnt
		tot += cnt
		if cnt > mx:
			mx = cnt
		if verbosity.debug:
			print("cpu:", cpu_time_range.cpu, "sample_cnt", cnt)

	if min_size < 1:
		min_size = 1

	if mx < min_size:
		# Too little data to be worth splitting
		if verbosity.debug:
			print("Too little data to split by time")
		if nr == 0:
			nr = 1
		return [ SplitTimeRangesIntoN(time_ranges, nr, min_interval) ]

	if nr:
		divisor = nr
		min_size = 1
	else:
		divisor = NumberOfCPUs()

	interval = int(round(tot / divisor, 0))
	if interval < min_size:
		interval = min_size

	if verbosity.debug:
		print("divisor", divisor)
		print("min_size", min_size)
		print("interval", interval)

	min_time = time_ranges[0][0]
	max_time = time_ranges[-1][1]

	for cpu_time_range in cpu_time_ranges:
		cnt = cpu_time_range.sample_cnt
		if cnt == 0:
			cpu_time_range.time_ranges = copy.deepcopy(time_ranges)
			continue
		# Adjust target interval for CPU to give approximately equal interval sizes
		# Determine number of intervals, rounding to nearest integer
		n = int(round(cnt / interval, 0))
		if n < 1:
			n = 1
		# Determine interval size, rounding up
		d, m = divmod(cnt, n)
		if m:
			d += 1
		cpu_time_range.interval = d
		cpu_time_range.interval_remaining = d
		cpu_time_range.remaining = cnt
		# Init. time ranges for each CPU with the start time
		cpu_time_range.time_ranges = [ [min_time, max_time] ]

	# Set time ranges so that the same number of "double quick" samples
	# will fall into each time range.
	ProcessCommandOutputLines(times_cmd, per_cpu, CalcTimeRangesByCPU, cpu_time_ranges, max_time)

	for cpu_time_range in cpu_time_ranges:
		if cpu_time_range.sample_cnt:
			IntersectTimeRanges(cpu_time_range.time_ranges, time_ranges)

	return [cpu_time_ranges[cpu].time_ranges for cpu in cpus]

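# Split a single [start, end] range into n parts of equal duration (the last
# part takes any remainder) e.g. SplitSingleTimeRangeIntoN([0, 99], 4) ->
# [[0, 24], [25, 49], [50, 74], [75, 99]]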
def SplitSingleTimeRangeIntoN(time_range, n):
	if n <= 1:
		return [time_range]
	start = time_range[0]
	end = time_range[1]
	duration = int((end - start + 1) / n)
	if duration < 1:
		return [time_range]
	time_ranges = []
	for i in range(n):
		time_ranges.append([start, start + duration - 1])
		start += duration
	time_ranges[-1][1] = end
	return time_ranges

def TimeRangeDuration(r):
	return r[1] - r[0] + 1

def TotalDuration(time_ranges):
	duration = 0
	for r in time_ranges:
		duration += TimeRangeDuration(r)
	return duration

def SplitTimeRangesByInterval(time_ranges, interval):
	new_ranges = []
	for r in time_ranges:
		duration = TimeRangeDuration(r)
		n = duration / interval
		n = int(round(n, 0))
		new_ranges += SplitSingleTimeRangeIntoN(r, n)
	return new_ranges

def SplitTimeRangesIntoN(time_ranges, n, min_interval):
	if n <= len(time_ranges):
		return time_ranges
	duration = TotalDuration(time_ranges)
	interval = duration / n
	if interval < min_interval:
		interval = min_interval
	return SplitTimeRangesByInterval(time_ranges, interval)

def RecombineTimeRanges(tr):
	new_tr = copy.deepcopy(tr)
	n = len(new_tr)
	i = 1
	while i < len(new_tr):
		# if prev end + 1 == cur start, combine them
		if new_tr[i - 1][1] + 1 == new_tr[i][0]:
			new_tr[i][0] = new_tr[i - 1][0]
			del new_tr[i - 1]
		else:
			i += 1
	return new_tr

def OpenTimeRangeEnds(time_ranges, min_time, max_time):
	if time_ranges[0][0] <= min_time:
		time_ranges[0][0] = None
	if time_ranges[-1][1] >= max_time:
		time_ranges[-1][1] = None

def BadTimeStr(time_str):
	raise Exception(f"perf command bad time option: '{time_str}'\nCheck also 'time of first sample' and 'time of last sample' in perf script --header-only")

def ValidateTimeRanges(time_ranges, time_str):
	n = len(time_ranges)
	for i in range(n):
		start = time_ranges[i][0]
		end = time_ranges[i][1]
		if i != 0 and start <= time_ranges[i - 1][1]:
			BadTimeStr(time_str)
		if start > end:
			BadTimeStr(time_str)

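# Convert a perf time string (in seconds) to integer nanoseconds
# e.g. TimeVal("9466.504461499", 0) -> 9466504461499, TimeVal("", dflt) -> dflt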
def TimeVal(s, dflt):
	s = s.strip()
	if s == "":
		return dflt
	a = s.split(".")
	if len(a) > 2:
		raise Exception(f"Bad time value '{s}'")
	x = int(a[0])
	if x < 0:
		raise Exception("Negative time not allowed")
	x *= 1000000000
	if len(a) > 1:
		x += int((a[1] + "000000000")[:9])
	return x

def BadCPUStr(cpu_str):
	raise Exception(f"perf command bad cpu option: '{cpu_str}'\nCheck also 'nrcpus avail' in perf script --header-only")

def ParseTimeStr(time_str, min_time, max_time):
	if time_str == None or time_str == "":
		return [[min_time, max_time]]
	time_ranges = []
	for r in time_str.split():
		a = r.split(",")
		if len(a) != 2:
			BadTimeStr(time_str)
		try:
			start = TimeVal(a[0], min_time)
			end = TimeVal(a[1], max_time)
		except:
			BadTimeStr(time_str)
		time_ranges.append([start, end])
	ValidateTimeRanges(time_ranges, time_str)
	return time_ranges

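# Parse a perf --cpu option value into a sorted list of CPU numbers
# e.g. ParseCPUStr("0-2,5", 8) -> [0, 1, 2, 5]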
def ParseCPUStr(cpu_str, nr_cpus):
	if cpu_str == None or cpu_str == "":
		return [-1]
	cpus = []
	for r in cpu_str.split(","):
		a = r.split("-")
		if len(a) < 1 or len(a) > 2:
			BadCPUStr(cpu_str)
		try:
			start = int(a[0].strip())
			if len(a) > 1:
				end = int(a[1].strip())
			else:
				end = start
		except:
			BadCPUStr(cpu_str)
		if start < 0 or end < 0 or end < start or end >= nr_cpus:
			BadCPUStr(cpu_str)
		cpus.extend(range(start, end + 1))
	cpus = list(set(cpus)) # Remove duplicates
	cpus.sort()
	return cpus

class ParallelPerf():

	def __init__(self, a):
		for arg_name in vars(a):
			setattr(self, arg_name, getattr(a, arg_name))
		self.orig_nr = self.nr
		self.orig_cmd = list(self.cmd)
		self.perf = self.cmd[0]
		if os.path.exists(self.output_dir):
			raise Exception(f"Output '{self.output_dir}' already exists")
		if self.jobs < 0 or self.nr < 0 or self.interval < 0:
			raise Exception("Bad options (negative values): try -h option for help")
		if self.nr != 0 and self.interval != 0:
			raise Exception("Cannot specify number of time subdivisions and time interval")
		if self.jobs == 0:
			self.jobs = NumberOfCPUs()
		if self.nr == 0 and self.interval == 0:
			if self.per_cpu:
				self.nr = 1
			else:
				self.nr = self.jobs

	def Init(self):
		if self.verbosity.debug:
			print("cmd", self.cmd)
		self.file_name = DetermineInputFileName(self.cmd)
		self.hdr = ReadHeader(self.perf, self.file_name)
		self.hdr_dict = ParseHeader(self.hdr)
		self.cmd_line = HeaderField(self.hdr_dict, "cmdline")

	def ExtractTimeInfo(self):
		self.min_time = TimeVal(HeaderField(self.hdr_dict, "time of first sample"), 0)
		self.max_time = TimeVal(HeaderField(self.hdr_dict, "time of last sample"), 0)
		self.time_str = ExtractPerfOption(self.cmd, "", "time")
		self.time_ranges = ParseTimeStr(self.time_str, self.min_time, self.max_time)
		if self.verbosity.debug:
			print("time_ranges", self.time_ranges)

	def ExtractCPUInfo(self):
		if self.per_cpu:
			nr_cpus = int(HeaderField(self.hdr_dict, "nrcpus avail"))
			self.cpu_str = ExtractPerfOption(self.cmd, "C", "cpu")
			if self.cpu_str == None or self.cpu_str == "":
				self.cpus = [ x for x in range(nr_cpus) ]
			else:
				self.cpus = ParseCPUStr(self.cpu_str, nr_cpus)
		else:
			self.cpu_str = None
			self.cpus = [-1]
		if self.verbosity.debug:
			print("cpus", self.cpus)

	def IsIntelPT(self):
		return self.cmd_line.find("intel_pt") >= 0

	def SplitTimeRanges(self):
		if self.IsIntelPT() and self.interval == 0:
			self.split_time_ranges_for_each_cpu = \
				SplitTimeRangesByTraceDataDensity(self.time_ranges, self.cpus, self.orig_nr,
								  self.orig_cmd, self.file_name, self.per_cpu,
								  self.min_size, self.min_interval, self.verbosity)
		elif self.nr:
			self.split_time_ranges_for_each_cpu = [ SplitTimeRangesIntoN(self.time_ranges, self.nr, self.min_interval) ]
		else:
			self.split_time_ranges_for_each_cpu = [ SplitTimeRangesByInterval(self.time_ranges, self.interval) ]

	def CheckTimeRanges(self):
		for tr in self.split_time_ranges_for_each_cpu:
			# Re-combined time ranges should be the same
			new_tr = RecombineTimeRanges(tr)
			if new_tr != self.time_ranges:
				if self.verbosity.debug:
					print("tr", tr)
					print("new_tr", new_tr)
				raise Exception("Self test failed!")

	def OpenTimeRangeEnds(self):
		for time_ranges in self.split_time_ranges_for_each_cpu:
			OpenTimeRangeEnds(time_ranges, self.min_time, self.max_time)

	def CreateWorkList(self):
		self.worklist = CreateWorkList(self.cmd, self.pipe_to, self.output_dir, self.cpus, self.split_time_ranges_for_each_cpu)

	def PerfDataRecordedPerCPU(self):
		if "--per-thread" in self.cmd_line.split():
			return False
		return True

	def DefaultToPerCPU(self):
		# --no-per-cpu option takes precedence
		if self.no_per_cpu:
			return False
		if not self.PerfDataRecordedPerCPU():
			return False
		# Default to per-cpu for Intel PT data that was recorded per-cpu,
		# because decoding can be done for each CPU separately.
		if self.IsIntelPT():
			return True
		return False

	def Config(self):
		self.Init()
		self.ExtractTimeInfo()
		if not self.per_cpu:
			self.per_cpu = self.DefaultToPerCPU()
		if self.verbosity.debug:
			print("per_cpu", self.per_cpu)
		self.ExtractCPUInfo()
		self.SplitTimeRanges()
		if self.verbosity.self_test:
			self.CheckTimeRanges()
		# Prefer open-ended time range to starting / ending with min_time / max_time resp.
		self.OpenTimeRangeEnds()
		self.CreateWorkList()

	def Run(self):
		if self.dry_run:
			print(len(self.worklist), "jobs:")
			for w in self.worklist:
				print(w.Command())
			return True
		result = RunWork(self.worklist, self.jobs, verbosity=self.verbosity)
		if self.verbosity.verbose:
			print(glb_prog_name, "done")
		return result

def RunParallelPerf(a):
	pp = ParallelPerf(a)
	pp.Config()
	return pp.Run()

def Main(args):
	ap = argparse.ArgumentParser(
		prog=glb_prog_name, formatter_class = argparse.RawDescriptionHelpFormatter,
		description =
"""
Run a perf script command multiple times in parallel, using perf script options
--cpu and --time so that each job processes a different chunk of the data.
""",
		epilog =
"""
Follow the options by '--' and then the perf script command e.g.

	$ perf record -a -- sleep 10
	$ parallel-perf.py --nr=4 -- perf script --ns
	All jobs finished successfully
	$ tree parallel-perf-output/
	parallel-perf-output/
	├── time-range-0
	│   ├── cmd.txt
	│   └── out.txt
	├── time-range-1
	│   ├── cmd.txt
	│   └── out.txt
	├── time-range-2
	│   ├── cmd.txt
	│   └── out.txt
	└── time-range-3
	    ├── cmd.txt
	    └── out.txt
	$ find parallel-perf-output -name cmd.txt | sort | xargs grep -H .
	parallel-perf-output/time-range-0/cmd.txt:perf script --time=,9466.504461499 --ns
	parallel-perf-output/time-range-1/cmd.txt:perf script --time=9466.504461500,9469.005396999 --ns
	parallel-perf-output/time-range-2/cmd.txt:perf script --time=9469.005397000,9471.506332499 --ns
	parallel-perf-output/time-range-3/cmd.txt:perf script --time=9471.506332500, --ns

Any perf script command can be used, including the use of perf script options
--dlfilter and --script, so that the benefit of running parallel jobs
naturally extends to them also.

If option --pipe-to is used, standard output is first piped through that
command. Beware, if the command fails (e.g. grep with no matches), it will be
considered a fatal error.
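
For example, to count the lines of output from each job:

	$ parallel-perf.py --pipe-to="wc -l" -- perf script --ns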

Final standard output is redirected to files named out.txt in separate
subdirectories under the output directory. Similarly, standard error is
written to files named err.txt. In addition, files named cmd.txt contain the
corresponding perf script command. After processing, err.txt files are removed
if they are empty.

If any job exits with a non-zero exit code, then all jobs are killed and no
more are started. A message is printed if any job results in a non-empty
err.txt file.

There is a separate output subdirectory for each time range. If the --per-cpu
option is used, these are further grouped under cpu-n subdirectories, e.g.

	$ parallel-perf.py --per-cpu --nr=2 -- perf script --ns --cpu=0,1
	All jobs finished successfully
	$ tree parallel-perf-output
	parallel-perf-output/
	├── cpu-0
	│   ├── time-range-0
	│   │   ├── cmd.txt
	│   │   └── out.txt
	│   └── time-range-1
	│       ├── cmd.txt
	│       └── out.txt
	└── cpu-1
	    ├── time-range-0
	    │   ├── cmd.txt
	    │   └── out.txt
	    └── time-range-1
	        ├── cmd.txt
	        └── out.txt
	$ find parallel-perf-output -name cmd.txt | sort | xargs grep -H .
	parallel-perf-output/cpu-0/time-range-0/cmd.txt:perf script --cpu=0 --time=,9469.005396999 --ns
	parallel-perf-output/cpu-0/time-range-1/cmd.txt:perf script --cpu=0 --time=9469.005397000, --ns
	parallel-perf-output/cpu-1/time-range-0/cmd.txt:perf script --cpu=1 --time=,9469.005396999 --ns
	parallel-perf-output/cpu-1/time-range-1/cmd.txt:perf script --cpu=1 --time=9469.005397000, --ns

Subdivisions of time range, and cpus if the --per-cpu option is used, are
expressed by the --time and --cpu perf script options respectively. If the
supplied perf script command has a --time option, then that time range is
subdivided, otherwise the time range given by 'time of first sample' to
'time of last sample' is used (refer perf script --header-only). Similarly, the
supplied perf script command may provide a --cpu option, and only those CPUs
will be processed.

To prevent time intervals becoming too small, the --min-interval option can
be used.

Note there is special handling for processing Intel PT traces. If an interval is
not specified and the perf record command contained the intel_pt event, then the
time range will be subdivided in order to produce subdivisions that contain
approximately the same amount of trace data. That is accomplished by counting
double-quick (--itrace=qqi) samples, and choosing time ranges that encompass
approximately the same number of samples. In that case, time ranges may not be
the same for each CPU processed. For Intel PT, --per-cpu is the default, but
that can be overridden by --no-per-cpu. Note, for Intel PT, double-quick
decoding produces 1 sample for each PSB synchronization packet, which in turn
come after a certain number of bytes output, determined by psb_period (refer
perf Intel PT documentation). The minimum number of double-quick samples that
will define a time range can be set by the --min_size option, which defaults to
64.
""")
	ap.add_argument("-o", "--output-dir", default="parallel-perf-output", help="output directory (default 'parallel-perf-output')")
	ap.add_argument("-j", "--jobs", type=int, default=0, help="maximum number of jobs to run in parallel at one time (default is the number of CPUs)")
	ap.add_argument("-n", "--nr", type=int, default=0, help="number of time subdivisions (default is the number of jobs)")
	ap.add_argument("-i", "--interval", type=float, default=0, help="subdivide the time range using this time interval (in seconds e.g. 0.1 for a tenth of a second)")
	ap.add_argument("-c", "--per-cpu", action="store_true", help="process data for each CPU in parallel")
	ap.add_argument("-m", "--min-interval", type=float, default=glb_min_interval, help=f"minimum interval (default {glb_min_interval} seconds)")
	ap.add_argument("-p", "--pipe-to", help="command to pipe output to (optional)")
	ap.add_argument("-N", "--no-per-cpu", action="store_true", help="do not process data for each CPU in parallel")
	ap.add_argument("-b", "--min_size", type=int, default=glb_min_samples, help="minimum data size (for Intel PT in PSBs)")
	ap.add_argument("-D", "--dry-run", action="store_true", help="do not run any jobs, just show the perf script commands")
	ap.add_argument("-q", "--quiet", action="store_true", help="do not print any messages except errors")
	ap.add_argument("-v", "--verbose", action="store_true", help="print more messages")
	ap.add_argument("-d", "--debug", action="store_true", help="print debugging messages")
	cmd_line = list(args)
	try:
		split_pos = cmd_line.index("--")
		cmd = cmd_line[split_pos + 1:]
		args = cmd_line[:split_pos]
	except:
		cmd = None
		args = cmd_line
	a = ap.parse_args(args=args[1:])
	a.cmd = cmd
	a.verbosity = Verbosity(a.quiet, a.verbose, a.debug)
	try:
		if a.cmd == None:
			if len(args) <= 1:
				ap.print_help()
				return True
			raise Exception("Command line must contain '--' before perf command")
		return RunParallelPerf(a)
	except Exception as e:
		print("Fatal error: ", str(e))
		if a.debug:
			raise
		return False

if __name__ == "__main__":
	if not Main(sys.argv):
		sys.exit(1)