Linux kernel mirror (for testing)
git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel
os
linux
1# SPDX-License-Identifier: GPL-2.0
2#
3# Parses test results from a kernel dmesg log.
4#
5# Copyright (C) 2019, Google LLC.
6# Author: Felix Guo <felixguoxiuping@gmail.com>
7# Author: Brendan Higgins <brendanhiggins@google.com>
8
9import re
10
11from collections import namedtuple
12from datetime import datetime
13from enum import Enum, auto
14from functools import reduce
15from typing import List
16
# Outcome of parsing one KUnit run: `status` is the aggregate TestStatus,
# `suites` the list of parsed TestSuite objects, and `log` the list of lines
# the parser was given (whatever it left unconsumed).
TestResult = namedtuple('TestResult', ['status','suites','log'])
18
class TestSuite(object):
    """One parsed test suite.

    Attributes are filled in by the parser after construction:
    status -- None until set, afterwards a TestStatus member.
    name   -- None until set, afterwards the suite name string.
    cases  -- list of the TestCase objects parsed for this suite.
    """

    def __init__(self):
        self.status = None
        self.name = None
        self.cases = []

    def __str__(self):
        # status and name are None or non-str objects (enum members), so
        # they must be converted explicitly; concatenating them directly
        # onto a str raises TypeError.
        return ('TestSuite(' + str(self.status) + ',' + str(self.name) +
                ',' + str(self.cases) + ')')

    def __repr__(self):
        return str(self)
30
class TestCase(object):
    """One parsed test case.

    Attributes are filled in by the parser after construction:
    status -- None until set, afterwards a TestStatus member.
    name   -- the test case name ('' until parsed).
    log    -- list of raw output lines collected while parsing the case.
    """

    def __init__(self):
        self.status = None
        self.name = ''
        self.log = []

    def __str__(self):
        # status is None or an enum member, never a str, so convert it
        # explicitly; direct concatenation raises TypeError.
        return ('TestCase(' + str(self.status) + ',' + str(self.name) +
                ',' + str(self.log) + ')')

    def __repr__(self):
        return str(self)
42
class TestStatus(Enum):
    """Status values assigned to test cases, test suites and whole runs."""

    # Values are spelled out; they are the same integers auto() generates.
    SUCCESS = 1
    FAILURE = 2
    # Set when a crash diagnostic is seen or the output ends prematurely.
    TEST_CRASHED = 3
    # Set when no TAP header or no test suite could be found.
    NO_TESTS = 4
    # Set when the top-level test plan could not be parsed.
    FAILURE_TO_PARSE_TESTS = 5
49
# A line ending in "TAP version <n>" marks the start of KUnit output.
kunit_start_re = re.compile(r'TAP version [0-9]+$')
# Either of these kernel messages marks the end of KUnit output.
kunit_end_re = re.compile('(List of all partitions:|'
                          'Kernel panic - not syncing: VFS:)')

def isolate_kunit_output(kernel_output):
    """Yield only the KUnit portion of a kernel log.

    Lines before the TAP header are dropped. From the header onwards each
    line is yielded with as many leading characters removed as preceded
    'TAP version' on the header line. Iteration stops at the first line
    matching kunit_end_re.
    """
    strip_len = 0
    in_kunit = False
    for raw in kernel_output:
        if kunit_start_re.search(raw):
            # Whatever precedes the header text is treated as a fixed-width
            # prefix to cut from every subsequent line.
            strip_len = len(raw.split('TAP version')[0])
            in_kunit = True
        elif kunit_end_re.search(raw):
            return
        elif not in_kunit:
            continue
        yield raw[strip_len:]
65
def raw_output(kernel_output):
    """Print every item of kernel_output as-is, one print() call per item."""
    for entry in kernel_output:
        print(entry)
69
# 60-character rule of '=' printed between report sections.
DIVIDER = '=' * 60

# Escape sequence appended after coloured text by red(), yellow() and green().
RESET = '\033[0;0m'
73
def red(text):
    """Return text prefixed with the '1;31' escape sequence and followed by RESET."""
    return ''.join(('\033[1;31m', text, RESET))
76
def yellow(text):
    """Return text prefixed with the '1;33' escape sequence and followed by RESET."""
    return ''.join(('\033[1;33m', text, RESET))
79
def green(text):
    """Return text prefixed with the '1;32' escape sequence and followed by RESET."""
    return ''.join(('\033[1;32m', text, RESET))
