Linux kernel mirror (for testing)
git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel
os
linux
1# SPDX-License-Identifier: GPL-2.0
2#
3# Runs UML kernel, collects output, and handles errors.
4#
5# Copyright (C) 2019, Google LLC.
6# Author: Felix Guo <felixguoxiuping@gmail.com>
7# Author: Brendan Higgins <brendanhiggins@google.com>
8
9import importlib.abc
10import importlib.util
11import logging
12import subprocess
13import os
14import shlex
15import shutil
16import signal
17import threading
18from typing import Iterator, List, Optional, Tuple
19from types import FrameType
20
21import kunit_config
22import qemu_config
23
# File names below are joined onto the build directory by the get_*_path()
# helpers in this module.
# The kernel configuration that gets written and then validated.
KCONFIG_PATH = '.config'
# The kunitconfig used when no explicit kunitconfig paths are given; also the
# file name looked up inside a directory passed as a kunitconfig path.
KUNITCONFIG_PATH = '.kunitconfig'
# Copy of the config written after a successful build_config(); compared
# against on the next run to detect kunitconfig changes.
OLD_KUNITCONFIG_PATH = 'last_used_kunitconfig'
# Copied to <build_dir>/.kunitconfig when that file does not exist yet.
DEFAULT_KUNITCONFIG_PATH = 'tools/testing/kunit/configs/default.config'
# NOTE(review): not referenced in this file's visible code; presumably used
# by callers that want every test enabled -- confirm.
ALL_TESTS_CONFIG_PATH = 'tools/testing/kunit/configs/all_tests.config'
# Base Kconfig fragment merged in for UML (ARCH=um) builds.
UML_KCONFIG_PATH = 'tools/testing/kunit/configs/arch_uml.config'
# Log file (inside the build directory) that run_kernel() tees output to.
OUTFILE_PATH = 'test.log'
# Absolute path of the directory containing this file.
ABS_TOOL_PATH = os.path.abspath(os.path.dirname(__file__))
# Directory searched for the default per-architecture '<arch>.py' QEMU configs.
QEMU_CONFIGS_DIR = os.path.join(ABS_TOOL_PATH, 'qemu_configs')
33
class ConfigError(Exception):
    """Raised when configuring the Linux kernel fails."""
36
37
class BuildError(Exception):
    """Raised when building the Linux kernel fails."""
40
41
class LinuxSourceTreeOperations:
    """An abstraction over command line operations performed on a source tree.

    This base class drives ``make``; subclasses override ``make_arch_config``
    and ``start`` with their architecture-specific behaviour.
    """

    def __init__(self, linux_arch: str, cross_compile: Optional[str]):
        """Remember the ARCH= value and optional CROSS_COMPILE= prefix."""
        self._linux_arch = linux_arch
        self._cross_compile = cross_compile

    def make_mrproper(self) -> None:
        """Run ``make mrproper``.

        Raises:
            ConfigError: if make cannot be executed or exits non-zero (the
                message is make's combined stdout/stderr in the latter case).
        """
        try:
            subprocess.check_output(['make', 'mrproper'], stderr=subprocess.STDOUT)
        except OSError as e:
            raise ConfigError('Could not call make command: ' + str(e)) from e
        except subprocess.CalledProcessError as e:
            raise ConfigError(e.output.decode()) from e

    def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
        """Return the config to build with; the base class adds nothing."""
        return base_kunitconfig

    def make_olddefconfig(self, build_dir: str, make_options: Optional[List[str]]) -> None:
        """Run ``make olddefconfig`` for this architecture in build_dir.

        Args:
            build_dir: passed to make as ``O=``.
            make_options: extra arguments appended to the make command line.

        Raises:
            ConfigError: if make cannot be executed or exits non-zero.
        """
        command = ['make', 'ARCH=' + self._linux_arch, 'O=' + build_dir, 'olddefconfig']
        if self._cross_compile:
            command += ['CROSS_COMPILE=' + self._cross_compile]
        if make_options:
            command.extend(make_options)
        print('Populating config with:\n$', ' '.join(command))
        try:
            subprocess.check_output(command, stderr=subprocess.STDOUT)
        except OSError as e:
            raise ConfigError('Could not call make command: ' + str(e)) from e
        except subprocess.CalledProcessError as e:
            raise ConfigError(e.output.decode()) from e

    def make(self, jobs: int, build_dir: str, make_options: Optional[List[str]]) -> None:
        """Build the kernel (``make all compile_commands.json``).

        make's stdout is discarded. Its stderr is captured: it becomes the
        error message on failure and is printed on success.

        Args:
            jobs: value for make's ``--jobs=``.
            build_dir: passed to make as ``O=``.
            make_options: extra arguments appended to the make command line.

        Raises:
            BuildError: if make cannot be started or exits non-zero.
        """
        command = ['make', 'all', 'compile_commands.json', 'ARCH=' + self._linux_arch,
                   'O=' + build_dir, '--jobs=' + str(jobs)]
        if make_options:
            command.extend(make_options)
        if self._cross_compile:
            command += ['CROSS_COMPILE=' + self._cross_compile]
        print('Building with:\n$', ' '.join(command))
        # Popen only reports launch failures here; a non-zero exit status is
        # detected via returncode below.
        try:
            proc = subprocess.Popen(command,
                                    stderr=subprocess.PIPE,
                                    stdout=subprocess.DEVNULL)
        except OSError as e:
            raise BuildError('Could not call execute make: ' + str(e)) from e
        _, stderr = proc.communicate()
        if proc.returncode != 0:
            raise BuildError(stderr.decode())
        if stderr:  # likely only due to build warnings
            print(stderr.decode())

    def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
        """Start the built kernel; must be overridden by subclasses.

        Raises:
            NotImplementedError: always (a RuntimeError subclass, so existing
                ``except RuntimeError`` handlers still apply).
        """
        raise NotImplementedError('not implemented!')
