Linux kernel mirror (for testing)
git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel
os
linux
1# SPDX-License-Identifier: GPL-2.0
2#
3# Runs UML kernel, collects output, and handles errors.
4#
5# Copyright (C) 2019, Google LLC.
6# Author: Felix Guo <felixguoxiuping@gmail.com>
7# Author: Brendan Higgins <brendanhiggins@google.com>
8
9import importlib.abc
10import importlib.util
11import logging
12import subprocess
13import os
14import shlex
15import shutil
16import signal
17import threading
18from typing import Iterator, List, Optional, Tuple
19from types import FrameType
20
21import kunit_config
22import qemu_config
23
24KCONFIG_PATH = '.config'
25KUNITCONFIG_PATH = '.kunitconfig'
26OLD_KUNITCONFIG_PATH = 'last_used_kunitconfig'
27DEFAULT_KUNITCONFIG_PATH = 'tools/testing/kunit/configs/default.config'
28ALL_TESTS_CONFIG_PATH = 'tools/testing/kunit/configs/all_tests.config'
29UML_KCONFIG_PATH = 'tools/testing/kunit/configs/arch_uml.config'
30OUTFILE_PATH = 'test.log'
31ABS_TOOL_PATH = os.path.abspath(os.path.dirname(__file__))
32QEMU_CONFIGS_DIR = os.path.join(ABS_TOOL_PATH, 'qemu_configs')
33
34class ConfigError(Exception):
35 """Represents an error trying to configure the Linux kernel."""
36
37
38class BuildError(Exception):
39 """Represents an error trying to build the Linux kernel."""
40
41
42class LinuxSourceTreeOperations:
43 """An abstraction over command line operations performed on a source tree."""
44
45 def __init__(self, linux_arch: str, cross_compile: Optional[str]):
46 self._linux_arch = linux_arch
47 self._cross_compile = cross_compile
48
49 def make_mrproper(self) -> None:
50 try:
51 subprocess.check_output(['make', 'mrproper'], stderr=subprocess.STDOUT)
52 except OSError as e:
53 raise ConfigError('Could not call make command: ' + str(e))
54 except subprocess.CalledProcessError as e:
55 raise ConfigError(e.output.decode())
56
57 def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
58 return base_kunitconfig
59
60 def make_olddefconfig(self, build_dir: str, make_options: Optional[List[str]]) -> None:
61 command = ['make', 'ARCH=' + self._linux_arch, 'O=' + build_dir, 'olddefconfig']
62 if self._cross_compile:
63 command += ['CROSS_COMPILE=' + self._cross_compile]
64 if make_options:
65 command.extend(make_options)
66 print('Populating config with:\n$', ' '.join(command))
67 try:
68 subprocess.check_output(command, stderr=subprocess.STDOUT)
69 except OSError as e:
70 raise ConfigError('Could not call make command: ' + str(e))
71 except subprocess.CalledProcessError as e:
72 raise ConfigError(e.output.decode())
73
74 def make(self, jobs: int, build_dir: str, make_options: Optional[List[str]]) -> None:
75 command = ['make', 'all', 'compile_commands.json', 'ARCH=' + self._linux_arch,
76 'O=' + build_dir, '--jobs=' + str(jobs)]
77 if make_options:
78 command.extend(make_options)
79 if self._cross_compile:
80 command += ['CROSS_COMPILE=' + self._cross_compile]
81 print('Building with:\n$', ' '.join(command))
82 try:
83 proc = subprocess.Popen(command,
84 stderr=subprocess.PIPE,
85 stdout=subprocess.DEVNULL)
86 except OSError as e:
87 raise BuildError('Could not call execute make: ' + str(e))
88 except subprocess.CalledProcessError as e:
89 raise BuildError(e.output)
90 _, stderr = proc.communicate()
91 if proc.returncode != 0:
92 raise BuildError(stderr.decode())
93 if stderr: # likely only due to build warnings
94 print(stderr.decode())
95
96 def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
97 raise RuntimeError('not implemented!')
98
99
100class LinuxSourceTreeOperationsQemu(LinuxSourceTreeOperations):
101
102 def __init__(self, qemu_arch_params: qemu_config.QemuArchParams, cross_compile: Optional[str]):
103 super().__init__(linux_arch=qemu_arch_params.linux_arch,
104 cross_compile=cross_compile)
105 self._kconfig = qemu_arch_params.kconfig
106 self._qemu_arch = qemu_arch_params.qemu_arch
107 self._kernel_path = qemu_arch_params.kernel_path
108 self._kernel_command_line = qemu_arch_params.kernel_command_line + ' kunit_shutdown=reboot'
109 self._extra_qemu_params = qemu_arch_params.extra_qemu_params
110 self._serial = qemu_arch_params.serial
111
112 def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
113 kconfig = kunit_config.parse_from_string(self._kconfig)
114 kconfig.merge_in_entries(base_kunitconfig)
115 return kconfig
116
117 def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
118 kernel_path = os.path.join(build_dir, self._kernel_path)
119 qemu_command = ['qemu-system-' + self._qemu_arch,
120 '-nodefaults',
121 '-m', '1024',
122 '-kernel', kernel_path,
123 '-append', ' '.join(params + [self._kernel_command_line]),
124 '-no-reboot',
125 '-nographic',
126 '-serial', self._serial] + self._extra_qemu_params
127 # Note: shlex.join() does what we want, but requires python 3.8+.
128 print('Running tests with:\n$', ' '.join(shlex.quote(arg) for arg in qemu_command))
129 return subprocess.Popen(qemu_command,
130 stdin=subprocess.PIPE,
131 stdout=subprocess.PIPE,
132 stderr=subprocess.STDOUT,
133 text=True, errors='backslashreplace')
134
135class LinuxSourceTreeOperationsUml(LinuxSourceTreeOperations):
136 """An abstraction over command line operations performed on a source tree."""
137
138 def __init__(self, cross_compile: Optional[str]=None):
139 super().__init__(linux_arch='um', cross_compile=cross_compile)
140
141 def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
142 kconfig = kunit_config.parse_file(UML_KCONFIG_PATH)
143 kconfig.merge_in_entries(base_kunitconfig)
144 return kconfig
145
146 def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
147 """Runs the Linux UML binary. Must be named 'linux'."""
148 linux_bin = os.path.join(build_dir, 'linux')
149 params.extend(['mem=1G', 'console=tty', 'kunit_shutdown=halt'])
150 print('Running tests with:\n$', linux_bin, ' '.join(shlex.quote(arg) for arg in params))
151 return subprocess.Popen([linux_bin] + params,
152 stdin=subprocess.PIPE,
153 stdout=subprocess.PIPE,
154 stderr=subprocess.STDOUT,
155 text=True, errors='backslashreplace')
156
157def get_kconfig_path(build_dir: str) -> str:
158 return os.path.join(build_dir, KCONFIG_PATH)
159
160def get_kunitconfig_path(build_dir: str) -> str:
161 return os.path.join(build_dir, KUNITCONFIG_PATH)
162
163def get_old_kunitconfig_path(build_dir: str) -> str:
164 return os.path.join(build_dir, OLD_KUNITCONFIG_PATH)
165
166def get_parsed_kunitconfig(build_dir: str,
167 kunitconfig_paths: Optional[List[str]]=None) -> kunit_config.Kconfig:
168 if not kunitconfig_paths:
169 path = get_kunitconfig_path(build_dir)
170 if not os.path.exists(path):
171 shutil.copyfile(DEFAULT_KUNITCONFIG_PATH, path)
172 return kunit_config.parse_file(path)
173
174 merged = kunit_config.Kconfig()
175
176 for path in kunitconfig_paths:
177 if os.path.isdir(path):
178 path = os.path.join(path, KUNITCONFIG_PATH)
179 if not os.path.exists(path):
180 raise ConfigError(f'Specified kunitconfig ({path}) does not exist')
181
182 partial = kunit_config.parse_file(path)
183 diff = merged.conflicting_options(partial)
184 if diff:
185 diff_str = '\n\n'.join(f'{a}\n vs from {path}\n{b}' for a, b in diff)
186 raise ConfigError(f'Multiple values specified for {len(diff)} options in kunitconfig:\n{diff_str}')
187 merged.merge_in_entries(partial)
188 return merged
189
190def get_outfile_path(build_dir: str) -> str:
191 return os.path.join(build_dir, OUTFILE_PATH)
192
193def _default_qemu_config_path(arch: str) -> str:
194 config_path = os.path.join(QEMU_CONFIGS_DIR, arch + '.py')
195 if os.path.isfile(config_path):
196 return config_path
197
198 options = [f[:-3] for f in os.listdir(QEMU_CONFIGS_DIR) if f.endswith('.py')]
199 raise ConfigError(arch + ' is not a valid arch, options are ' + str(sorted(options)))
200
201def _get_qemu_ops(config_path: str,
202 extra_qemu_args: Optional[List[str]],
203 cross_compile: Optional[str]) -> Tuple[str, LinuxSourceTreeOperations]:
204 # The module name/path has very little to do with where the actual file
205 # exists (I learned this through experimentation and could not find it
206 # anywhere in the Python documentation).
207 #
208 # Bascially, we completely ignore the actual file location of the config
209 # we are loading and just tell Python that the module lives in the
210 # QEMU_CONFIGS_DIR for import purposes regardless of where it actually
211 # exists as a file.
212 module_path = '.' + os.path.join(os.path.basename(QEMU_CONFIGS_DIR), os.path.basename(config_path))
213 spec = importlib.util.spec_from_file_location(module_path, config_path)
214 assert spec is not None
215 config = importlib.util.module_from_spec(spec)
216 # See https://github.com/python/typeshed/pull/2626 for context.
217 assert isinstance(spec.loader, importlib.abc.Loader)
218 spec.loader.exec_module(config)
219
220 if not hasattr(config, 'QEMU_ARCH'):
221 raise ValueError('qemu_config module missing "QEMU_ARCH": ' + config_path)
222 params: qemu_config.QemuArchParams = config.QEMU_ARCH
223 if extra_qemu_args:
224 params.extra_qemu_params.extend(extra_qemu_args)
225 return params.linux_arch, LinuxSourceTreeOperationsQemu(
226 params, cross_compile=cross_compile)
227
228class LinuxSourceTree:
229 """Represents a Linux kernel source tree with KUnit tests."""
230
231 def __init__(
232 self,
233 build_dir: str,
234 kunitconfig_paths: Optional[List[str]]=None,
235 kconfig_add: Optional[List[str]]=None,
236 arch: Optional[str]=None,
237 cross_compile: Optional[str]=None,
238 qemu_config_path: Optional[str]=None,
239 extra_qemu_args: Optional[List[str]]=None) -> None:
240 signal.signal(signal.SIGINT, self.signal_handler)
241 if qemu_config_path:
242 self._arch, self._ops = _get_qemu_ops(qemu_config_path, extra_qemu_args, cross_compile)
243 else:
244 self._arch = 'um' if arch is None else arch
245 if self._arch == 'um':
246 self._ops = LinuxSourceTreeOperationsUml(cross_compile=cross_compile)
247 else:
248 qemu_config_path = _default_qemu_config_path(self._arch)
249 _, self._ops = _get_qemu_ops(qemu_config_path, extra_qemu_args, cross_compile)
250
251 self._kconfig = get_parsed_kunitconfig(build_dir, kunitconfig_paths)
252 if kconfig_add:
253 kconfig = kunit_config.parse_from_string('\n'.join(kconfig_add))
254 self._kconfig.merge_in_entries(kconfig)
255
256 def arch(self) -> str:
257 return self._arch
258
259 def clean(self) -> bool:
260 try:
261 self._ops.make_mrproper()
262 except ConfigError as e:
263 logging.error(e)
264 return False
265 return True
266
267 def validate_config(self, build_dir: str) -> bool:
268 kconfig_path = get_kconfig_path(build_dir)
269 validated_kconfig = kunit_config.parse_file(kconfig_path)
270 if self._kconfig.is_subset_of(validated_kconfig):
271 return True
272 missing = set(self._kconfig.as_entries()) - set(validated_kconfig.as_entries())
273 message = 'Not all Kconfig options selected in kunitconfig were in the generated .config.\n' \
274 'This is probably due to unsatisfied dependencies.\n' \
275 'Missing: ' + ', '.join(str(e) for e in missing)
276 if self._arch == 'um':
277 message += '\nNote: many Kconfig options aren\'t available on UML. You can try running ' \
278 'on a different architecture with something like "--arch=x86_64".'
279 logging.error(message)
280 return False
281
282 def build_config(self, build_dir: str, make_options: Optional[List[str]]) -> bool:
283 kconfig_path = get_kconfig_path(build_dir)
284 if build_dir and not os.path.exists(build_dir):
285 os.mkdir(build_dir)
286 try:
287 self._kconfig = self._ops.make_arch_config(self._kconfig)
288 self._kconfig.write_to_file(kconfig_path)
289 self._ops.make_olddefconfig(build_dir, make_options)
290 except ConfigError as e:
291 logging.error(e)
292 return False
293 if not self.validate_config(build_dir):
294 return False
295
296 old_path = get_old_kunitconfig_path(build_dir)
297 if os.path.exists(old_path):
298 os.remove(old_path) # write_to_file appends to the file
299 self._kconfig.write_to_file(old_path)
300 return True
301
302 def _kunitconfig_changed(self, build_dir: str) -> bool:
303 old_path = get_old_kunitconfig_path(build_dir)
304 if not os.path.exists(old_path):
305 return True
306
307 old_kconfig = kunit_config.parse_file(old_path)
308 return old_kconfig != self._kconfig
309
310 def build_reconfig(self, build_dir: str, make_options: Optional[List[str]]) -> bool:
311 """Creates a new .config if it is not a subset of the .kunitconfig."""
312 kconfig_path = get_kconfig_path(build_dir)
313 if not os.path.exists(kconfig_path):
314 print('Generating .config ...')
315 return self.build_config(build_dir, make_options)
316
317 existing_kconfig = kunit_config.parse_file(kconfig_path)
318 self._kconfig = self._ops.make_arch_config(self._kconfig)
319
320 if self._kconfig.is_subset_of(existing_kconfig) and not self._kunitconfig_changed(build_dir):
321 return True
322 print('Regenerating .config ...')
323 os.remove(kconfig_path)
324 return self.build_config(build_dir, make_options)
325
326 def build_kernel(self, jobs: int, build_dir: str, make_options: Optional[List[str]]) -> bool:
327 try:
328 self._ops.make_olddefconfig(build_dir, make_options)
329 self._ops.make(jobs, build_dir, make_options)
330 except (ConfigError, BuildError) as e:
331 logging.error(e)
332 return False
333 return self.validate_config(build_dir)
334
335 def run_kernel(self, args: Optional[List[str]]=None, build_dir: str='', filter_glob: str='', filter: str='', filter_action: Optional[str]=None, timeout: Optional[int]=None) -> Iterator[str]:
336 if not args:
337 args = []
338 if filter_glob:
339 args.append('kunit.filter_glob=' + filter_glob)
340 if filter:
341 args.append('kunit.filter="' + filter + '"')
342 if filter_action:
343 args.append('kunit.filter_action=' + filter_action)
344 args.append('kunit.enable=1')
345
346 process = self._ops.start(args, build_dir)
347 assert process.stdout is not None # tell mypy it's set
348
349 # Enforce the timeout in a background thread.
350 def _wait_proc() -> None:
351 try:
352 process.wait(timeout=timeout)
353 except Exception as e:
354 print(e)
355 process.terminate()
356 process.wait()
357 waiter = threading.Thread(target=_wait_proc)
358 waiter.start()
359
360 output = open(get_outfile_path(build_dir), 'w')
361 try:
362 # Tee the output to the file and to our caller in real time.
363 for line in process.stdout:
364 output.write(line)
365 yield line
366 # This runs even if our caller doesn't consume every line.
367 finally:
368 # Flush any leftover output to the file
369 output.write(process.stdout.read())
370 output.close()
371 process.stdout.close()
372
373 waiter.join()
374 subprocess.call(['stty', 'sane'])
375
376 def signal_handler(self, unused_sig: int, unused_frame: Optional[FrameType]) -> None:
377 logging.error('Build interruption occurred. Cleaning console.')
378 subprocess.call(['stty', 'sane'])