# SPDX-License-Identifier: GPL-2.0
#
# Parses KTAP test results from a kernel dmesg log and incrementally prints
# results in a reader-friendly format. Stores and returns test results in a
# Test object.
#
# Copyright (C) 2019, Google LLC.
# Author: Felix Guo <felixguoxiuping@gmail.com>
# Author: Brendan Higgins <brendanhiggins@google.com>
# Author: Rae Moar <rmoar@google.com>

from __future__ import annotations
import re
import sys

from enum import Enum, auto
from typing import Iterable, Iterator, List, Optional, Tuple

from kunit_printer import stdout

class Test:
	"""
	A class to represent a test parsed from KTAP results. All KTAP
	results within a test log are stored in a main Test object as
	subtests.

	Attributes:
	status : TestStatus - status of the test
	name : str - name of the test
	expected_count : int - expected number of subtests (0 if single
		test case and None if unknown expected number of subtests)
	subtests : List[Test] - list of subtests
	log : List[str] - log of KTAP lines that correspond to the test
	counts : TestCounts - counts of the test statuses and errors of
		subtests or of the test itself if the test is a single
		test case.
	"""
	def __init__(self) -> None:
		"""Creates Test object with default attributes."""
		self.status = TestStatus.TEST_CRASHED
		self.name = ''
		self.expected_count = 0  # type: Optional[int]
		self.subtests = []  # type: List[Test]
		self.log = []  # type: List[str]
		self.counts = TestCounts()

	def __str__(self) -> str:
		"""Returns string representation of a Test class object."""
		return (f'Test({self.status}, {self.name}, {self.expected_count}, '
			f'{self.subtests}, {self.log}, {self.counts})')

	def __repr__(self) -> str:
		"""Returns string representation of a Test class object."""
		return str(self)

	def add_error(self, error_message: str) -> None:
		"""Records an error that occurred while parsing this test."""
		self.counts.errors += 1
		stdout.print_with_timestamp(stdout.red('[ERROR]') + f' Test: {self.name}: {error_message}')

class TestStatus(Enum):
	"""An enumeration class to represent the status of a test."""
	SUCCESS = auto()
	FAILURE = auto()
	SKIPPED = auto()
	TEST_CRASHED = auto()
	NO_TESTS = auto()
	FAILURE_TO_PARSE_TESTS = auto()

class TestCounts:
	"""
	Tracks the counts of statuses of all test cases and any errors within
	a Test.

	Attributes:
	passed : int - the number of tests that have passed
	failed : int - the number of tests that have failed
	crashed : int - the number of tests that have crashed
	skipped : int - the number of tests that were skipped
	errors : int - the number of errors in the test and subtests
	"""
	def __init__(self):
		"""Creates TestCounts object with counts of all test
		statuses and test errors set to 0.
		"""
		self.passed = 0
		self.failed = 0
		self.crashed = 0
		self.skipped = 0
		self.errors = 0

	def __str__(self) -> str:
		"""Returns the string representation of a TestCounts object."""
		statuses = [('passed', self.passed), ('failed', self.failed),
			('crashed', self.crashed), ('skipped', self.skipped),
			('errors', self.errors)]
		return f'Ran {self.total()} tests: ' + \
			', '.join(f'{s}: {n}' for s, n in statuses if n > 0)

	def total(self) -> int:
		"""Returns the total number of test cases within a test
		object, where a test case is a test with no subtests.
		"""
		return (self.passed + self.failed + self.crashed +
			self.skipped)

	def add_subtest_counts(self, counts: TestCounts) -> None:
		"""
		Adds the counts of another TestCounts object to the current
		TestCounts object. Used to add the counts of a subtest to the
		parent test.

		Parameters:
		counts - a different TestCounts object whose counts
			will be added to this TestCounts object
		"""
		self.passed += counts.passed
		self.failed += counts.failed
		self.crashed += counts.crashed
		self.skipped += counts.skipped
		self.errors += counts.errors

	def get_status(self) -> TestStatus:
		"""Returns the aggregated status of a Test using test
		counts.
		"""
		if self.total() == 0:
			return TestStatus.NO_TESTS
		if self.crashed:
			# Crashes should take priority.
			return TestStatus.TEST_CRASHED
		if self.failed:
			return TestStatus.FAILURE
		if self.passed:
			# No failures or crashes, looks good!
			return TestStatus.SUCCESS
		# We have only skipped tests.
		return TestStatus.SKIPPED

	def add_status(self, status: TestStatus) -> None:
		"""Increments the count for `status`."""
		if status == TestStatus.SUCCESS:
			self.passed += 1
		elif status == TestStatus.FAILURE:
			self.failed += 1
		elif status == TestStatus.SKIPPED:
			self.skipped += 1
		elif status != TestStatus.NO_TESTS:
			self.crashed += 1

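# Note: an illustration of TestCounts.get_status() precedence (hypothetical
# counts, not from a real run): passed=2, failed=1, crashed=0, skipped=1
# aggregates to TestStatus.FAILURE, since any failure outranks passes, while
# passed=0, failed=0, crashed=0, skipped=3 aggregates to TestStatus.SKIPPED.
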
class LineStream:
	"""
	A class to represent the lines of kernel output.
	Provides a lazy peek()/pop() interface over an iterator of
	(line#, text).
	"""
	_lines: Iterator[Tuple[int, str]]
	_next: Tuple[int, str]
	_need_next: bool
	_done: bool

	def __init__(self, lines: Iterator[Tuple[int, str]]):
		"""Creates a new LineStream that wraps the given iterator."""
		self._lines = lines
		self._done = False
		self._need_next = True
		self._next = (0, '')

	def _get_next(self) -> None:
		"""Advances the LineStream to the next line, if necessary."""
		if not self._need_next:
			return
		try:
			self._next = next(self._lines)
		except StopIteration:
			self._done = True
		finally:
			self._need_next = False

	def peek(self) -> str:
		"""Returns the current line, without advancing the LineStream.
		"""
		self._get_next()
		return self._next[1]

	def pop(self) -> str:
		"""Returns the current line and advances the LineStream to
		the next line.
		"""
		s = self.peek()
		if self._done:
			raise ValueError(f'LineStream: going past EOF, last line was {s}')
		self._need_next = True
		return s

	def __bool__(self) -> bool:
		"""Returns True if stream has more lines."""
		self._get_next()
		return not self._done

	# Only used by kunit_tool_test.py.
	def __iter__(self) -> Iterator[str]:
		"""Consumes and yields all remaining lines in the
		LineStream.
		"""
		while bool(self):
			yield self.pop()

	def line_number(self) -> int:
		"""Returns the line number of the current line."""
		self._get_next()
		return self._next[0]

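# A minimal sketch of the LineStream contract (hypothetical data, not a real
# kernel log):
#
#   stream = LineStream(iter([(1, 'KTAP version 1'), (2, '1..1')]))
#   stream.peek()         # -> 'KTAP version 1' (does not advance)
#   stream.pop()          # -> 'KTAP version 1' (advances)
#   stream.line_number()  # -> 2
#   bool(stream)          # -> True; becomes False once line 2 is popped
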
# Parsing helper methods:

KTAP_START = re.compile(r'KTAP version ([0-9]+)$')
TAP_START = re.compile(r'TAP version ([0-9]+)$')
KTAP_END = re.compile('(List of all partitions:|'
	'Kernel panic - not syncing: VFS:|reboot: System halted)')

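# For reference, example lines each pattern is meant to match (illustrative,
# based on typical KUnit runs):
#   KTAP_START: '[    0.123456] KTAP version 1' (matched via search())
#   TAP_START:  'TAP version 14'
#   KTAP_END:   'reboot: System halted'
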
def extract_tap_lines(kernel_output: Iterable[str]) -> LineStream:
	"""Extracts KTAP lines from the kernel output."""
	def isolate_ktap_output(kernel_output: Iterable[str]) \
			-> Iterator[Tuple[int, str]]:
		line_num = 0
		started = False
		for line in kernel_output:
			line_num += 1
			line = line.rstrip()  # remove trailing \n
			if not started and KTAP_START.search(line):
				# start extracting KTAP lines and set prefix
				# to number of characters before version line
				prefix_len = len(
					line.split('KTAP version')[0])
				started = True
				yield line_num, line[prefix_len:]
			elif not started and TAP_START.search(line):
				# start extracting KTAP lines and set prefix
				# to number of characters before version line
				prefix_len = len(line.split('TAP version')[0])
				started = True
				yield line_num, line[prefix_len:]
			elif started and KTAP_END.search(line):
				# stop extracting KTAP lines
				break
			elif started:
				# remove prefix and any indentation and yield
				# line with line number
				line = line[prefix_len:].lstrip()
				yield line_num, line
	return LineStream(lines=isolate_ktap_output(kernel_output))

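# Example (hypothetical dmesg excerpt): given kernel_output lines
#   '[    0.1] KTAP version 1'
#   '[    0.1] 1..1'
#   '[    0.1]     ok 1 example'
# extract_tap_lines() yields 'KTAP version 1', '1..1' and 'ok 1 example',
# stripping the '[    0.1] ' prefix (and, after the header, any indentation).
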
KTAP_VERSIONS = [1]
TAP_VERSIONS = [13, 14]

def check_version(version_num: int, accepted_versions: List[int],
			version_type: str, test: Test) -> None:
	"""
	Adds error to test object if version number is too high or too
	low.

	Parameters:
	version_num - the version number parsed from the KTAP or TAP
		header line
	accepted_versions - list of accepted KTAP or TAP versions
	version_type - 'KTAP' or 'TAP' depending on the type of
		version line.
	test - Test object for current test being parsed
	"""
	if version_num < min(accepted_versions):
		test.add_error(f'{version_type} version lower than expected!')
	elif version_num > max(accepted_versions):
		test.add_error(f'{version_type} version higher than expected!')

def parse_ktap_header(lines: LineStream, test: Test) -> bool:
	"""
	Parses KTAP/TAP header line and checks version number.
	Returns False if it fails to parse the KTAP/TAP header line.

	Accepted formats:
	- 'KTAP version [version number]'
	- 'TAP version [version number]'

	Parameters:
	lines - LineStream of KTAP output to parse
	test - Test object for current test being parsed

	Return:
	True if successfully parsed KTAP/TAP header line
	"""
	ktap_match = KTAP_START.match(lines.peek())
	tap_match = TAP_START.match(lines.peek())
	if ktap_match:
		version_num = int(ktap_match.group(1))
		check_version(version_num, KTAP_VERSIONS, 'KTAP', test)
	elif tap_match:
		version_num = int(tap_match.group(1))
		check_version(version_num, TAP_VERSIONS, 'TAP', test)
	else:
		return False
	test.log.append(lines.pop())
	return True

TEST_HEADER = re.compile(r'^# Subtest: (.*)$')

def parse_test_header(lines: LineStream, test: Test) -> bool:
	"""
	Parses test header and stores test name in test object.
	Returns False if it fails to parse the test header line.

	Accepted format:
	- '# Subtest: [test name]'

	Parameters:
	lines - LineStream of KTAP output to parse
	test - Test object for current test being parsed

	Return:
	True if successfully parsed test header line
	"""
	match = TEST_HEADER.match(lines.peek())
	if not match:
		return False
	test.log.append(lines.pop())
	test.name = match.group(1)
	return True

TEST_PLAN = re.compile(r'1\.\.([0-9]+)')

def parse_test_plan(lines: LineStream, test: Test) -> bool:
	"""
	Parses test plan line and stores the expected number of subtests in
	test object.
	Returns False and sets expected_count to None if there is no valid
	test plan.

	Accepted format:
	- '1..[number of subtests]'

	Parameters:
	lines - LineStream of KTAP output to parse
	test - Test object for current test being parsed

	Return:
	True if successfully parsed test plan line
	"""
	match = TEST_PLAN.match(lines.peek())
	if not match:
		test.expected_count = None
		return False
	test.log.append(lines.pop())
	expected_count = int(match.group(1))
	test.expected_count = expected_count
	return True

TEST_RESULT = re.compile(r'^(ok|not ok) ([0-9]+) (- )?([^#]*)( # .*)?$')

TEST_RESULT_SKIP = re.compile(r'^(ok|not ok) ([0-9]+) (- )?(.*) # SKIP(.*)$')

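# Examples of result lines these patterns recognize (illustrative):
#   'ok 1 - test_case'               -> TEST_RESULT, pass
#   'not ok 2 test_case'             -> TEST_RESULT, fail
#   'ok 3 - test_case # SKIP reason' -> TEST_RESULT and TEST_RESULT_SKIP
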
def peek_test_name_match(lines: LineStream, test: Test) -> bool:
	"""
	Matches current line with the format of a test result line and checks
	if the name matches the name of the current test.
	Returns False if the line fails to match the format or the name.

	Accepted format:
	- '[ok|not ok] [test number] [-] [test name] [optional skip
		directive]'

	Parameters:
	lines - LineStream of KTAP output to parse
	test - Test object for current test being parsed

	Return:
	True if the line matches a test result line and the name matches
	the expected test name
	"""
	line = lines.peek()
	match = TEST_RESULT.match(line)
	if not match:
		return False
	name = match.group(4)
	return name == test.name

def parse_test_result(lines: LineStream, test: Test,
			expected_num: int) -> bool:
	"""
	Parses test result line and stores the status and name in the test
	object. Reports an error if the test number does not match the
	expected test number.
	Returns False if it fails to parse the test result line.

	Note that the SKIP directive is the only directive that causes a
	change in status.

	Accepted format:
	- '[ok|not ok] [test number] [-] [test name] [optional skip
		directive]'

	Parameters:
	lines - LineStream of KTAP output to parse
	test - Test object for current test being parsed
	expected_num - expected test number for current test

	Return:
	True if successfully parsed a test result line.
	"""
	line = lines.peek()
	match = TEST_RESULT.match(line)
	skip_match = TEST_RESULT_SKIP.match(line)

	# Check if line matches test result line format
	if not match:
		return False
	test.log.append(lines.pop())

	# Set name of test object
	if skip_match:
		test.name = skip_match.group(4)
	else:
		test.name = match.group(4)

	# Check test num
	num = int(match.group(2))
	if num != expected_num:
		test.add_error(f'Expected test number {expected_num} but found {num}')

	# Set status of test object
	status = match.group(1)
	if skip_match:
		test.status = TestStatus.SKIPPED
	elif status == 'ok':
		test.status = TestStatus.SUCCESS
	else:
		test.status = TestStatus.FAILURE
	return True

def parse_diagnostic(lines: LineStream) -> List[str]:
	"""
	Parses lines that do not match the format of a test result line or
	test header line and returns them in a list.

	Line formats that are not parsed:
	- '# Subtest: [test name]'
	- '[ok|not ok] [test number] [-] [test name] [optional skip
		directive]'

	Parameters:
	lines - LineStream of KTAP output to parse

	Return:
	Log of diagnostic lines
	"""
	log = []  # type: List[str]
	while lines and not TEST_RESULT.match(lines.peek()) and not \
			TEST_HEADER.match(lines.peek()):
		log.append(lines.pop())
	return log


# Printing helper methods:

DIVIDER = '=' * 60

def format_test_divider(message: str, len_message: int) -> str:
	"""
	Returns string with message centered in fixed width divider.

	Example:
	'===================== message example ====================='

	Parameters:
	message - message to be centered in divider line
	len_message - length of the message to be printed such that
		any characters of the color codes are not counted

	Return:
	String containing message centered in fixed width divider
	"""
	default_count = 3  # default number of '=' characters on each side
	len_1 = default_count
	len_2 = default_count
	difference = len(DIVIDER) - len_message - 2  # 2 spaces added
	if difference > 0:
		# calculate number of '=' characters for each side
		len_1 = int(difference / 2)
		len_2 = difference - len_1
	return ('=' * len_1) + f' {message} ' + ('=' * len_2)

def print_test_header(test: Test) -> None:
	"""
	Prints test header with test name and optionally the expected number
	of subtests.

	Example:
	'=================== example (2 subtests) ==================='

	Parameters:
	test - Test object representing current test being printed
	"""
	message = test.name
	if test.expected_count:
		if test.expected_count == 1:
			message += ' (1 subtest)'
		else:
			message += f' ({test.expected_count} subtests)'
	stdout.print_with_timestamp(format_test_divider(message, len(message)))

def print_log(log: Iterable[str]) -> None:
	"""Prints all strings in saved log for test in yellow."""
	for m in log:
		stdout.print_with_timestamp(stdout.yellow(m))

def format_test_result(test: Test) -> str:
	"""
	Returns a string containing the formatted test result: a colored
	status tag followed by the test name.

	Example:
	'[PASSED] example'

	Parameters:
	test - Test object representing current test being printed

	Return:
	String containing formatted test result
	"""
	if test.status == TestStatus.SUCCESS:
		return stdout.green('[PASSED] ') + test.name
	if test.status == TestStatus.SKIPPED:
		return stdout.yellow('[SKIPPED] ') + test.name
	if test.status == TestStatus.NO_TESTS:
		return stdout.yellow('[NO TESTS RUN] ') + test.name
	if test.status == TestStatus.TEST_CRASHED:
		print_log(test.log)
		return stdout.red('[CRASHED] ') + test.name
	print_log(test.log)
	return stdout.red('[FAILED] ') + test.name

def print_test_result(test: Test) -> None:
	"""
	Prints result line with status of test.

	Example:
	'[PASSED] example'

	Parameters:
	test - Test object representing current test being printed
	"""
	stdout.print_with_timestamp(format_test_result(test))

def print_test_footer(test: Test) -> None:
	"""
	Prints test footer with status of test.

	Example:
	'===================== [PASSED] example ====================='

	Parameters:
	test - Test object representing current test being printed
	"""
	message = format_test_result(test)
	stdout.print_with_timestamp(format_test_divider(message,
			len(message) - stdout.color_len()))

def print_summary_line(test: Test) -> None:
	"""
	Prints summary line of test object. Color of line is dependent on
	status of test: green if the test passes, yellow if the test is
	skipped, and red if the test fails or crashes. The summary line
	contains the counts of the statuses of the test's subtests, or of
	the test itself if it has no subtests.

	Example:
	'Testing complete. Ran 2 tests: passed: 2'

	Parameters:
	test - Test object representing current test being printed
	"""
	if test.status == TestStatus.SUCCESS:
		color = stdout.green
	elif test.status in (TestStatus.SKIPPED, TestStatus.NO_TESTS):
		color = stdout.yellow
	else:
		color = stdout.red
	stdout.print_with_timestamp(color(f'Testing complete. {test.counts}'))

# Other methods:

def bubble_up_test_results(test: Test) -> None:
	"""
	If the test has subtests, adds the test counts of the subtests to
	the test and, if any of the subtests crashed, sets the test status
	to crashed. Otherwise, if the test has no subtests, adds the status
	of the test to the test counts.

	Parameters:
	test - Test object for current test being parsed
	"""
	subtests = test.subtests
	counts = test.counts
	status = test.status
	for t in subtests:
		counts.add_subtest_counts(t.counts)
	if counts.total() == 0:
		counts.add_status(status)
	elif test.counts.get_status() == TestStatus.TEST_CRASHED:
		test.status = TestStatus.TEST_CRASHED

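# Illustration (hypothetical): for a parent Test whose subtests finished as
# SUCCESS and TEST_CRASHED, bubble_up_test_results() sums the subtest counts
# into the parent (passed=1, crashed=1) and, since get_status() then reports
# TEST_CRASHED, marks the parent crashed as well.
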
def parse_test(lines: LineStream, expected_num: int, log: List[str]) -> Test:
	"""
	Finds next test to parse in LineStream, creates new Test object,
	parses any subtests of the test, populates Test object with all
	information (status, name) about the test and the Test objects for
	any subtests, and then returns the Test object. The method accepts
	three formats of tests:

	Accepted test formats:

	- Main KTAP/TAP header

	Example:

	KTAP version 1
	1..4
	[subtests]

	- Subtest header line

	Example:

	# Subtest: name
	1..3
	[subtests]
	ok 1 name

	- Test result line

	Example:

	ok 1 - test

	Parameters:
	lines - LineStream of KTAP output to parse
	expected_num - expected test number for test to be parsed
	log - list of strings containing any preceding diagnostic lines
		corresponding to the current test

	Return:
	Test object populated with characteristics and any subtests
	"""
	test = Test()
	test.log.extend(log)
	parent_test = False
	main = parse_ktap_header(lines, test)
	if main:
		# If KTAP/TAP header is found, attempt to parse
		# test plan
		test.name = "main"
		parse_test_plan(lines, test)
		parent_test = True
	else:
		# If KTAP/TAP header is not found, test must be a subtest
		# header or a test result line, so attempt to parse the
		# subtest header
		parent_test = parse_test_header(lines, test)
		if parent_test:
			# If subtest header is found, attempt to parse
			# test plan and print header
			parse_test_plan(lines, test)
			print_test_header(test)
	expected_count = test.expected_count
	subtests = []
	test_num = 1
	while parent_test and (expected_count is None or test_num <= expected_count):
		# Loop to parse any subtests.
		# Break after parsing the expected number of tests, or,
		# if the expected number is unknown, break when a test
		# result line whose name matches the subtest header is
		# found or there are no more lines in the stream.
		sub_log = parse_diagnostic(lines)
		sub_test = Test()
		if not lines or (peek_test_name_match(lines, test) and
				not main):
			if expected_count and test_num <= expected_count:
				# If parser reaches end of test before
				# parsing expected number of subtests, print
				# crashed subtest and record error
				test.add_error('missing expected subtest!')
				sub_test.log.extend(sub_log)
				test.counts.add_status(
					TestStatus.TEST_CRASHED)
				print_test_result(sub_test)
			else:
				test.log.extend(sub_log)
				break
		else:
			sub_test = parse_test(lines, test_num, sub_log)
		subtests.append(sub_test)
		test_num += 1
	test.subtests = subtests
	if not main:
		# If not main test, look for test result line
		test.log.extend(parse_diagnostic(lines))
		if (parent_test and peek_test_name_match(lines, test)) or \
				not parent_test:
			parse_test_result(lines, test, expected_num)
		else:
			test.add_error('missing subtest result line!')

	# Check for there being no tests
	if parent_test and len(subtests) == 0:
		# Don't override a bad status if this test had one reported.
		# Assumption: no subtests means CRASHED is from Test.__init__()
		if test.status in (TestStatus.TEST_CRASHED, TestStatus.SUCCESS):
			test.status = TestStatus.NO_TESTS
			test.add_error('0 tests run!')

	# Add statuses to TestCounts attribute in Test object
	bubble_up_test_results(test)
	if parent_test and not main:
		# If test has subtests and is not the main test object, print
		# footer.
		print_test_footer(test)
	elif not main:
		print_test_result(test)
	return test

def parse_run_tests(kernel_output: Iterable[str]) -> Test:
	"""
	Using kernel output, extracts KTAP lines, parses the lines for test
	results and prints condensed test results and a summary line.

	Parameters:
	kernel_output - Iterable object containing lines of kernel output

	Return:
	Test - the main test object with all subtests.
	"""
	stdout.print_with_timestamp(DIVIDER)
	lines = extract_tap_lines(kernel_output)
	test = Test()
	if not lines:
		test.name = '<missing>'
		test.add_error('could not find any KTAP output!')
		test.status = TestStatus.FAILURE_TO_PARSE_TESTS
	else:
		test = parse_test(lines, 0, [])
		if test.status != TestStatus.NO_TESTS:
			test.status = test.counts.get_status()
	stdout.print_with_timestamp(DIVIDER)
	print_summary_line(test)
	return test
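
# Typical use, as a sketch (any iterable of dmesg lines works; the file name
# here is hypothetical):
#
#   with open('kunit.log') as f:
#       result = parse_run_tests(f)
#   print(result.status, result.counts)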