82
def print_with_timestamp(message):
    """Print message preceded by the current local time as '[HH:MM:SS] '."""
    stamp = datetime.now().strftime('%H:%M:%S')
    # print() joins its arguments with a single space and applies str() to
    # message, matching '%s' formatting.
    print('[%s]' % stamp, message)
85
def format_suite_divider(message):
    """Return message framed by eight '=' characters and a space on each side."""
    bar = '========'
    return ' '.join((bar, message, bar))
88
def print_suite_divider(message):
    """Print the DIVIDER rule, then message framed as a suite heading."""
    print_with_timestamp(DIVIDER)
    heading = format_suite_divider(message)
    print_with_timestamp(heading)
92
def print_log(log):
    """Print each entry of log on its own timestamped line."""
    for entry in log:
        print_with_timestamp(entry)
96
# Lines the parser treats as TAP: the header, (optionally indented) ok / not ok
# results, plans of the form "N..M", and "#" diagnostics.
TAP_ENTRIES = re.compile(r'^(TAP|[\s]*ok|[\s]*not ok|[\s]*[0-9]+\.\.[0-9]+|[\s]*#).*$')

def consume_non_diagnositic(lines: List[str]) -> None:
    """Drop leading lines that do not match TAP_ENTRIES.

    lines is modified in place; afterwards it is either empty or starts
    with a line matching TAP_ENTRIES.
    """
    skip = 0
    while skip < len(lines) and TAP_ENTRIES.match(lines[skip]) is None:
        skip += 1
    del lines[:skip]
102
def save_non_diagnositic(lines: List[str], test_case: TestCase) -> None:
    """Move leading lines that do not match TAP_ENTRIES into test_case.log.

    lines is modified in place; the removed lines are appended to
    test_case.log in their original order.
    """
    skip = 0
    while skip < len(lines) and TAP_ENTRIES.match(lines[skip]) is None:
        skip += 1
    test_case.log.extend(lines[:skip])
    del lines[:skip]
107
OkNotOkResult = namedtuple('OkNotOkResult', ['is_ok','description', 'text'])

# Indented result line of a test case: "<ws>ok|not ok <n> - <name>".
OK_NOT_OK_SUBTEST = re.compile(r'^[\s]+(ok|not ok) [0-9]+ - (.*)$')

# Unindented result line of a whole suite: "ok|not ok <index> - <name>".
OK_NOT_OK_MODULE = re.compile(r'^(ok|not ok) ([0-9]+) - (.*)$')

def parse_ok_not_ok_test_case(lines: List[str], test_case: TestCase) -> bool:
    """Parse the result line of one test case.

    Leading non-TAP lines are first saved into test_case.log. Lines are then
    discarded until one matching OK_NOT_OK_SUBTEST is at the front; that line
    is moved into test_case.log and supplies the case name and status.

    Returns True when a result was recorded. Running out of lines right
    after the non-TAP output also returns True, with the case marked
    TEST_CRASHED. Returns False when no result line could be found.
    A status of TEST_CRASHED already set on test_case is not overwritten.
    """
    save_non_diagnositic(lines, test_case)
    if not lines:
        test_case.status = TestStatus.TEST_CRASHED
        return True
    # Skip ahead by peeking at the front of the list. The matching line must
    # stay in `lines` until it is moved to the log below; popping it while
    # searching would log (and drop) the line after it, or raise IndexError
    # when it was the last line.
    while lines and not OK_NOT_OK_SUBTEST.match(lines[0]):
        lines.pop(0)
    if not lines:
        return False
    match = OK_NOT_OK_SUBTEST.match(lines[0])
    test_case.log.append(lines.pop(0))
    test_case.name = match.group(2)
    if test_case.status == TestStatus.TEST_CRASHED:
        return True
    if match.group(1) == 'ok':
        test_case.status = TestStatus.SUCCESS
    else:
        test_case.status = TestStatus.FAILURE
    return True
136
# Indented diagnostic of the form "<ws># <anything>: <message>"; group 1 is
# the message after the first ": ".
SUBTEST_DIAGNOSTIC = re.compile(r'^[\s]+# .*?: (.*)$')
DIAGNOSTIC_CRASH_MESSAGE = 'kunit test case crashed!'