98
99
class LinuxSourceTreeOperationsQemu(LinuxSourceTreeOperations):
    """Source tree operations for kernels that are run under QEMU."""

    def __init__(self, qemu_arch_params: qemu_config.QemuArchParams, cross_compile: Optional[str]):
        super().__init__(linux_arch=qemu_arch_params.linux_arch,
                         cross_compile=cross_compile)
        self._kconfig = qemu_arch_params.kconfig
        self._qemu_arch = qemu_arch_params.qemu_arch
        self._kernel_path = qemu_arch_params.kernel_path
        # Add a default shutdown mode unless the arch config already chose one.
        cmdline = qemu_arch_params.kernel_command_line
        if 'kunit_shutdown=' not in cmdline:
            cmdline += ' kunit_shutdown=reboot'
        self._kernel_command_line = cmdline
        self._extra_qemu_params = qemu_arch_params.extra_qemu_params
        self._serial = qemu_arch_params.serial

    def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
        """Return the arch's Kconfig fragment with base_kunitconfig merged in."""
        arch_kconfig = kunit_config.parse_from_string(self._kconfig)
        arch_kconfig.merge_in_entries(base_kunitconfig)
        return arch_kconfig

    def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
        """Launch qemu-system-<arch> on the kernel image found in build_dir.

        The kernel command line is `params` followed by the arch's own
        command line. The returned process has stdin/stdout piped, with
        stderr folded into stdout.
        """
        image = os.path.join(build_dir, self._kernel_path)
        append_arg = ' '.join(params + [self._kernel_command_line])
        base_command = [
            'qemu-system-' + self._qemu_arch,
            '-nodefaults',
            '-m', '1024',
            '-kernel', image,
            '-append', append_arg,
            '-no-reboot',
            '-nographic',
            '-accel', 'kvm',
            '-accel', 'hvf',
            '-accel', 'tcg',
            '-serial', self._serial,
        ]
        argv = base_command + self._extra_qemu_params
        # Note: shlex.join() does what we want, but requires python 3.8+.
        print('Running tests with:\n$', ' '.join(map(shlex.quote, argv)))
        return subprocess.Popen(argv,
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT,
                                text=True, errors='backslashreplace')
139
class LinuxSourceTreeOperationsUml(LinuxSourceTreeOperations):
    """Source tree operations for User Mode Linux (ARCH=um) kernels."""

    def __init__(self, cross_compile: Optional[str]=None):
        super().__init__(linux_arch='um', cross_compile=cross_compile)

    def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
        """Return the UML Kconfig fragment with base_kunitconfig merged in."""
        kconfig = kunit_config.parse_file(UML_KCONFIG_PATH)
        kconfig.merge_in_entries(base_kunitconfig)
        return kconfig

    def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
        """Runs the Linux UML binary. Must be named 'linux'.

        Args:
            params: kernel command line arguments; not modified.
            build_dir: directory containing the 'linux' binary.

        Returns:
            The started process, with stdin/stdout piped and stderr folded
            into stdout.
        """
        linux_bin = os.path.join(build_dir, 'linux')
        # Build a new list instead of extending `params`, so a list the
        # caller reuses across runs does not accumulate these arguments.
        uml_params = list(params) + ['mem=1G', 'console=tty', 'kunit_shutdown=halt']
        print('Running tests with:\n$', linux_bin, ' '.join(shlex.quote(arg) for arg in uml_params))
        return subprocess.Popen([linux_bin] + uml_params,
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT,
                                text=True, errors='backslashreplace')
