1# SPDX-License-Identifier: GPL-2.0 2# 3# Runs UML kernel, collects output, and handles errors. 4# 5# Copyright (C) 2019, Google LLC. 6# Author: Felix Guo <[email protected]> 7# Author: Brendan Higgins <[email protected]> 8 9import importlib.abc 10import importlib.util 11import logging 12import subprocess 13import os 14import shlex 15import shutil 16import signal 17import threading 18from typing import Iterator, List, Optional, Tuple 19 20import kunit_config 21import qemu_config 22 23KCONFIG_PATH = '.config' 24KUNITCONFIG_PATH = '.kunitconfig' 25OLD_KUNITCONFIG_PATH = 'last_used_kunitconfig' 26DEFAULT_KUNITCONFIG_PATH = 'tools/testing/kunit/configs/default.config' 27ALL_TESTS_CONFIG_PATH = 'tools/testing/kunit/configs/all_tests.config' 28UML_KCONFIG_PATH = 'tools/testing/kunit/configs/arch_uml.config' 29OUTFILE_PATH = 'test.log' 30ABS_TOOL_PATH = os.path.abspath(os.path.dirname(__file__)) 31QEMU_CONFIGS_DIR = os.path.join(ABS_TOOL_PATH, 'qemu_configs') 32 33class ConfigError(Exception): 34 """Represents an error trying to configure the Linux kernel.""" 35 36 37class BuildError(Exception): 38 """Represents an error trying to build the Linux kernel.""" 39 40 41class LinuxSourceTreeOperations: 42 """An abstraction over command line operations performed on a source tree.""" 43 44 def __init__(self, linux_arch: str, cross_compile: Optional[str]): 45 self._linux_arch = linux_arch 46 self._cross_compile = cross_compile 47 48 def make_mrproper(self) -> None: 49 try: 50 subprocess.check_output(['make', 'mrproper'], stderr=subprocess.STDOUT) 51 except OSError as e: 52 raise ConfigError('Could not call make command: ' + str(e)) 53 except subprocess.CalledProcessError as e: 54 raise ConfigError(e.output.decode()) 55 56 def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig: 57 return base_kunitconfig 58 59 def make_olddefconfig(self, build_dir: str, make_options) -> None: 60 command = ['make', 'ARCH=' + self._linux_arch, 'O=' + build_dir, 'olddefconfig'] 61 if self._cross_compile: 62 command += ['CROSS_COMPILE=' + self._cross_compile] 63 if make_options: 64 command.extend(make_options) 65 print('Populating config with:\n$', ' '.join(command)) 66 try: 67 subprocess.check_output(command, stderr=subprocess.STDOUT) 68 except OSError as e: 69 raise ConfigError('Could not call make command: ' + str(e)) 70 except subprocess.CalledProcessError as e: 71 raise ConfigError(e.output.decode()) 72 73 def make(self, jobs, build_dir: str, make_options) -> None: 74 command = ['make', 'ARCH=' + self._linux_arch, 'O=' + build_dir, '--jobs=' + str(jobs)] 75 if make_options: 76 command.extend(make_options) 77 if self._cross_compile: 78 command += ['CROSS_COMPILE=' + self._cross_compile] 79 print('Building with:\n$', ' '.join(command)) 80 try: 81 proc = subprocess.Popen(command, 82 stderr=subprocess.PIPE, 83 stdout=subprocess.DEVNULL) 84 except OSError as e: 85 raise BuildError('Could not call execute make: ' + str(e)) 86 except subprocess.CalledProcessError as e: 87 raise BuildError(e.output) 88 _, stderr = proc.communicate() 89 if proc.returncode != 0: 90 raise BuildError(stderr.decode()) 91 if stderr: # likely only due to build warnings 92 print(stderr.decode()) 93 94 def start(self, params: List[str], build_dir: str) -> subprocess.Popen[str]: 95 raise RuntimeError('not implemented!') 96 97 98class LinuxSourceTreeOperationsQemu(LinuxSourceTreeOperations): 99 100 def __init__(self, qemu_arch_params: qemu_config.QemuArchParams, cross_compile: Optional[str]): 101 super().__init__(linux_arch=qemu_arch_params.linux_arch, 102 cross_compile=cross_compile) 103 self._kconfig = qemu_arch_params.kconfig 104 self._qemu_arch = qemu_arch_params.qemu_arch 105 self._kernel_path = qemu_arch_params.kernel_path 106 self._kernel_command_line = qemu_arch_params.kernel_command_line + ' kunit_shutdown=reboot' 107 self._extra_qemu_params = qemu_arch_params.extra_qemu_params 108 109 def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig: 110 kconfig = kunit_config.parse_from_string(self._kconfig) 111 kconfig.merge_in_entries(base_kunitconfig) 112 return kconfig 113 114 def start(self, params: List[str], build_dir: str) -> subprocess.Popen[str]: 115 kernel_path = os.path.join(build_dir, self._kernel_path) 116 qemu_command = ['qemu-system-' + self._qemu_arch, 117 '-nodefaults', 118 '-m', '1024', 119 '-kernel', kernel_path, 120 '-append', ' '.join(params + [self._kernel_command_line]), 121 '-no-reboot', 122 '-nographic', 123 '-serial', 'stdio'] + self._extra_qemu_params 124 # Note: shlex.join() does what we want, but requires python 3.8+. 125 print('Running tests with:\n$', ' '.join(shlex.quote(arg) for arg in qemu_command)) 126 return subprocess.Popen(qemu_command, 127 stdin=subprocess.PIPE, 128 stdout=subprocess.PIPE, 129 stderr=subprocess.STDOUT, 130 text=True, errors='backslashreplace') 131 132class LinuxSourceTreeOperationsUml(LinuxSourceTreeOperations): 133 """An abstraction over command line operations performed on a source tree.""" 134 135 def __init__(self, cross_compile=None): 136 super().__init__(linux_arch='um', cross_compile=cross_compile) 137 138 def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig: 139 kconfig = kunit_config.parse_file(UML_KCONFIG_PATH) 140 kconfig.merge_in_entries(base_kunitconfig) 141 return kconfig 142 143 def start(self, params: List[str], build_dir: str) -> subprocess.Popen[str]: 144 """Runs the Linux UML binary. Must be named 'linux'.""" 145 linux_bin = os.path.join(build_dir, 'linux') 146 params.extend(['mem=1G', 'console=tty', 'kunit_shutdown=halt']) 147 return subprocess.Popen([linux_bin] + params, 148 stdin=subprocess.PIPE, 149 stdout=subprocess.PIPE, 150 stderr=subprocess.STDOUT, 151 text=True, errors='backslashreplace') 152 153def get_kconfig_path(build_dir: str) -> str: 154 return os.path.join(build_dir, KCONFIG_PATH) 155 156def get_kunitconfig_path(build_dir: str) -> str: 157 return os.path.join(build_dir, KUNITCONFIG_PATH) 158 159def get_old_kunitconfig_path(build_dir: str) -> str: 160 return os.path.join(build_dir, OLD_KUNITCONFIG_PATH) 161 162def get_parsed_kunitconfig(build_dir: str, 163 kunitconfig_paths: Optional[List[str]]=None) -> kunit_config.Kconfig: 164 if not kunitconfig_paths: 165 path = get_kunitconfig_path(build_dir) 166 if not os.path.exists(path): 167 shutil.copyfile(DEFAULT_KUNITCONFIG_PATH, path) 168 return kunit_config.parse_file(path) 169 170 merged = kunit_config.Kconfig() 171 172 for path in kunitconfig_paths: 173 if os.path.isdir(path): 174 path = os.path.join(path, KUNITCONFIG_PATH) 175 if not os.path.exists(path): 176 raise ConfigError(f'Specified kunitconfig ({path}) does not exist') 177 178 partial = kunit_config.parse_file(path) 179 diff = merged.conflicting_options(partial) 180 if diff: 181 diff_str = '\n\n'.join(f'{a}\n vs from {path}\n{b}' for a, b in diff) 182 raise ConfigError(f'Multiple values specified for {len(diff)} options in kunitconfig:\n{diff_str}') 183 merged.merge_in_entries(partial) 184 return merged 185 186def get_outfile_path(build_dir: str) -> str: 187 return os.path.join(build_dir, OUTFILE_PATH) 188 189def _default_qemu_config_path(arch: str) -> str: 190 config_path = os.path.join(QEMU_CONFIGS_DIR, arch + '.py') 191 if os.path.isfile(config_path): 192 return config_path 193 194 options = [f[:-3] for f in os.listdir(QEMU_CONFIGS_DIR) if f.endswith('.py')] 195 raise ConfigError(arch + ' is not a valid arch, options are ' + str(sorted(options))) 196 197def _get_qemu_ops(config_path: str, 198 extra_qemu_args: Optional[List[str]], 199 cross_compile: Optional[str]) -> Tuple[str, LinuxSourceTreeOperations]: 200 # The module name/path has very little to do with where the actual file 201 # exists (I learned this through experimentation and could not find it 202 # anywhere in the Python documentation). 203 # 204 # Bascially, we completely ignore the actual file location of the config 205 # we are loading and just tell Python that the module lives in the 206 # QEMU_CONFIGS_DIR for import purposes regardless of where it actually 207 # exists as a file. 208 module_path = '.' + os.path.join(os.path.basename(QEMU_CONFIGS_DIR), os.path.basename(config_path)) 209 spec = importlib.util.spec_from_file_location(module_path, config_path) 210 assert spec is not None 211 config = importlib.util.module_from_spec(spec) 212 # See https://github.com/python/typeshed/pull/2626 for context. 213 assert isinstance(spec.loader, importlib.abc.Loader) 214 spec.loader.exec_module(config) 215 216 if not hasattr(config, 'QEMU_ARCH'): 217 raise ValueError('qemu_config module missing "QEMU_ARCH": ' + config_path) 218 params: qemu_config.QemuArchParams = config.QEMU_ARCH # type: ignore 219 if extra_qemu_args: 220 params.extra_qemu_params.extend(extra_qemu_args) 221 return params.linux_arch, LinuxSourceTreeOperationsQemu( 222 params, cross_compile=cross_compile) 223 224class LinuxSourceTree: 225 """Represents a Linux kernel source tree with KUnit tests.""" 226 227 def __init__( 228 self, 229 build_dir: str, 230 kunitconfig_paths: Optional[List[str]]=None, 231 kconfig_add: Optional[List[str]]=None, 232 arch=None, 233 cross_compile=None, 234 qemu_config_path=None, 235 extra_qemu_args=None) -> None: 236 signal.signal(signal.SIGINT, self.signal_handler) 237 if qemu_config_path: 238 self._arch, self._ops = _get_qemu_ops(qemu_config_path, extra_qemu_args, cross_compile) 239 else: 240 self._arch = 'um' if arch is None else arch 241 if self._arch == 'um': 242 self._ops = LinuxSourceTreeOperationsUml(cross_compile=cross_compile) 243 else: 244 qemu_config_path = _default_qemu_config_path(self._arch) 245 _, self._ops = _get_qemu_ops(qemu_config_path, extra_qemu_args, cross_compile) 246 247 self._kconfig = get_parsed_kunitconfig(build_dir, kunitconfig_paths) 248 if kconfig_add: 249 kconfig = kunit_config.parse_from_string('\n'.join(kconfig_add)) 250 self._kconfig.merge_in_entries(kconfig) 251 252 def arch(self) -> str: 253 return self._arch 254 255 def clean(self) -> bool: 256 try: 257 self._ops.make_mrproper() 258 except ConfigError as e: 259 logging.error(e) 260 return False 261 return True 262 263 def validate_config(self, build_dir: str) -> bool: 264 kconfig_path = get_kconfig_path(build_dir) 265 validated_kconfig = kunit_config.parse_file(kconfig_path) 266 if self._kconfig.is_subset_of(validated_kconfig): 267 return True 268 missing = set(self._kconfig.as_entries()) - set(validated_kconfig.as_entries()) 269 message = 'Not all Kconfig options selected in kunitconfig were in the generated .config.\n' \ 270 'This is probably due to unsatisfied dependencies.\n' \ 271 'Missing: ' + ', '.join(str(e) for e in missing) 272 if self._arch == 'um': 273 message += '\nNote: many Kconfig options aren\'t available on UML. You can try running ' \ 274 'on a different architecture with something like "--arch=x86_64".' 275 logging.error(message) 276 return False 277 278 def build_config(self, build_dir: str, make_options) -> bool: 279 kconfig_path = get_kconfig_path(build_dir) 280 if build_dir and not os.path.exists(build_dir): 281 os.mkdir(build_dir) 282 try: 283 self._kconfig = self._ops.make_arch_config(self._kconfig) 284 self._kconfig.write_to_file(kconfig_path) 285 self._ops.make_olddefconfig(build_dir, make_options) 286 except ConfigError as e: 287 logging.error(e) 288 return False 289 if not self.validate_config(build_dir): 290 return False 291 292 old_path = get_old_kunitconfig_path(build_dir) 293 if os.path.exists(old_path): 294 os.remove(old_path) # write_to_file appends to the file 295 self._kconfig.write_to_file(old_path) 296 return True 297 298 def _kunitconfig_changed(self, build_dir: str) -> bool: 299 old_path = get_old_kunitconfig_path(build_dir) 300 if not os.path.exists(old_path): 301 return True 302 303 old_kconfig = kunit_config.parse_file(old_path) 304 return old_kconfig != self._kconfig 305 306 def build_reconfig(self, build_dir: str, make_options) -> bool: 307 """Creates a new .config if it is not a subset of the .kunitconfig.""" 308 kconfig_path = get_kconfig_path(build_dir) 309 if not os.path.exists(kconfig_path): 310 print('Generating .config ...') 311 return self.build_config(build_dir, make_options) 312 313 existing_kconfig = kunit_config.parse_file(kconfig_path) 314 self._kconfig = self._ops.make_arch_config(self._kconfig) 315 316 if self._kconfig.is_subset_of(existing_kconfig) and not self._kunitconfig_changed(build_dir): 317 return True 318 print('Regenerating .config ...') 319 os.remove(kconfig_path) 320 return self.build_config(build_dir, make_options) 321 322 def build_kernel(self, jobs, build_dir: str, make_options) -> bool: 323 try: 324 self._ops.make_olddefconfig(build_dir, make_options) 325 self._ops.make(jobs, build_dir, make_options) 326 except (ConfigError, BuildError) as e: 327 logging.error(e) 328 return False 329 return self.validate_config(build_dir) 330 331 def run_kernel(self, args=None, build_dir='', filter_glob='', timeout=None) -> Iterator[str]: 332 if not args: 333 args = [] 334 if filter_glob: 335 args.append('kunit.filter_glob='+filter_glob) 336 args.append('kunit.enable=1') 337 338 process = self._ops.start(args, build_dir) 339 assert process.stdout is not None # tell mypy it's set 340 341 # Enforce the timeout in a background thread. 342 def _wait_proc(): 343 try: 344 process.wait(timeout=timeout) 345 except Exception as e: 346 print(e) 347 process.terminate() 348 process.wait() 349 waiter = threading.Thread(target=_wait_proc) 350 waiter.start() 351 352 output = open(get_outfile_path(build_dir), 'w') 353 try: 354 # Tee the output to the file and to our caller in real time. 355 for line in process.stdout: 356 output.write(line) 357 yield line 358 # This runs even if our caller doesn't consume every line. 359 finally: 360 # Flush any leftover output to the file 361 output.write(process.stdout.read()) 362 output.close() 363 process.stdout.close() 364 365 waiter.join() 366 subprocess.call(['stty', 'sane']) 367 368 def signal_handler(self, unused_sig, unused_frame) -> None: 369 logging.error('Build interruption occurred. Cleaning console.') 370 subprocess.call(['stty', 'sane']) 371