def parse_diagnostic(lines: List[str], test_case: TestCase) -> bool:
    """Consume one diagnostic line belonging to test_case, if present.

    Leading non-TAP lines are saved into test_case.log first. If the next
    line matches SUBTEST_DIAGNOSTIC it is moved into the log, and the case
    is marked TEST_CRASHED when the message equals DIAGNOSTIC_CRASH_MESSAGE.

    Returns True if a diagnostic line was consumed, False otherwise.
    """
    save_non_diagnositic(lines, test_case)
    found = SUBTEST_DIAGNOSTIC.match(lines[0]) if lines else None
    if found is None:
        return False
    test_case.log.append(lines.pop(0))
    if found.group(1) == DIAGNOSTIC_CRASH_MESSAGE:
        test_case.status = TestStatus.TEST_CRASHED
    return True
153
def parse_test_case(lines: List[str]) -> TestCase:
    """Parse one test case from the front of lines.

    Collects preceding non-TAP output and every diagnostic line, then the
    ok / not ok result. Returns the populated TestCase, or None when no
    result line could be parsed.
    """
    case = TestCase()
    save_non_diagnositic(lines, case)
    more_diagnostics = True
    while more_diagnostics:
        more_diagnostics = parse_diagnostic(lines, case)
    return case if parse_ok_not_ok_test_case(lines, case) else None
163
# Indented "# Subtest: <name>" line that opens a suite; group 1 is the name.
SUBTEST_HEADER = re.compile(r'^[\s]+# Subtest: (.*)$')

def parse_subtest_header(lines: List[str]) -> str:
    """Consume a suite header line and return the suite name.

    Leading non-TAP lines are discarded first. Returns None, leaving the
    next line in place, when lines is exhausted or the next line is not a
    subtest header.
    """
    consume_non_diagnositic(lines)
    header = SUBTEST_HEADER.match(lines[0]) if lines else None
    if header is None:
        return None
    lines.pop(0)
    return header.group(1)
176
# Indented plan line "<ws>N..M"; group 1 is M, the number of test cases.
SUBTEST_PLAN = re.compile(r'[\s]+[0-9]+\.\.([0-9]+)')

def parse_subtest_plan(lines: List[str]) -> int:
    """Consume a suite's plan line and return the planned number of cases.

    Leading non-TAP lines are discarded first. Returns None, leaving the
    next line in place, when lines is exhausted or the next line is not an
    indented plan.
    """
    consume_non_diagnositic(lines)
    # The log may end right after the suite header; treat that the same as a
    # missing plan instead of indexing into an empty list.
    if not lines:
        return None
    match = SUBTEST_PLAN.match(lines[0])
    if match:
        lines.pop(0)
        return int(match.group(1))
    else:
        return None
187
def max_status(left: TestStatus, right: TestStatus) -> TestStatus:
    """Return the more severe of two statuses.

    TEST_CRASHED outranks FAILURE. Otherwise the first operand that is not
    SUCCESS wins (left before right), and SUCCESS results only when both
    are SUCCESS.
    """
    for severity in (TestStatus.TEST_CRASHED, TestStatus.FAILURE):
        if left == severity or right == severity:
            return severity
    if left != TestStatus.SUCCESS:
        return left
    # left is SUCCESS here, so right is the answer either way: a non-SUCCESS
    # value, or SUCCESS itself.
    if right != TestStatus.SUCCESS:
        return right
    return TestStatus.SUCCESS
199
def parse_ok_not_ok_test_suite(lines: List[str],
                               test_suite: TestSuite,
                               expected_suite_index: int) -> bool:
    """Parse the unindented ok / not ok line that closes a suite.

    Leading non-TAP lines are discarded first. On a match the line is
    consumed, test_suite.status becomes SUCCESS or FAILURE, and an error is
    printed if the index on the line differs from expected_suite_index.

    Returns True when a result line was consumed. Returns False otherwise;
    if that is because lines ran out, test_suite.status is set to
    TEST_CRASHED.
    """
    consume_non_diagnositic(lines)
    if not lines:
        test_suite.status = TestStatus.TEST_CRASHED
        return False
    result = OK_NOT_OK_MODULE.match(lines[0])
    if result is None:
        return False
    lines.pop(0)
    passed = result.group(1) == 'ok'
    test_suite.status = TestStatus.SUCCESS if passed else TestStatus.FAILURE
    suite_index = int(result.group(2))
    if suite_index != expected_suite_index:
        complaint = ('expected_suite_index ' + str(expected_suite_index) +
                     ', but got ' + str(suite_index))
        print_with_timestamp(red('[ERROR] ') + complaint)
    return True