161
def get_kconfig_path(build_dir: str) -> str:
    """Return the path of the kernel config file inside build_dir."""
    return os.path.join(build_dir, KCONFIG_PATH)
164
def get_kunitconfig_path(build_dir: str) -> str:
    """Return the path of the kunitconfig file inside build_dir."""
    return os.path.join(build_dir, KUNITCONFIG_PATH)
167
def get_old_kunitconfig_path(build_dir: str) -> str:
    """Return the path of the last-used kunitconfig copy inside build_dir."""
    return os.path.join(build_dir, OLD_KUNITCONFIG_PATH)
170
def get_parsed_kunitconfig(build_dir: str,
                           kunitconfig_paths: Optional[List[str]]=None) -> kunit_config.Kconfig:
    """Parse the kunitconfig(s) to use and return them as a single Kconfig.

    With no explicit paths, <build_dir>/.kunitconfig is used, and it is first
    created from the default config if it does not exist. Otherwise each
    given path is parsed and merged in order; a directory stands for the
    .kunitconfig file inside it.

    Raises:
        ConfigError: if a given path does not exist, or if two of the given
            files specify different values for the same option.
    """
    if not kunitconfig_paths:
        default_path = get_kunitconfig_path(build_dir)
        if not os.path.exists(default_path):
            shutil.copyfile(DEFAULT_KUNITCONFIG_PATH, default_path)
        return kunit_config.parse_file(default_path)

    combined = kunit_config.Kconfig()

    for given_path in kunitconfig_paths:
        path = os.path.join(given_path, KUNITCONFIG_PATH) if os.path.isdir(given_path) else given_path
        if not os.path.exists(path):
            raise ConfigError(f'Specified kunitconfig ({path}) does not exist')

        fragment = kunit_config.parse_file(path)
        conflicts = combined.conflicting_options(fragment)
        if conflicts:
            details = '\n\n'.join(f'{a}\n vs from {path}\n{b}' for a, b in conflicts)
            raise ConfigError(f'Multiple values specified for {len(conflicts)} options in kunitconfig:\n{details}')
        combined.merge_in_entries(fragment)
    return combined
194
def get_outfile_path(build_dir: str) -> str:
    """Return the path of the test output log inside build_dir."""
    return os.path.join(build_dir, OUTFILE_PATH)
197
def _default_qemu_config_path(arch: str) -> str:
    """Return the path of the bundled QEMU config for arch.

    Raises:
        ConfigError: if QEMU_CONFIGS_DIR has no '<arch>.py'; the message
            lists the architectures that do have one.
    """
    candidate = os.path.join(QEMU_CONFIGS_DIR, arch + '.py')
    if not os.path.isfile(candidate):
        known = sorted(name[:-3] for name in os.listdir(QEMU_CONFIGS_DIR) if name.endswith('.py'))
        raise ConfigError(arch + ' is not a valid arch, options are ' + str(known))
    return candidate
205
def _get_qemu_ops(config_path: str,
                  extra_qemu_args: Optional[List[str]],
                  cross_compile: Optional[str]) -> Tuple[str, LinuxSourceTreeOperations]:
    """Load a QEMU config file and build the matching operations object.

    The file at config_path is executed as a Python module and must define
    QEMU_ARCH. Any extra_qemu_args are appended to its extra_qemu_params.

    Returns:
        A (linux_arch, operations) tuple.

    Raises:
        ValueError: if the loaded module does not define QEMU_ARCH.
    """
    # The module name/path has very little to do with where the actual file
    # exists (I learned this through experimentation and could not find it
    # anywhere in the Python documentation).
    #
    # Basically, we completely ignore the actual file location of the config
    # we are loading and just tell Python that the module lives in the
    # QEMU_CONFIGS_DIR for import purposes regardless of where it actually
    # exists as a file.
    configs_dir_name = os.path.basename(QEMU_CONFIGS_DIR)
    config_file_name = os.path.basename(config_path)
    module_name = '.' + os.path.join(configs_dir_name, config_file_name)
    spec = importlib.util.spec_from_file_location(module_name, config_path)
    assert spec is not None
    module = importlib.util.module_from_spec(spec)
    # See https://github.com/python/typeshed/pull/2626 for context.
    assert isinstance(spec.loader, importlib.abc.Loader)
    spec.loader.exec_module(module)

    if not hasattr(module, 'QEMU_ARCH'):
        raise ValueError('qemu_config module missing "QEMU_ARCH": ' + config_path)
    arch_params: qemu_config.QemuArchParams = module.QEMU_ARCH
    if extra_qemu_args:
        arch_params.extra_qemu_params.extend(extra_qemu_args)
    linux_arch = arch_params.linux_arch
    ops = LinuxSourceTreeOperationsQemu(arch_params, cross_compile=cross_compile)
    return linux_arch, ops
