# SPDX-License-Identifier: GPL-2.0
#
# Parses KTAP test results from a kernel dmesg log and incrementally prints
# results with reader-friendly format. Stores and returns test results in a
# Test object.
#
# Copyright (C) 2019, Google LLC.
# Author: Felix Guo <felixguoxiuping@gmail.com>
# Author: Brendan Higgins <brendanhiggins@google.com>
# Author: Rae Moar <rmoar@google.com>

from __future__ import annotations
import re
import sys

import datetime
from enum import Enum, auto
from typing import Iterable, Iterator, List, Optional, Tuple

class Test:
    """
    A class to represent a test parsed from KTAP results. All KTAP
    results within a test log are stored in a main Test object as
    subtests.

    Attributes:
    status : TestStatus - status of the test
    name : str - name of the test
    expected_count : Optional[int] - expected number of subtests (0 if
        single test case and None if unknown expected number of subtests)
    subtests : List[Test] - list of subtests
    log : List[str] - log of KTAP lines that correspond to the test
    counts : TestCounts - counts of the test statuses and errors of
        subtests or of the test itself if the test is a single
        test case.
    """
    def __init__(self) -> None:
        """Creates Test object with default attributes."""
        self.status = TestStatus.TEST_CRASHED
        self.name = ''
        self.expected_count = 0  # type: Optional[int]
        self.subtests = []  # type: List[Test]
        self.log = []  # type: List[str]
        self.counts = TestCounts()

    def __str__(self) -> str:
        """Returns string representation of a Test class object."""
        return (f'Test({self.status}, {self.name}, {self.expected_count}, '
                f'{self.subtests}, {self.log}, {self.counts})')

    def __repr__(self) -> str:
        """Returns string representation of a Test class object."""
        return str(self)

    def add_error(self, error_message: str) -> None:
        """Records an error that occurred while parsing this test."""
        self.counts.errors += 1
        print_with_timestamp(red('[ERROR]') + f' Test: {self.name}: {error_message}')

class TestStatus(Enum):
    """An enumeration class to represent the status of a test."""
    SUCCESS = auto()
    FAILURE = auto()
    SKIPPED = auto()
    TEST_CRASHED = auto()
    NO_TESTS = auto()
    FAILURE_TO_PARSE_TESTS = auto()

class TestCounts:
    """
    Tracks the counts of statuses of all test cases and any errors within
    a Test.

    Attributes:
    passed : int - the number of tests that have passed
    failed : int - the number of tests that have failed
    crashed : int - the number of tests that have crashed
    skipped : int - the number of tests that have been skipped
    errors : int - the number of errors in the test and subtests
    """
    def __init__(self):
        """Creates TestCounts object with counts of all test
        statuses and test errors set to 0.
        """
        self.passed = 0
        self.failed = 0
        self.crashed = 0
        self.skipped = 0
        self.errors = 0

    def __str__(self) -> str:
        """Returns the string representation of a TestCounts object."""
        statuses = [('passed', self.passed), ('failed', self.failed),
                    ('crashed', self.crashed), ('skipped', self.skipped),
                    ('errors', self.errors)]
        return f'Ran {self.total()} tests: ' + \
            ', '.join(f'{s}: {n}' for s, n in statuses if n > 0)

    def total(self) -> int:
        """Returns the total number of test cases within a test
        object, where a test case is a test with no subtests.
        """
        return (self.passed + self.failed + self.crashed +
                self.skipped)

    def add_subtest_counts(self, counts: TestCounts) -> None:
        """
        Adds the counts of another TestCounts object to this
        TestCounts object. Used to add the counts of a subtest to the
        parent test.

        Parameters:
        counts - a different TestCounts object whose counts
            will be added to the counts of this TestCounts object
        """
        self.passed += counts.passed
        self.failed += counts.failed
        self.crashed += counts.crashed
        self.skipped += counts.skipped
        self.errors += counts.errors

    def get_status(self) -> TestStatus:
        """Returns the aggregated status of a Test using test
        counts.
        """
        if self.total() == 0:
            return TestStatus.NO_TESTS
        if self.crashed:
            # Crashes should take priority.
            return TestStatus.TEST_CRASHED
        if self.failed:
            return TestStatus.FAILURE
        if self.passed:
            # No failures or crashes, looks good!
            return TestStatus.SUCCESS
        # We have only skipped tests.
        return TestStatus.SKIPPED

    def add_status(self, status: TestStatus) -> None:
        """Increments the count for `status`."""
        if status == TestStatus.SUCCESS:
            self.passed += 1
        elif status == TestStatus.FAILURE:
            self.failed += 1
        elif status == TestStatus.SKIPPED:
            self.skipped += 1
        elif status != TestStatus.NO_TESTS:
            self.crashed += 1

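# Illustrative sketch (not part of the original parser): get_status()
# aggregates with a fixed priority, where any crash outranks any failure,
# which outranks passes, and SKIPPED is returned only when every test case
# was skipped. For example, the following hypothetical snippet would hold:
#
#     counts = TestCounts()
#     counts.add_status(TestStatus.SUCCESS)
#     counts.add_status(TestStatus.FAILURE)
#     assert counts.get_status() == TestStatus.FAILURE       # failure wins
#     counts.add_status(TestStatus.TEST_CRASHED)
#     assert counts.get_status() == TestStatus.TEST_CRASHED  # crash wins
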
class LineStream:
    """
    A class to represent the lines of kernel output.
    Provides a lazy peek()/pop() interface over an iterator of
    (line#, text).
    """
    _lines: Iterator[Tuple[int, str]]
    _next: Tuple[int, str]
    _need_next: bool
    _done: bool

    def __init__(self, lines: Iterator[Tuple[int, str]]):
        """Creates a new LineStream that wraps the given iterator."""
        self._lines = lines
        self._done = False
        self._need_next = True
        self._next = (0, '')

    def _get_next(self) -> None:
        """Advances the LineStream to the next line, if necessary."""
        if not self._need_next:
            return
        try:
            self._next = next(self._lines)
        except StopIteration:
            self._done = True
        finally:
            self._need_next = False

    def peek(self) -> str:
        """Returns the current line, without advancing the LineStream.
        """
        self._get_next()
        return self._next[1]

    def pop(self) -> str:
        """Returns the current line and advances the LineStream to
        the next line.
        """
        s = self.peek()
        if self._done:
            raise ValueError(f'LineStream: going past EOF, last line was {s}')
        self._need_next = True
        return s

    def __bool__(self) -> bool:
        """Returns True if stream has more lines."""
        self._get_next()
        return not self._done

    # Only used by kunit_tool_test.py.
    def __iter__(self) -> Iterator[str]:
        """Yields the remaining lines in the LineStream, consuming
        them in the process.
        """
        while bool(self):
            yield self.pop()

    def line_number(self) -> int:
        """Returns the line number of the current line."""
        self._get_next()
        return self._next[0]

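# Illustrative sketch (not part of the original parser): a LineStream wraps
# an iterator of (line number, text) pairs; peek() inspects the current line
# without consuming it, while pop() consumes it. A hypothetical use:
#
#     stream = LineStream(iter([(1, 'KTAP version 1'), (2, '1..1')]))
#     assert stream.peek() == 'KTAP version 1'   # not yet consumed
#     assert stream.pop() == 'KTAP version 1'    # consumed
#     assert stream.line_number() == 2           # now at the plan line
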
# Parsing helper methods:

KTAP_START = re.compile(r'KTAP version ([0-9]+)$')
TAP_START = re.compile(r'TAP version ([0-9]+)$')
KTAP_END = re.compile('(List of all partitions:|'
                      'Kernel panic - not syncing: VFS:|reboot: System halted)')

def extract_tap_lines(kernel_output: Iterable[str]) -> LineStream:
    """Extracts KTAP lines from the kernel output."""
    def isolate_ktap_output(kernel_output: Iterable[str]) \
            -> Iterator[Tuple[int, str]]:
        line_num = 0
        started = False
        for line in kernel_output:
            line_num += 1
            line = line.rstrip()  # remove trailing \n
            if not started and KTAP_START.search(line):
                # start extracting KTAP lines and set prefix
                # to number of characters before version line
                prefix_len = len(line.split('KTAP version')[0])
                started = True
                yield line_num, line[prefix_len:]
            elif not started and TAP_START.search(line):
                # start extracting KTAP lines and set prefix
                # to number of characters before version line
                prefix_len = len(line.split('TAP version')[0])
                started = True
                yield line_num, line[prefix_len:]
            elif started and KTAP_END.search(line):
                # stop extracting KTAP lines
                break
            elif started:
                # remove prefix and any indentation and yield
                # line with line number
                line = line[prefix_len:].lstrip()
                yield line_num, line
    return LineStream(lines=isolate_ktap_output(kernel_output))

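# Illustrative sketch (not part of the original parser): everything before
# the 'KTAP version' line is treated as a fixed-width prefix (e.g. a dmesg
# timestamp) and stripped from each subsequent line. For hypothetical input:
#
#     raw = ['[0.1] ignored', '[0.2] KTAP version 1', '[0.3]    ok 1 - t']
#     lines = extract_tap_lines(raw)
#     assert lines.pop() == 'KTAP version 1'
#     assert lines.pop() == 'ok 1 - t'  # prefix and indentation removed
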
KTAP_VERSIONS = [1]
TAP_VERSIONS = [13, 14]

def check_version(version_num: int, accepted_versions: List[int],
                  version_type: str, test: Test) -> None:
    """
    Adds error to test object if version number is too high or too
    low.

    Parameters:
    version_num - The inputted version number from the parsed KTAP or TAP
        header line
    accepted_versions - List of accepted KTAP or TAP versions
    version_type - 'KTAP' or 'TAP' depending on the type of
        version line.
    test - Test object for current test being parsed
    """
    if version_num < min(accepted_versions):
        test.add_error(f'{version_type} version lower than expected!')
    elif version_num > max(accepted_versions):
        test.add_error(f'{version_type} version higher than expected!')

def parse_ktap_header(lines: LineStream, test: Test) -> bool:
    """
    Parses KTAP/TAP header line and checks version number.
    Returns False if fails to parse KTAP/TAP header line.

    Accepted formats:
    - 'KTAP version [version number]'
    - 'TAP version [version number]'

    Parameters:
    lines - LineStream of KTAP output to parse
    test - Test object for current test being parsed

    Return:
    True if successfully parsed KTAP/TAP header line
    """
    ktap_match = KTAP_START.match(lines.peek())
    tap_match = TAP_START.match(lines.peek())
    if ktap_match:
        version_num = int(ktap_match.group(1))
        check_version(version_num, KTAP_VERSIONS, 'KTAP', test)
    elif tap_match:
        version_num = int(tap_match.group(1))
        check_version(version_num, TAP_VERSIONS, 'TAP', test)
    else:
        return False
    test.log.append(lines.pop())
    return True

TEST_HEADER = re.compile(r'^# Subtest: (.*)$')

def parse_test_header(lines: LineStream, test: Test) -> bool:
    """
    Parses test header and stores test name in test object.
    Returns False if fails to parse test header line.

    Accepted format:
    - '# Subtest: [test name]'

    Parameters:
    lines - LineStream of KTAP output to parse
    test - Test object for current test being parsed

    Return:
    True if successfully parsed test header line
    """
    match = TEST_HEADER.match(lines.peek())
    if not match:
        return False
    test.log.append(lines.pop())
    test.name = match.group(1)
    return True

TEST_PLAN = re.compile(r'1\.\.([0-9]+)')

def parse_test_plan(lines: LineStream, test: Test) -> bool:
    """
    Parses test plan line and stores the expected number of subtests in
    test object. Returns False and sets expected_count to None if there
    is no valid test plan. (The '0 tests run!' error for an empty plan
    is reported by the caller, parse_test.)

    Accepted format:
    - '1..[number of subtests]'

    Parameters:
    lines - LineStream of KTAP output to parse
    test - Test object for current test being parsed

    Return:
    True if successfully parsed test plan line
    """
    match = TEST_PLAN.match(lines.peek())
    if not match:
        test.expected_count = None
        return False
    test.log.append(lines.pop())
    expected_count = int(match.group(1))
    test.expected_count = expected_count
    return True

TEST_RESULT = re.compile(r'^(ok|not ok) ([0-9]+) (- )?([^#]*)( # .*)?$')

TEST_RESULT_SKIP = re.compile(r'^(ok|not ok) ([0-9]+) (- )?(.*) # SKIP(.*)$')

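# Illustrative examples (not part of the original parser) of result lines
# the two patterns above match:
#
#     'ok 1 - test_case'                -> TEST_RESULT (PASSED)
#     'not ok 2 - test_case'            -> TEST_RESULT (FAILED)
#     'ok 3 - test_case # SKIP reason'  -> TEST_RESULT_SKIP (SKIPPED)
#
# Note TEST_RESULT's name group is ([^#]*), so it captures the name up to
# any '#' directive; TEST_RESULT_SKIP captures the name before ' # SKIP'.
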
def peek_test_name_match(lines: LineStream, test: Test) -> bool:
    """
    Matches current line with the format of a test result line and checks
    if the name matches the name of the current test.
    Returns False if fails to match format or name.

    Accepted format:
    - '[ok|not ok] [test number] [-] [test name] [optional skip
        directive]'

    Parameters:
    lines - LineStream of KTAP output to parse
    test - Test object for current test being parsed

    Return:
    True if the line matches the test result format and the name
    matches the expected test name
    """
    line = lines.peek()
    match = TEST_RESULT.match(line)
    if not match:
        return False
    name = match.group(4)
    return name == test.name

def parse_test_result(lines: LineStream, test: Test,
                      expected_num: int) -> bool:
    """
    Parses test result line and stores the status and name in the test
    object. Reports an error if the test number does not match expected
    test number.
    Returns False if fails to parse test result line.

    Note that the SKIP directive is the only directive that causes a
    change in status.

    Accepted format:
    - '[ok|not ok] [test number] [-] [test name] [optional skip
        directive]'

    Parameters:
    lines - LineStream of KTAP output to parse
    test - Test object for current test being parsed
    expected_num - expected test number for current test

    Return:
    True if successfully parsed a test result line.
    """
    line = lines.peek()
    match = TEST_RESULT.match(line)
    skip_match = TEST_RESULT_SKIP.match(line)

    # Check if line matches test result line format
    if not match:
        return False
    test.log.append(lines.pop())

    # Set name of test object
    if skip_match:
        test.name = skip_match.group(4)
    else:
        test.name = match.group(4)

    # Check test num
    num = int(match.group(2))
    if num != expected_num:
        test.add_error(f'Expected test number {expected_num} but found {num}')

    # Set status of test object
    status = match.group(1)
    if skip_match:
        test.status = TestStatus.SKIPPED
    elif status == 'ok':
        test.status = TestStatus.SUCCESS
    else:
        test.status = TestStatus.FAILURE
    return True

def parse_diagnostic(lines: LineStream) -> List[str]:
    """
    Parses lines that do not match the format of a test result line or
    test header line and returns them in a list.

    Line formats that are not parsed:
    - '# Subtest: [test name]'
    - '[ok|not ok] [test number] [-] [test name] [optional skip
        directive]'

    Parameters:
    lines - LineStream of KTAP output to parse

    Return:
    Log of diagnostic lines
    """
    log = []  # type: List[str]
    while lines and not TEST_RESULT.match(lines.peek()) and not \
            TEST_HEADER.match(lines.peek()):
        log.append(lines.pop())
    return log


# Printing helper methods:

DIVIDER = '=' * 60

RESET = '\033[0;0m'

def red(text: str) -> str:
    """Returns inputted string with red color code."""
    if not sys.stdout.isatty():
        return text
    return '\033[1;31m' + text + RESET

def yellow(text: str) -> str:
    """Returns inputted string with yellow color code."""
    if not sys.stdout.isatty():
        return text
    return '\033[1;33m' + text + RESET

def green(text: str) -> str:
    """Returns inputted string with green color code."""
    if not sys.stdout.isatty():
        return text
    return '\033[1;32m' + text + RESET

ANSI_LEN = len(red(''))

def print_with_timestamp(message: str) -> None:
    """Prints message with timestamp at beginning."""
    print('[%s] %s' % (datetime.datetime.now().strftime('%H:%M:%S'), message))

def format_test_divider(message: str, len_message: int) -> str:
    """
    Returns string with message centered in fixed width divider.

    Example:
    '===================== message example ====================='

    Parameters:
    message - message to be centered in divider line
    len_message - length of the message to be printed such that
        any characters of the color codes are not counted

    Return:
    String containing message centered in fixed width divider
    """
    default_count = 3  # default number of '=' characters on each side
    len_1 = default_count
    len_2 = default_count
    difference = len(DIVIDER) - len_message - 2  # 2 spaces added
    if difference > 0:
        # calculate number of '=' characters for each side of the divider
        len_1 = int(difference / 2)
        len_2 = difference - len_1
    return ('=' * len_1) + f' {message} ' + ('=' * len_2)

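# Illustrative note (not part of the original tool): len_message is passed
# separately so that colored text is centered by its visible width, since
# ANSI escape codes add invisible characters. A plain call pads 'example'
# to the 60-column divider width (25 '=' left, 26 '=' right):
#
#     format_test_divider('example', len('example'))
#
# print_test_footer() below passes len(message) - ANSI_LEN for the same
# reason, so the color codes in the formatted result are not counted.
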
def print_test_header(test: Test) -> None:
    """
    Prints test header with test name and optionally the expected number
    of subtests.

    Example:
    '=================== example (2 subtests) ==================='

    Parameters:
    test - Test object representing current test being printed
    """
    message = test.name
    if test.expected_count:
        if test.expected_count == 1:
            message += ' (1 subtest)'
        else:
            message += f' ({test.expected_count} subtests)'
    print_with_timestamp(format_test_divider(message, len(message)))

def print_log(log: Iterable[str]) -> None:
    """Prints all strings in saved log for test in yellow."""
    for m in log:
        print_with_timestamp(yellow(m))

def format_test_result(test: Test) -> str:
    """
    Returns string with formatted test result with colored status and test
    name.

    Example:
    '[PASSED] example'

    Parameters:
    test - Test object representing current test being printed

    Return:
    String containing formatted test result
    """
    if test.status == TestStatus.SUCCESS:
        return green('[PASSED] ') + test.name
    if test.status == TestStatus.SKIPPED:
        return yellow('[SKIPPED] ') + test.name
    if test.status == TestStatus.NO_TESTS:
        return yellow('[NO TESTS RUN] ') + test.name
    if test.status == TestStatus.TEST_CRASHED:
        print_log(test.log)
        return red('[CRASHED] ') + test.name
    print_log(test.log)
    return red('[FAILED] ') + test.name

def print_test_result(test: Test) -> None:
    """
    Prints result line with status of test.

    Example:
    '[PASSED] example'

    Parameters:
    test - Test object representing current test being printed
    """
    print_with_timestamp(format_test_result(test))

def print_test_footer(test: Test) -> None:
    """
    Prints test footer with status of test.

    Example:
    '===================== [PASSED] example ====================='

    Parameters:
    test - Test object representing current test being printed
    """
    message = format_test_result(test)
    print_with_timestamp(format_test_divider(message,
                                             len(message) - ANSI_LEN))

def print_summary_line(test: Test) -> None:
    """
    Prints summary line of test object. Color of line is dependent on
    status of test. Color is green if test passes, yellow if test is
    skipped, and red if the test fails or crashes. Summary line contains
    counts of the statuses of the test's subtests, or of the test itself
    if it has no subtests.

    Example:
    'Testing complete. Ran 2 tests: passed: 2'

    Parameters:
    test - Test object representing current test being printed
    """
    if test.status == TestStatus.SUCCESS:
        color = green
    elif test.status in (TestStatus.SKIPPED, TestStatus.NO_TESTS):
        color = yellow
    else:
        color = red
    print_with_timestamp(color(f'Testing complete. {test.counts}'))

# Other methods:

def bubble_up_test_results(test: Test) -> None:
    """
    If the test has subtests, adds the test counts of the subtests to the
    test and, if any of the subtests crashed, sets the test status to
    crashed. Otherwise, if the test has no subtests, adds the status of
    the test to the test counts.

    Parameters:
    test - Test object for current test being parsed
    """
    subtests = test.subtests
    counts = test.counts
    status = test.status
    for t in subtests:
        counts.add_subtest_counts(t.counts)
    if counts.total() == 0:
        counts.add_status(status)
    elif test.counts.get_status() == TestStatus.TEST_CRASHED:
        test.status = TestStatus.TEST_CRASHED

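# Illustrative sketch (not part of the original parser): a crash in any
# subtest bubbles up to the parent. Assuming a hypothetical parent Test
# with two parsed subtests, one passed and one crashed:
#
#     parent = Test()
#     ok, crashed = Test(), Test()
#     ok.status, crashed.status = TestStatus.SUCCESS, TestStatus.TEST_CRASHED
#     ok.counts.add_status(ok.status)
#     crashed.counts.add_status(crashed.status)
#     parent.subtests = [ok, crashed]
#     bubble_up_test_results(parent)
#     assert parent.status == TestStatus.TEST_CRASHED
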
def parse_test(lines: LineStream, expected_num: int, log: List[str]) -> Test:
    """
    Finds next test to parse in LineStream, creates new Test object,
    parses any subtests of the test, populates Test object with all
    information (status, name) about the test and the Test objects for
    any subtests, and then returns the Test object. The method accepts
    three formats of tests:

    Accepted test formats:

    - Main KTAP/TAP header

      Example:

      KTAP version 1
      1..4
      [subtests]

    - Subtest header line

      Example:

      # Subtest: name
      1..3
      [subtests]
      ok 1 name

    - Test result line

      Example:

      ok 1 - test

    Parameters:
    lines - LineStream of KTAP output to parse
    expected_num - expected test number for test to be parsed
    log - list of strings containing any preceding diagnostic lines
        corresponding to the current test

    Return:
    Test object populated with characteristics and any subtests
    """
    test = Test()
    test.log.extend(log)
    parent_test = False
    main = parse_ktap_header(lines, test)
    if main:
        # If KTAP/TAP header is found, attempt to parse
        # test plan
        test.name = "main"
        parse_test_plan(lines, test)
        parent_test = True
    else:
        # If KTAP/TAP header is not found, test must be subtest
        # header or test result line, so attempt to parse
        # subtest header
        parent_test = parse_test_header(lines, test)
        if parent_test:
            # If subtest header is found, attempt to parse
            # test plan and print header
            parse_test_plan(lines, test)
            print_test_header(test)
    expected_count = test.expected_count
    subtests = []
    test_num = 1
    while parent_test and (expected_count is None or test_num <= expected_count):
        # Loop to parse any subtests.
        # Break after parsing expected number of tests or
        # if expected number of tests is unknown break when test
        # result line with matching name to subtest header is found
        # or no more lines in stream.
        sub_log = parse_diagnostic(lines)
        sub_test = Test()
        if not lines or (peek_test_name_match(lines, test) and
                not main):
            if expected_count and test_num <= expected_count:
                # If parser reaches end of test before
                # parsing expected number of subtests, print
                # crashed subtest and record error
                test.add_error('missing expected subtest!')
                sub_test.log.extend(sub_log)
                test.counts.add_status(TestStatus.TEST_CRASHED)
                print_test_result(sub_test)
            else:
                test.log.extend(sub_log)
                break
        else:
            sub_test = parse_test(lines, test_num, sub_log)
        subtests.append(sub_test)
        test_num += 1
    test.subtests = subtests
    if not main:
        # If not main test, look for test result line
        test.log.extend(parse_diagnostic(lines))
        if (parent_test and peek_test_name_match(lines, test)) or \
                not parent_test:
            parse_test_result(lines, test, expected_num)
        else:
            test.add_error('missing subtest result line!')

    # Check for there being no tests
    if parent_test and len(subtests) == 0:
        # Don't override a bad status if this test had one reported.
        # Assumption: no subtests means CRASHED is from Test.__init__()
        if test.status in (TestStatus.TEST_CRASHED, TestStatus.SUCCESS):
            test.status = TestStatus.NO_TESTS
            test.add_error('0 tests run!')

    # Add statuses to TestCounts attribute in Test object
    bubble_up_test_results(test)
    if parent_test and not main:
        # If test has subtests and is not the main test object, print
        # footer.
        print_test_footer(test)
    elif not main:
        print_test_result(test)
    return test

def parse_run_tests(kernel_output: Iterable[str]) -> Test:
    """
    Using kernel output, extracts KTAP lines, parses the lines for test
    results, and prints condensed test results and summary line.

    Parameters:
    kernel_output - Iterable object containing lines of kernel output

    Return:
    Test - the main test object with all subtests.
    """
    print_with_timestamp(DIVIDER)
    lines = extract_tap_lines(kernel_output)
    test = Test()
    if not lines:
        test.name = '<missing>'
        test.add_error('could not find any KTAP output!')
        test.status = TestStatus.FAILURE_TO_PARSE_TESTS
    else:
        test = parse_test(lines, 0, [])
        if test.status != TestStatus.NO_TESTS:
            test.status = test.counts.get_status()
    print_with_timestamp(DIVIDER)
    print_summary_line(test)
    return test
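
# Minimal usage sketch (an assumption, not part of the original tool, which
# is normally driven from kunit.py): parse a saved dmesg/console log given
# as a command-line argument and exit non-zero unless all tests passed.
if __name__ == '__main__':
    with open(sys.argv[1], errors='replace') as f:
        result = parse_run_tests(f)
    sys.exit(0 if result.status == TestStatus.SUCCESS else 1)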