224
def bubble_up_errors(to_status, status_container_list) -> TestStatus:
    """Fold the statuses of all containers into the most severe one.

    to_status extracts a status from each item of status_container_list;
    the results are combined with max_status, starting from SUCCESS.
    """
    worst = TestStatus.SUCCESS
    for container in status_container_list:
        worst = max_status(worst, to_status(container))
    return worst
228
def bubble_up_test_case_errors(test_suite: TestSuite) -> TestStatus:
    """Return the most severe status among the suite's cases and the suite itself."""
    worst_case_status = bubble_up_errors(lambda case: case.status,
                                         test_suite.cases)
    return max_status(worst_case_status, test_suite.status)
232
def parse_test_suite(lines: List[str], expected_suite_index: int) -> TestSuite:
    """Parse one complete test suite from the front of lines.

    A suite consists of a subtest header, a plan, up to the planned number
    of test cases, and a closing ok / not ok line whose index should equal
    expected_suite_index.

    Returns the populated TestSuite. If the lines run out before the closing
    line, the partially parsed suite is still returned after printing an
    error. Returns None when lines is empty, when the header or plan is
    missing, or when the closing line cannot be parsed.
    """
    if not lines:
        return None
    consume_non_diagnositic(lines)
    test_suite = TestSuite()
    test_suite.status = TestStatus.SUCCESS
    name = parse_subtest_header(lines)
    if not name:
        return None
    test_suite.name = name
    expected_test_case_num = parse_subtest_plan(lines)
    if expected_test_case_num is None:
        return None
    for _ in range(expected_test_case_num):
        test_case = parse_test_case(lines)
        if not test_case:
            # Fewer cases than planned could be parsed; stop early.
            break
        test_suite.cases.append(test_case)
    if parse_ok_not_ok_test_suite(lines, test_suite, expected_suite_index):
        test_suite.status = bubble_up_test_case_errors(test_suite)
        return test_suite
    if not lines:
        print_with_timestamp(red('[ERROR] ') + 'ran out of lines before end token')
        return test_suite
    # Report through the same timestamped [ERROR] channel as the other
    # parser errors, and separate the message from the offending line.
    print_with_timestamp(
        red('[ERROR] ') + 'failed to parse end of suite: ' + lines[0])
    return None
261
# Only "TAP version 14" is accepted as the header line.
TAP_HEADER = re.compile(r'^TAP version 14$')

def parse_tap_header(lines: List[str]) -> bool:
    """Consume the TAP header line if it is next.

    Leading non-TAP lines are discarded first. Returns True when the header
    was found and removed; returns False, leaving the next line in place,
    when lines is exhausted or the next line is not the header.
    """
    consume_non_diagnositic(lines)
    # Guard against empty input so this helper is safe to call on its own.
    if lines and TAP_HEADER.match(lines[0]):
        lines.pop(0)
        return True
    return False
271
# Unindented plan line "N..M"; group 1 is M, the number of test suites.
TEST_PLAN = re.compile(r'[0-9]+\.\.([0-9]+)')

def parse_test_plan(lines: List[str]) -> int:
    """Consume the top-level plan line and return the planned suite count.

    Leading non-TAP lines are discarded first. Returns None, leaving the
    next line in place, when lines is exhausted or the next line is not a
    plan.
    """
    consume_non_diagnositic(lines)
    # Output can stop right after the TAP header; report "no plan" rather
    # than indexing into an empty list.
    if not lines:
        return None
    match = TEST_PLAN.match(lines[0])
    if match:
        lines.pop(0)
        return int(match.group(1))
    else:
        return None
282
def bubble_up_suite_errors(test_suite_list: List[TestSuite]) -> TestStatus:
    """Return the most severe status found among the given suites."""
    return bubble_up_errors(lambda suite: suite.status, test_suite_list)