232
class LinuxSourceTree:
    """Represents a Linux kernel source tree with KUnit tests."""

    def __init__(
            self,
            build_dir: str,
            kunitconfig_paths: Optional[List[str]]=None,
            kconfig_add: Optional[List[str]]=None,
            arch: Optional[str]=None,
            cross_compile: Optional[str]=None,
            qemu_config_path: Optional[str]=None,
            extra_qemu_args: Optional[List[str]]=None) -> None:
        """Pick the operations backend and load the kunitconfig.

        An explicit qemu_config_path takes precedence over arch. Without it,
        arch defaults to 'um' (UML); any other arch uses the bundled QEMU
        config of that name. Entries in kconfig_add are merged on top of the
        parsed kunitconfig.

        Note: this installs a SIGINT handler for the whole process.
        """
        signal.signal(signal.SIGINT, self.signal_handler)
        if qemu_config_path:
            self._arch, self._ops = _get_qemu_ops(qemu_config_path, extra_qemu_args, cross_compile)
        else:
            self._arch = 'um' if arch is None else arch
            if self._arch == 'um':
                self._ops = LinuxSourceTreeOperationsUml(cross_compile=cross_compile)
            else:
                qemu_config_path = _default_qemu_config_path(self._arch)
                _, self._ops = _get_qemu_ops(qemu_config_path, extra_qemu_args, cross_compile)

        self._kconfig = get_parsed_kunitconfig(build_dir, kunitconfig_paths)
        if kconfig_add:
            kconfig = kunit_config.parse_from_string('\n'.join(kconfig_add))
            self._kconfig.merge_in_entries(kconfig)

    def arch(self) -> str:
        """Return the architecture name selected for this tree."""
        return self._arch

    def clean(self) -> bool:
        """Run ``make mrproper``; log the error and return False on failure."""
        try:
            self._ops.make_mrproper()
        except ConfigError as e:
            logging.error(e)
            return False
        return True

    def validate_config(self, build_dir: str) -> bool:
        """Check that the .config in build_dir contains every requested option.

        Returns:
            True if the requested config is a subset of the .config;
            otherwise logs the missing entries and returns False.
        """
        kconfig_path = get_kconfig_path(build_dir)
        validated_kconfig = kunit_config.parse_file(kconfig_path)
        if self._kconfig.is_subset_of(validated_kconfig):
            return True
        missing = set(self._kconfig.as_entries()) - set(validated_kconfig.as_entries())
        message = 'Not all Kconfig options selected in kunitconfig were in the generated .config.\n' \
                  'This is probably due to unsatisfied dependencies.\n' \
                  'Missing: ' + ', '.join(str(e) for e in missing)
        if self._arch == 'um':
            message += '\nNote: many Kconfig options aren\'t available on UML. You can try running ' \
                       'on a different architecture with something like "--arch=x86_64".'
        logging.error(message)
        return False

    def build_config(self, build_dir: str, make_options: Optional[List[str]]) -> bool:
        """Write a fresh .config into build_dir and validate it.

        On success the config used is also saved as the last-used
        kunitconfig so later runs can detect changes.

        Returns:
            True on success; False if configuring or validation failed.
        """
        kconfig_path = get_kconfig_path(build_dir)
        if build_dir and not os.path.exists(build_dir):
            # makedirs also creates missing parents (e.g. build_dir='out/x86')
            # and exist_ok tolerates the directory appearing concurrently.
            os.makedirs(build_dir, exist_ok=True)
        try:
            self._kconfig = self._ops.make_arch_config(self._kconfig)
            self._kconfig.write_to_file(kconfig_path)
            self._ops.make_olddefconfig(build_dir, make_options)
        except ConfigError as e:
            logging.error(e)
            return False
        if not self.validate_config(build_dir):
            return False

        old_path = get_old_kunitconfig_path(build_dir)
        if os.path.exists(old_path):
            os.remove(old_path)  # write_to_file appends to the file
        self._kconfig.write_to_file(old_path)
        return True

    def _kunitconfig_changed(self, build_dir: str) -> bool:
        """Return True if there is no last-used kunitconfig or it differs."""
        old_path = get_old_kunitconfig_path(build_dir)
        if not os.path.exists(old_path):
            return True

        old_kconfig = kunit_config.parse_file(old_path)
        return old_kconfig != self._kconfig

    def build_reconfig(self, build_dir: str, make_options: Optional[List[str]]) -> bool:
        """Create or regenerate the .config only when that is needed.

        The existing .config is kept when the requested config is a subset
        of it and the kunitconfig is unchanged since the last run; otherwise
        it is removed and rebuilt via build_config().
        """
        kconfig_path = get_kconfig_path(build_dir)
        if not os.path.exists(kconfig_path):
            print('Generating .config ...')
            return self.build_config(build_dir, make_options)

        existing_kconfig = kunit_config.parse_file(kconfig_path)
        self._kconfig = self._ops.make_arch_config(self._kconfig)

        if self._kconfig.is_subset_of(existing_kconfig) and not self._kunitconfig_changed(build_dir):
            return True
        print('Regenerating .config ...')
        os.remove(kconfig_path)
        return self.build_config(build_dir, make_options)

    def build_kernel(self, jobs: int, build_dir: str, make_options: Optional[List[str]]) -> bool:
        """Run olddefconfig and the build, then validate the resulting .config.

        Returns:
            False if make fails (the error is logged) or validation fails.
        """
        try:
            self._ops.make_olddefconfig(build_dir, make_options)
            self._ops.make(jobs, build_dir, make_options)
        except (ConfigError, BuildError) as e:
            logging.error(e)
            return False
        return self.validate_config(build_dir)

    def run_kernel(self, args: Optional[List[str]]=None, build_dir: str='', filter_glob: str='', filter: str='', filter_action: Optional[str]=None, timeout: Optional[int]=None) -> Iterator[str]:
        """Start the kernel and yield its output line by line.

        Every line is also written to the log file in build_dir. This is a
        generator, so nothing runs until the first item is requested.

        Args:
            args: extra kernel command line arguments; not modified.
            build_dir: directory holding the built kernel and the log file.
            filter_glob: if set, passed as kunit.filter_glob=.
            filter: if set, passed (double-quoted) as kunit.filter=.
            filter_action: if set, passed as kunit.filter_action=.
            timeout: seconds to wait before the process is terminated;
                None waits indefinitely.
        """
        # Work on a copy: callers may reuse the same list for several runs,
        # and appending in place would make the arguments pile up.
        args = list(args) if args else []
        if filter_glob:
            args.append('kunit.filter_glob=' + filter_glob)
        if filter:
            args.append('kunit.filter="' + filter + '"')
        if filter_action:
            args.append('kunit.filter_action=' + filter_action)
        args.append('kunit.enable=1')

        process = self._ops.start(args, build_dir)
        assert process.stdout is not None  # tell mypy it's set

        # Enforce the timeout in a background thread.
        def _wait_proc() -> None:
            try:
                process.wait(timeout=timeout)
            except Exception as e:
                print(e)
                process.terminate()
                process.wait()
        waiter = threading.Thread(target=_wait_proc)
        waiter.start()

        # The with-statement closes the log file even if a write fails.
        with open(get_outfile_path(build_dir), 'w') as output:
            try:
                # Tee the output to the file and to our caller in real time.
                for line in process.stdout:
                    output.write(line)
                    yield line
            # This runs even if our caller doesn't consume every line.
            finally:
                try:
                    # Flush any leftover output to the file
                    output.write(process.stdout.read())
                finally:
                    # Always release the pipe, reap the waiter thread and
                    # restore the terminal, even if the final write failed.
                    process.stdout.close()

                    waiter.join()
                    subprocess.call(['stty', 'sane'])

    def signal_handler(self, unused_sig: int, unused_frame: Optional[FrameType]) -> None:
        """SIGINT handler: log the interruption and reset the terminal."""
        logging.error('Build interruption occurred. Cleaning console.')
        subprocess.call(['stty', 'sane'])