Linux kernel mirror (for testing)
git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel
os
linux
1# SPDX-License-Identifier: GPL-2.0
2#
3# Runs UML kernel, collects output, and handles errors.
4#
5# Copyright (C) 2019, Google LLC.
6# Author: Felix Guo <felixguoxiuping@gmail.com>
7# Author: Brendan Higgins <brendanhiggins@google.com>
8
9import importlib.abc
10import importlib.util
11import logging
12import subprocess
13import os
14import shlex
15import shutil
16import signal
17import sys
18import threading
19from typing import Iterator, List, Optional, Tuple
20from types import FrameType
21
22import kunit_config
23import qemu_config
24
25KCONFIG_PATH = '.config'
26KUNITCONFIG_PATH = '.kunitconfig'
27OLD_KUNITCONFIG_PATH = 'last_used_kunitconfig'
28DEFAULT_KUNITCONFIG_PATH = 'tools/testing/kunit/configs/default.config'
29ALL_TESTS_CONFIG_PATH = 'tools/testing/kunit/configs/all_tests.config'
30UML_KCONFIG_PATH = 'tools/testing/kunit/configs/arch_uml.config'
31OUTFILE_PATH = 'test.log'
32ABS_TOOL_PATH = os.path.abspath(os.path.dirname(__file__))
33QEMU_CONFIGS_DIR = os.path.join(ABS_TOOL_PATH, 'qemu_configs')
34
35class ConfigError(Exception):
36 """Represents an error trying to configure the Linux kernel."""
37
38
39class BuildError(Exception):
40 """Represents an error trying to build the Linux kernel."""
41
42
43class LinuxSourceTreeOperations:
44 """An abstraction over command line operations performed on a source tree."""
45
46 def __init__(self, linux_arch: str, cross_compile: Optional[str]):
47 self._linux_arch = linux_arch
48 self._cross_compile = cross_compile
49
50 def make_mrproper(self) -> None:
51 try:
52 subprocess.check_output(['make', 'mrproper'], stderr=subprocess.STDOUT)
53 except OSError as e:
54 raise ConfigError('Could not call make command: ' + str(e))
55 except subprocess.CalledProcessError as e:
56 raise ConfigError(e.output.decode())
57
58 def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
59 return base_kunitconfig
60
61 def make_olddefconfig(self, build_dir: str, make_options: Optional[List[str]]) -> None:
62 command = ['make', 'ARCH=' + self._linux_arch, 'O=' + build_dir, 'olddefconfig']
63 if self._cross_compile:
64 command += ['CROSS_COMPILE=' + self._cross_compile]
65 if make_options:
66 command.extend(make_options)
67 print('Populating config with:\n$', ' '.join(command))
68 try:
69 subprocess.check_output(command, stderr=subprocess.STDOUT)
70 except OSError as e:
71 raise ConfigError('Could not call make command: ' + str(e))
72 except subprocess.CalledProcessError as e:
73 raise ConfigError(e.output.decode())
74
75 def make(self, jobs: int, build_dir: str, make_options: Optional[List[str]]) -> None:
76 command = ['make', 'all', 'compile_commands.json', 'scripts_gdb',
77 'ARCH=' + self._linux_arch, 'O=' + build_dir, '--jobs=' + str(jobs)]
78 if make_options:
79 command.extend(make_options)
80 if self._cross_compile:
81 command += ['CROSS_COMPILE=' + self._cross_compile]
82 print('Building with:\n$', ' '.join(command))
83 try:
84 proc = subprocess.Popen(command,
85 stderr=subprocess.PIPE,
86 stdout=subprocess.DEVNULL)
87 except OSError as e:
88 raise BuildError('Could not call execute make: ' + str(e))
89 except subprocess.CalledProcessError as e:
90 raise BuildError(e.output)
91 _, stderr = proc.communicate()
92 if proc.returncode != 0:
93 raise BuildError(stderr.decode())
94 if stderr: # likely only due to build warnings
95 print(stderr.decode())
96
97 def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
98 raise RuntimeError('not implemented!')
99
100
101class LinuxSourceTreeOperationsQemu(LinuxSourceTreeOperations):
102
103 def __init__(self, qemu_arch_params: qemu_config.QemuArchParams, cross_compile: Optional[str]):
104 super().__init__(linux_arch=qemu_arch_params.linux_arch,
105 cross_compile=cross_compile)
106 self._kconfig = qemu_arch_params.kconfig
107 self._qemu_arch = qemu_arch_params.qemu_arch
108 self._kernel_path = qemu_arch_params.kernel_path
109 self._kernel_command_line = qemu_arch_params.kernel_command_line
110 if 'kunit_shutdown=' not in self._kernel_command_line:
111 self._kernel_command_line += ' kunit_shutdown=reboot'
112 self._extra_qemu_params = qemu_arch_params.extra_qemu_params
113 self._serial = qemu_arch_params.serial
114
115 def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
116 kconfig = kunit_config.parse_from_string(self._kconfig)
117 kconfig.merge_in_entries(base_kunitconfig)
118 return kconfig
119
120 def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
121 kernel_path = os.path.join(build_dir, self._kernel_path)
122 qemu_command = ['qemu-system-' + self._qemu_arch,
123 '-nodefaults',
124 '-m', '1024',
125 '-kernel', kernel_path,
126 '-append', ' '.join(params + [self._kernel_command_line]),
127 '-no-reboot',
128 '-nographic',
129 '-accel', 'kvm',
130 '-accel', 'hvf',
131 '-accel', 'tcg',
132 '-serial', self._serial] + self._extra_qemu_params
133 # Note: shlex.join() does what we want, but requires python 3.8+.
134 print('Running tests with:\n$', ' '.join(shlex.quote(arg) for arg in qemu_command))
135 return subprocess.Popen(qemu_command,
136 stdin=subprocess.PIPE,
137 stdout=subprocess.PIPE,
138 stderr=subprocess.STDOUT,
139 text=True, errors='backslashreplace')
140
141class LinuxSourceTreeOperationsUml(LinuxSourceTreeOperations):
142 """An abstraction over command line operations performed on a source tree."""
143
144 def __init__(self, cross_compile: Optional[str]=None):
145 super().__init__(linux_arch='um', cross_compile=cross_compile)
146
147 def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
148 kconfig = kunit_config.parse_file(UML_KCONFIG_PATH)
149 kconfig.merge_in_entries(base_kunitconfig)
150 return kconfig
151
152 def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
153 """Runs the Linux UML binary. Must be named 'linux'."""
154 linux_bin = os.path.join(build_dir, 'linux')
155 params.extend(['mem=1G', 'console=tty', 'kunit_shutdown=halt'])
156 print('Running tests with:\n$', linux_bin, ' '.join(shlex.quote(arg) for arg in params))
157 return subprocess.Popen([linux_bin] + params,
158 stdin=subprocess.PIPE,
159 stdout=subprocess.PIPE,
160 stderr=subprocess.STDOUT,
161 text=True, errors='backslashreplace')
162
163def get_kconfig_path(build_dir: str) -> str:
164 return os.path.join(build_dir, KCONFIG_PATH)
165
166def get_kunitconfig_path(build_dir: str) -> str:
167 return os.path.join(build_dir, KUNITCONFIG_PATH)
168
169def get_old_kunitconfig_path(build_dir: str) -> str:
170 return os.path.join(build_dir, OLD_KUNITCONFIG_PATH)
171
172def get_parsed_kunitconfig(build_dir: str,
173 kunitconfig_paths: Optional[List[str]]=None) -> kunit_config.Kconfig:
174 if not kunitconfig_paths:
175 path = get_kunitconfig_path(build_dir)
176 if not os.path.exists(path):
177 shutil.copyfile(DEFAULT_KUNITCONFIG_PATH, path)
178 return kunit_config.parse_file(path)
179
180 merged = kunit_config.Kconfig()
181
182 for path in kunitconfig_paths:
183 if os.path.isdir(path):
184 path = os.path.join(path, KUNITCONFIG_PATH)
185 if not os.path.exists(path):
186 raise ConfigError(f'Specified kunitconfig ({path}) does not exist')
187
188 partial = kunit_config.parse_file(path)
189 diff = merged.conflicting_options(partial)
190 if diff:
191 diff_str = '\n\n'.join(f'{a}\n vs from {path}\n{b}' for a, b in diff)
192 raise ConfigError(f'Multiple values specified for {len(diff)} options in kunitconfig:\n{diff_str}')
193 merged.merge_in_entries(partial)
194 return merged
195
196def get_outfile_path(build_dir: str) -> str:
197 return os.path.join(build_dir, OUTFILE_PATH)
198
199def _default_qemu_config_path(arch: str) -> str:
200 config_path = os.path.join(QEMU_CONFIGS_DIR, arch + '.py')
201 if os.path.isfile(config_path):
202 return config_path
203
204 options = [f[:-3] for f in os.listdir(QEMU_CONFIGS_DIR) if f.endswith('.py')]
205
206 if arch == 'help':
207 print('um')
208 for option in options:
209 print(option)
210 sys.exit()
211
212 raise ConfigError(arch + ' is not a valid arch, options are ' + str(sorted(options)))
213
214def _get_qemu_ops(config_path: str,
215 extra_qemu_args: Optional[List[str]],
216 cross_compile: Optional[str]) -> Tuple[str, LinuxSourceTreeOperations]:
217 # The module name/path has very little to do with where the actual file
218 # exists (I learned this through experimentation and could not find it
219 # anywhere in the Python documentation).
220 #
221 # Bascially, we completely ignore the actual file location of the config
222 # we are loading and just tell Python that the module lives in the
223 # QEMU_CONFIGS_DIR for import purposes regardless of where it actually
224 # exists as a file.
225 module_path = '.' + os.path.join(os.path.basename(QEMU_CONFIGS_DIR), os.path.basename(config_path))
226 spec = importlib.util.spec_from_file_location(module_path, config_path)
227 assert spec is not None
228 config = importlib.util.module_from_spec(spec)
229 # See https://github.com/python/typeshed/pull/2626 for context.
230 assert isinstance(spec.loader, importlib.abc.Loader)
231 spec.loader.exec_module(config)
232
233 if not hasattr(config, 'QEMU_ARCH'):
234 raise ValueError('qemu_config module missing "QEMU_ARCH": ' + config_path)
235 params: qemu_config.QemuArchParams = config.QEMU_ARCH
236 if extra_qemu_args:
237 params.extra_qemu_params.extend(extra_qemu_args)
238 return params.linux_arch, LinuxSourceTreeOperationsQemu(
239 params, cross_compile=cross_compile)
240
241class LinuxSourceTree:
242 """Represents a Linux kernel source tree with KUnit tests."""
243
244 def __init__(
245 self,
246 build_dir: str,
247 kunitconfig_paths: Optional[List[str]]=None,
248 kconfig_add: Optional[List[str]]=None,
249 arch: Optional[str]=None,
250 cross_compile: Optional[str]=None,
251 qemu_config_path: Optional[str]=None,
252 extra_qemu_args: Optional[List[str]]=None) -> None:
253 signal.signal(signal.SIGINT, self.signal_handler)
254 if qemu_config_path:
255 self._arch, self._ops = _get_qemu_ops(qemu_config_path, extra_qemu_args, cross_compile)
256 else:
257 self._arch = 'um' if arch is None else arch
258 if self._arch == 'um':
259 self._ops = LinuxSourceTreeOperationsUml(cross_compile=cross_compile)
260 else:
261 qemu_config_path = _default_qemu_config_path(self._arch)
262 _, self._ops = _get_qemu_ops(qemu_config_path, extra_qemu_args, cross_compile)
263
264 self._kconfig = get_parsed_kunitconfig(build_dir, kunitconfig_paths)
265 if kconfig_add:
266 kconfig = kunit_config.parse_from_string('\n'.join(kconfig_add))
267 self._kconfig.merge_in_entries(kconfig)
268
269 def arch(self) -> str:
270 return self._arch
271
272 def clean(self) -> bool:
273 try:
274 self._ops.make_mrproper()
275 except ConfigError as e:
276 logging.error(e)
277 return False
278 return True
279
280 def validate_config(self, build_dir: str) -> bool:
281 kconfig_path = get_kconfig_path(build_dir)
282 validated_kconfig = kunit_config.parse_file(kconfig_path)
283 if self._kconfig.is_subset_of(validated_kconfig):
284 return True
285 missing = set(self._kconfig.as_entries()) - set(validated_kconfig.as_entries())
286 message = 'Not all Kconfig options selected in kunitconfig were in the generated .config.\n' \
287 'This is probably due to unsatisfied dependencies.\n' \
288 'Missing: ' + ', '.join(str(e) for e in missing)
289 if self._arch == 'um':
290 message += '\nNote: many Kconfig options aren\'t available on UML. You can try running ' \
291 'on a different architecture with something like "--arch=x86_64".'
292 logging.error(message)
293 return False
294
295 def build_config(self, build_dir: str, make_options: Optional[List[str]]) -> bool:
296 kconfig_path = get_kconfig_path(build_dir)
297 if build_dir and not os.path.exists(build_dir):
298 os.mkdir(build_dir)
299 try:
300 self._kconfig = self._ops.make_arch_config(self._kconfig)
301 self._kconfig.write_to_file(kconfig_path)
302 self._ops.make_olddefconfig(build_dir, make_options)
303 except ConfigError as e:
304 logging.error(e)
305 return False
306 if not self.validate_config(build_dir):
307 return False
308
309 old_path = get_old_kunitconfig_path(build_dir)
310 if os.path.exists(old_path):
311 os.remove(old_path) # write_to_file appends to the file
312 self._kconfig.write_to_file(old_path)
313 return True
314
315 def _kunitconfig_changed(self, build_dir: str) -> bool:
316 old_path = get_old_kunitconfig_path(build_dir)
317 if not os.path.exists(old_path):
318 return True
319
320 old_kconfig = kunit_config.parse_file(old_path)
321 return old_kconfig != self._kconfig
322
323 def build_reconfig(self, build_dir: str, make_options: Optional[List[str]]) -> bool:
324 """Creates a new .config if it is not a subset of the .kunitconfig."""
325 kconfig_path = get_kconfig_path(build_dir)
326 if not os.path.exists(kconfig_path):
327 print('Generating .config ...')
328 return self.build_config(build_dir, make_options)
329
330 existing_kconfig = kunit_config.parse_file(kconfig_path)
331 self._kconfig = self._ops.make_arch_config(self._kconfig)
332
333 if self._kconfig.is_subset_of(existing_kconfig) and not self._kunitconfig_changed(build_dir):
334 return True
335 print('Regenerating .config ...')
336 os.remove(kconfig_path)
337 return self.build_config(build_dir, make_options)
338
339 def build_kernel(self, jobs: int, build_dir: str, make_options: Optional[List[str]]) -> bool:
340 try:
341 self._ops.make_olddefconfig(build_dir, make_options)
342 self._ops.make(jobs, build_dir, make_options)
343 except (ConfigError, BuildError) as e:
344 logging.error(e)
345 return False
346 return self.validate_config(build_dir)
347
348 def run_kernel(self, args: Optional[List[str]]=None, build_dir: str='', filter_glob: str='', filter: str='', filter_action: Optional[str]=None, timeout: Optional[int]=None) -> Iterator[str]:
349 if not args:
350 args = []
351 if filter_glob:
352 args.append('kunit.filter_glob=' + filter_glob)
353 if filter:
354 args.append('kunit.filter="' + filter + '"')
355 if filter_action:
356 args.append('kunit.filter_action=' + filter_action)
357 args.append('kunit.enable=1')
358
359 process = self._ops.start(args, build_dir)
360 assert process.stdout is not None # tell mypy it's set
361
362 # Enforce the timeout in a background thread.
363 def _wait_proc() -> None:
364 try:
365 process.wait(timeout=timeout)
366 except Exception as e:
367 print(e)
368 process.terminate()
369 process.wait()
370 waiter = threading.Thread(target=_wait_proc)
371 waiter.start()
372
373 output = open(get_outfile_path(build_dir), 'w')
374 try:
375 # Tee the output to the file and to our caller in real time.
376 for line in process.stdout:
377 output.write(line)
378 yield line
379 # This runs even if our caller doesn't consume every line.
380 finally:
381 # Flush any leftover output to the file
382 output.write(process.stdout.read())
383 output.close()
384 process.stdout.close()
385
386 waiter.join()
387 subprocess.call(['stty', 'sane'])
388
389 def signal_handler(self, unused_sig: int, unused_frame: Optional[FrameType]) -> None:
390 logging.error('Build interruption occurred. Cleaning console.')
391 subprocess.call(['stty', 'sane'])