285
def parse_test_result(lines: List[str]) -> TestResult:
    """Parse isolated KUnit output into a TestResult.

    Expects the TAP header, a top-level plan, and then the planned number
    of test suites. lines is consumed in place and handed back as the `log`
    field of the result.

    The status is NO_TESTS when the header is missing or no suite could be
    parsed, FAILURE_TO_PARSE_TESTS when the plan is missing (or plans zero
    suites), and otherwise the most severe status among the parsed suites.
    """
    consume_non_diagnositic(lines)
    if not lines or not parse_tap_header(lines):
        return TestResult(TestStatus.NO_TESTS, [], lines)
    expected_test_suite_num = parse_test_plan(lines)
    if not expected_test_suite_num:
        return TestResult(TestStatus.FAILURE_TO_PARSE_TESTS, [], lines)
    test_suites = []
    for i in range(1, expected_test_suite_num + 1):
        test_suite = parse_test_suite(lines, i)
        if test_suite:
            test_suites.append(test_suite)
        else:
            # Report how many suites were actually parsed; deriving the
            # count from the loop index was off by one (and negative when
            # the very first suite failed).
            print_with_timestamp(
                red('[ERROR] ') + ' expected ' +
                str(expected_test_suite_num) +
                ' test suites, but got ' + str(len(test_suites)))
            break
    # Anything that still parses as a suite exceeds the plan.
    test_suite = parse_test_suite(lines, -1)
    if test_suite:
        print_with_timestamp(red('[ERROR] ') +
            'got unexpected test suite: ' + test_suite.name)
    if test_suites:
        return TestResult(bubble_up_suite_errors(test_suites), test_suites, lines)
    else:
        return TestResult(TestStatus.NO_TESTS, [], lines)
312
def print_and_count_results(test_result: TestResult) -> tuple:
    """Print a per-suite, per-case report and count the outcomes.

    Each suite gets a divider and a coloured heading. Each case gets one
    status line, and crashed or failed cases are followed by their log
    (in yellow) and an empty timestamped line.

    Returns a tuple (total_tests, failed_tests, crashed_tests). The return
    annotation reflects this; the caller unpacks three values.
    """
    total_tests = 0
    failed_tests = 0
    crashed_tests = 0
    for test_suite in test_result.suites:
        if test_suite.status == TestStatus.SUCCESS:
            print_suite_divider(green('[PASSED] ') + test_suite.name)
        elif test_suite.status == TestStatus.TEST_CRASHED:
            print_suite_divider(red('[CRASHED] ' + test_suite.name))
        else:
            print_suite_divider(red('[FAILED] ') + test_suite.name)
        for test_case in test_suite.cases:
            total_tests += 1
            if test_case.status == TestStatus.SUCCESS:
                print_with_timestamp(green('[PASSED] ') + test_case.name)
                continue
            if test_case.status == TestStatus.TEST_CRASHED:
                crashed_tests += 1
                print_with_timestamp(red('[CRASHED] ' + test_case.name))
            else:
                # Every status other than SUCCESS or TEST_CRASHED is
                # reported as a failure.
                failed_tests += 1
                print_with_timestamp(red('[FAILED] ') + test_case.name)
            print_log(map(yellow, test_case.log))
            print_with_timestamp('')
    return total_tests, failed_tests, crashed_tests
339
def parse_run_tests(kernel_output) -> TestResult:
    """Parse a kernel log, print a report and return the TestResult.

    The KUnit portion of kernel_output is isolated and parsed. When no tests
    ran or the results could not be parsed, an error is printed and the
    summary reports zero counts; otherwise the per-suite report is printed.
    A final summary line is always printed, green on overall SUCCESS and
    red otherwise.
    """
    counts = (0, 0, 0)  # (total, failed, crashed)
    kunit_lines = list(isolate_kunit_output(kernel_output))
    test_result = parse_test_result(kunit_lines)
    status = test_result.status
    if status == TestStatus.NO_TESTS:
        print(red('[ERROR] ') + yellow('no tests run!'))
    elif status == TestStatus.FAILURE_TO_PARSE_TESTS:
        print(red('[ERROR] ') + yellow('could not parse test results!'))
    else:
        counts = print_and_count_results(test_result)
    print_with_timestamp(DIVIDER)
    colour = green if status == TestStatus.SUCCESS else red
    summary = 'Testing complete. %d tests run. %d failed. %d crashed.' % counts
    print_with_timestamp(colour(summary))
    return test_result
354 fmt = green if test_result.status == TestStatus.SUCCESS else red
355 print_with_timestamp(
356 fmt('Testing complete. %d tests run. %d failed. %d crashed.' %
357 (total_tests, failed_tests, crashed_tests)))
358 return test_result