1# SPDX-License-Identifier: GPL-2.0
2#
3# Runs UML kernel, collects output, and handles errors.
4#
5# Copyright (C) 2019, Google LLC.
6# Author: Felix Guo <[email protected]>
7# Author: Brendan Higgins <[email protected]>
8
9import importlib.abc
10import importlib.util
11import logging
12import subprocess
13import os
14import shlex
15import shutil
16import signal
17import threading
18from typing import Iterator, List, Optional, Tuple
19from types import FrameType
20
21import kunit_config
22import qemu_config
23
# File names that the get_*_path() helpers below join onto the build directory.
KCONFIG_PATH = '.config'
KUNITCONFIG_PATH = '.kunitconfig'
# Copy of the kunitconfig written by LinuxSourceTree.build_config() after a
# successful configuration; compared against on later runs.
OLD_KUNITCONFIG_PATH = 'last_used_kunitconfig'
# Relative paths to the stock config fragments.
# NOTE(review): presumably resolved against the kernel source root (the
# working directory the tool is run from) -- confirm against the caller.
DEFAULT_KUNITCONFIG_PATH = 'tools/testing/kunit/configs/default.config'
ALL_TESTS_CONFIG_PATH = 'tools/testing/kunit/configs/all_tests.config'
UML_KCONFIG_PATH = 'tools/testing/kunit/configs/arch_uml.config'
# File in the build directory that run_kernel() tees the kernel output into.
OUTFILE_PATH = 'test.log'
# Absolute path of the directory containing this file.
ABS_TOOL_PATH = os.path.abspath(os.path.dirname(__file__))
# Directory searched for '<arch>.py' QEMU configs by _default_qemu_config_path().
QEMU_CONFIGS_DIR = os.path.join(ABS_TOOL_PATH, 'qemu_configs')
33
class ConfigError(Exception):
	"""Raised when configuring the Linux kernel fails."""
36
37
class BuildError(Exception):
	"""Raised when building the Linux kernel fails."""
40
41
class LinuxSourceTreeOperations:
	"""An abstraction over command line operations performed on a source tree."""

	def __init__(self, linux_arch: str, cross_compile: Optional[str]):
		"""Stores the values passed to make as ARCH= and CROSS_COMPILE=.

		cross_compile may be None or empty, in which case CROSS_COMPILE= is
		not passed to make at all.
		"""
		self._linux_arch = linux_arch
		self._cross_compile = cross_compile

	def make_mrproper(self) -> None:
		"""Runs 'make mrproper'.

		Raises ConfigError if make cannot be executed or exits non-zero; in
		the latter case the message is make's combined stdout/stderr.
		"""
		try:
			subprocess.check_output(['make', 'mrproper'], stderr=subprocess.STDOUT)
		except OSError as e:
			raise ConfigError('Could not call make command: ' + str(e)) from e
		except subprocess.CalledProcessError as e:
			raise ConfigError(e.output.decode()) from e

	def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
		"""Returns the Kconfig to build with; the base class returns it unchanged."""
		return base_kunitconfig

	def make_olddefconfig(self, build_dir: str, make_options: Optional[List[str]]) -> None:
		"""Runs 'make olddefconfig' with O=build_dir.

		make_options, if given, are appended to the make command line.

		Raises ConfigError if make cannot be executed or exits non-zero; in
		the latter case the message is make's combined stdout/stderr.
		"""
		command = ['make', 'ARCH=' + self._linux_arch, 'O=' + build_dir, 'olddefconfig']
		if self._cross_compile:
			command += ['CROSS_COMPILE=' + self._cross_compile]
		if make_options:
			command.extend(make_options)
		print('Populating config with:\n$', ' '.join(command))
		try:
			subprocess.check_output(command, stderr=subprocess.STDOUT)
		except OSError as e:
			raise ConfigError('Could not call make command: ' + str(e)) from e
		except subprocess.CalledProcessError as e:
			raise ConfigError(e.output.decode()) from e

	def make(self, jobs: int, build_dir: str, make_options: Optional[List[str]]) -> None:
		"""Builds the kernel with 'make --jobs=<jobs>' and O=build_dir.

		make's stdout is discarded. Its stderr is captured: it becomes the
		BuildError message when make exits non-zero, and is printed otherwise.

		Raises BuildError if make cannot be executed or exits non-zero.
		"""
		command = ['make', 'ARCH=' + self._linux_arch, 'O=' + build_dir, '--jobs=' + str(jobs)]
		if make_options:
			command.extend(make_options)
		if self._cross_compile:
			command += ['CROSS_COMPILE=' + self._cross_compile]
		print('Building with:\n$', ' '.join(command))
		# Popen() only reports failures to launch the process (OSError); the
		# exit status is checked explicitly below, so CalledProcessError can
		# never be raised here.
		try:
			proc = subprocess.Popen(command,
						stderr=subprocess.PIPE,
						stdout=subprocess.DEVNULL)
		except OSError as e:
			raise BuildError('Could not call execute make: ' + str(e)) from e
		_, stderr = proc.communicate()
		if proc.returncode != 0:
			raise BuildError(stderr.decode())
		if stderr:  # likely only due to build warnings
			print(stderr.decode())

	def start(self, params: List[str], build_dir: str) -> subprocess.Popen[str]:
		"""Starts the built kernel; must be overridden by subclasses.

		Raises NotImplementedError, which is a subclass of RuntimeError, so
		callers catching RuntimeError are unaffected.
		"""
		raise NotImplementedError('not implemented!')
97
98
class LinuxSourceTreeOperationsQemu(LinuxSourceTreeOperations):
	"""Source tree operations for kernels that are run under QEMU."""

	def __init__(self, qemu_arch_params: qemu_config.QemuArchParams, cross_compile: Optional[str]):
		"""Copies the QEMU-specific settings out of qemu_arch_params."""
		super().__init__(linux_arch=qemu_arch_params.linux_arch,
				 cross_compile=cross_compile)
		self._kconfig = qemu_arch_params.kconfig
		self._qemu_arch = qemu_arch_params.qemu_arch
		self._kernel_path = qemu_arch_params.kernel_path
		# The kernel command line always gets 'kunit_shutdown=reboot' appended.
		self._kernel_command_line = qemu_arch_params.kernel_command_line + ' kunit_shutdown=reboot'
		self._extra_qemu_params = qemu_arch_params.extra_qemu_params

	def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
		"""Returns the arch's Kconfig with base_kunitconfig merged into it."""
		arch_kconfig = kunit_config.parse_from_string(self._kconfig)
		arch_kconfig.merge_in_entries(base_kunitconfig)
		return arch_kconfig

	def start(self, params: List[str], build_dir: str) -> subprocess.Popen[str]:
		"""Boots the kernel image under qemu-system-<qemu_arch>.

		params are placed in front of the configured kernel command line.
		The returned process has stdin and stdout piped, with stderr
		redirected into stdout.
		"""
		kernel_image = os.path.join(build_dir, self._kernel_path)
		kernel_cmdline = ' '.join(params + [self._kernel_command_line])
		fixed_args = ['qemu-system-' + self._qemu_arch,
			      '-nodefaults',
			      '-m', '1024',
			      '-kernel', kernel_image,
			      '-append', kernel_cmdline,
			      '-no-reboot',
			      '-nographic',
			      '-serial', 'stdio']
		argv = fixed_args + self._extra_qemu_params
		# Note: shlex.join() does what we want, but requires python 3.8+.
		printable = ' '.join(shlex.quote(arg) for arg in argv)
		print('Running tests with:\n$', printable)
		return subprocess.Popen(argv,
					stdin=subprocess.PIPE,
					stdout=subprocess.PIPE,
					stderr=subprocess.STDOUT,
					text=True, errors='backslashreplace')
132
class LinuxSourceTreeOperationsUml(LinuxSourceTreeOperations):
	"""Source tree operations for running the kernel as a UML binary (ARCH=um)."""

	def __init__(self, cross_compile: Optional[str]=None):
		super().__init__(linux_arch='um', cross_compile=cross_compile)

	def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
		"""Returns the config parsed from UML_KCONFIG_PATH with base_kunitconfig merged in."""
		kconfig = kunit_config.parse_file(UML_KCONFIG_PATH)
		kconfig.merge_in_entries(base_kunitconfig)
		return kconfig

	def start(self, params: List[str], build_dir: str) -> subprocess.Popen[str]:
		"""Runs the Linux UML binary. Must be named 'linux'.

		params are passed as kernel parameters, followed by the fixed UML
		parameters. The returned process has stdin and stdout piped, with
		stderr redirected into stdout.
		"""
		linux_bin = os.path.join(build_dir, 'linux')
		# Build a new list rather than extending `params` in place, so the
		# caller's list is left untouched.
		command = [linux_bin] + params + ['mem=1G', 'console=tty', 'kunit_shutdown=halt']
		return subprocess.Popen(command,
					   stdin=subprocess.PIPE,
					   stdout=subprocess.PIPE,
					   stderr=subprocess.STDOUT,
					   text=True, errors='backslashreplace')
153
def get_kconfig_path(build_dir: str) -> str:
	"""Returns the path of the kernel '.config' file inside build_dir."""
	kconfig_path = os.path.join(build_dir, KCONFIG_PATH)
	return kconfig_path
156
def get_kunitconfig_path(build_dir: str) -> str:
	"""Returns the path of the '.kunitconfig' file inside build_dir."""
	kunitconfig_path = os.path.join(build_dir, KUNITCONFIG_PATH)
	return kunitconfig_path
159
def get_old_kunitconfig_path(build_dir: str) -> str:
	"""Returns the path of the 'last_used_kunitconfig' file inside build_dir."""
	old_kunitconfig_path = os.path.join(build_dir, OLD_KUNITCONFIG_PATH)
	return old_kunitconfig_path
162
def get_parsed_kunitconfig(build_dir: str,
			   kunitconfig_paths: Optional[List[str]]=None) -> kunit_config.Kconfig:
	"""Parses the kunitconfig(s) to use.

	With no explicit paths, the '.kunitconfig' in build_dir is parsed; if it
	does not exist yet it is first created as a copy of
	DEFAULT_KUNITCONFIG_PATH. Otherwise each given path is parsed in order (a
	directory stands for the '.kunitconfig' inside it) and merged into a
	single Kconfig.

	Raises ConfigError if a given kunitconfig does not exist, or if
	Kconfig.conflicting_options() reports conflicts between a file and what
	has been merged so far.
	"""
	if not kunitconfig_paths:
		default_path = get_kunitconfig_path(build_dir)
		if not os.path.exists(default_path):
			shutil.copyfile(DEFAULT_KUNITCONFIG_PATH, default_path)
		return kunit_config.parse_file(default_path)

	combined = kunit_config.Kconfig()

	for entry in kunitconfig_paths:
		# A directory is shorthand for the .kunitconfig it contains.
		config_path = os.path.join(entry, KUNITCONFIG_PATH) if os.path.isdir(entry) else entry
		if not os.path.exists(config_path):
			raise ConfigError(f'Specified kunitconfig ({config_path}) does not exist')

		parsed = kunit_config.parse_file(config_path)
		conflicts = combined.conflicting_options(parsed)
		if conflicts:
			details = '\n\n'.join(f'{ours}\n  vs from {config_path}\n{theirs}' for ours, theirs in conflicts)
			raise ConfigError(f'Multiple values specified for {len(conflicts)} options in kunitconfig:\n{details}')
		combined.merge_in_entries(parsed)
	return combined
186
def get_outfile_path(build_dir: str) -> str:
	"""Returns the path of the 'test.log' output file inside build_dir."""
	outfile_path = os.path.join(build_dir, OUTFILE_PATH)
	return outfile_path
189
def _default_qemu_config_path(arch: str) -> str:
	"""Returns the path of '<arch>.py' in QEMU_CONFIGS_DIR.

	Raises ConfigError listing the available arch names if no such file exists.
	"""
	candidate = os.path.join(QEMU_CONFIGS_DIR, arch + '.py')
	if not os.path.isfile(candidate):
		# Every '*.py' file in the directory counts as an arch; drop the '.py'.
		known_archs = sorted(name[:-3] for name in os.listdir(QEMU_CONFIGS_DIR) if name.endswith('.py'))
		raise ConfigError(arch + ' is not a valid arch, options are ' + str(known_archs))
	return candidate
197
def _get_qemu_ops(config_path: str,
		  extra_qemu_args: Optional[List[str]],
		  cross_compile: Optional[str]) -> Tuple[str, LinuxSourceTreeOperations]:
	"""Loads a QEMU config file and builds the operations object for it.

	The file at config_path is executed as a Python module and must define
	QEMU_ARCH. extra_qemu_args, if any, are appended to that object's
	extra_qemu_params.

	Returns a (linux_arch, operations) tuple.
	Raises ValueError if the module does not define QEMU_ARCH.
	"""
	# The module name given to Python has very little to do with where the
	# file actually lives: the real location of the config is ignored and
	# the module is named as if it lived in QEMU_CONFIGS_DIR.
	configs_dirname = os.path.basename(QEMU_CONFIGS_DIR)
	config_filename = os.path.basename(config_path)
	module_name = '.' + os.path.join(configs_dirname, config_filename)

	module_spec = importlib.util.spec_from_file_location(module_name, config_path)
	assert module_spec is not None
	config_module = importlib.util.module_from_spec(module_spec)
	# See https://github.com/python/typeshed/pull/2626 for context.
	assert isinstance(module_spec.loader, importlib.abc.Loader)
	module_spec.loader.exec_module(config_module)

	if not hasattr(config_module, 'QEMU_ARCH'):
		raise ValueError('qemu_config module missing "QEMU_ARCH": ' + config_path)

	arch_params: qemu_config.QemuArchParams = config_module.QEMU_ARCH
	if extra_qemu_args:
		arch_params.extra_qemu_params.extend(extra_qemu_args)
	qemu_ops = LinuxSourceTreeOperationsQemu(arch_params, cross_compile=cross_compile)
	return arch_params.linux_arch, qemu_ops
224
class LinuxSourceTree:
	"""Represents a Linux kernel source tree with KUnit tests."""

	def __init__(
	      self,
	      build_dir: str,
	      kunitconfig_paths: Optional[List[str]]=None,
	      kconfig_add: Optional[List[str]]=None,
	      arch: Optional[str]=None,
	      cross_compile: Optional[str]=None,
	      qemu_config_path: Optional[str]=None,
	      extra_qemu_args: Optional[List[str]]=None) -> None:
		"""Selects how the kernel is built/run and parses the kunitconfig.

		Installs self.signal_handler as the SIGINT handler.

		If qemu_config_path is given, the arch and operations come from that
		file. Otherwise arch defaults to 'um', which uses the UML operations;
		any other arch uses its default QEMU config.

		kconfig_add entries are parsed (one per line) and merged into the
		kunitconfig.
		"""
		signal.signal(signal.SIGINT, self.signal_handler)
		if qemu_config_path:
			self._arch, self._ops = _get_qemu_ops(qemu_config_path, extra_qemu_args, cross_compile)
		else:
			self._arch = 'um' if arch is None else arch
			if self._arch == 'um':
				self._ops = LinuxSourceTreeOperationsUml(cross_compile=cross_compile)
			else:
				qemu_config_path = _default_qemu_config_path(self._arch)
				_, self._ops = _get_qemu_ops(qemu_config_path, extra_qemu_args, cross_compile)

		self._kconfig = get_parsed_kunitconfig(build_dir, kunitconfig_paths)
		if kconfig_add:
			kconfig = kunit_config.parse_from_string('\n'.join(kconfig_add))
			self._kconfig.merge_in_entries(kconfig)

	def arch(self) -> str:
		"""Returns the Linux architecture name selected in __init__."""
		return self._arch

	def clean(self) -> bool:
		"""Runs 'make mrproper'; logs the error and returns False on failure."""
		try:
			self._ops.make_mrproper()
		except ConfigError as e:
			logging.error(e)
			return False
		return True

	def validate_config(self, build_dir: str) -> bool:
		"""Checks that the .config in build_dir contains the requested options.

		Returns True if the kunitconfig is a subset of the .config. Otherwise
		logs the missing entries and returns False.
		"""
		kconfig_path = get_kconfig_path(build_dir)
		validated_kconfig = kunit_config.parse_file(kconfig_path)
		if self._kconfig.is_subset_of(validated_kconfig):
			return True
		missing = set(self._kconfig.as_entries()) - set(validated_kconfig.as_entries())
		message = 'Not all Kconfig options selected in kunitconfig were in the generated .config.\n' \
			  'This is probably due to unsatisfied dependencies.\n' \
			  'Missing: ' + ', '.join(str(e) for e in missing)
		if self._arch == 'um':
			message += '\nNote: many Kconfig options aren\'t available on UML. You can try running ' \
				   'on a different architecture with something like "--arch=x86_64".'
		logging.error(message)
		return False

	def build_config(self, build_dir: str, make_options: Optional[List[str]]) -> bool:
		"""Writes the .config for build_dir and runs 'make olddefconfig' on it.

		On success the kunitconfig that was used is also saved in build_dir
		so that later runs can detect changes.

		Returns False if configuring fails or the resulting .config does not
		contain every requested option.
		"""
		kconfig_path = get_kconfig_path(build_dir)
		if build_dir and not os.path.exists(build_dir):
			os.mkdir(build_dir)
		try:
			self._kconfig = self._ops.make_arch_config(self._kconfig)
			self._kconfig.write_to_file(kconfig_path)
			self._ops.make_olddefconfig(build_dir, make_options)
		except ConfigError as e:
			logging.error(e)
			return False
		if not self.validate_config(build_dir):
			return False

		old_path = get_old_kunitconfig_path(build_dir)
		if os.path.exists(old_path):
			os.remove(old_path)  # write_to_file appends to the file
		self._kconfig.write_to_file(old_path)
		return True

	def _kunitconfig_changed(self, build_dir: str) -> bool:
		"""Returns True if the kunitconfig differs from the one saved by the
		last successful build_config(), or if none was saved."""
		old_path = get_old_kunitconfig_path(build_dir)
		if not os.path.exists(old_path):
			return True

		old_kconfig = kunit_config.parse_file(old_path)
		return old_kconfig != self._kconfig

	def build_reconfig(self, build_dir: str, make_options: Optional[List[str]]) -> bool:
		"""Creates a new .config unless the existing one is still usable.

		The existing .config is kept only if the kunitconfig is a subset of
		it and the kunitconfig has not changed since the last successful
		build_config(). Otherwise the .config is (re)generated.
		"""
		kconfig_path = get_kconfig_path(build_dir)
		if not os.path.exists(kconfig_path):
			print('Generating .config ...')
			return self.build_config(build_dir, make_options)

		existing_kconfig = kunit_config.parse_file(kconfig_path)
		self._kconfig = self._ops.make_arch_config(self._kconfig)

		if self._kconfig.is_subset_of(existing_kconfig) and not self._kunitconfig_changed(build_dir):
			return True
		print('Regenerating .config ...')
		os.remove(kconfig_path)
		return self.build_config(build_dir, make_options)

	def build_kernel(self, jobs: int, build_dir: str, make_options: Optional[List[str]]) -> bool:
		"""Runs 'make olddefconfig' and then builds the kernel.

		Returns False (after logging the error) if either step fails;
		otherwise returns the result of validate_config().
		"""
		try:
			self._ops.make_olddefconfig(build_dir, make_options)
			self._ops.make(jobs, build_dir, make_options)
		except (ConfigError, BuildError) as e:
			logging.error(e)
			return False
		return self.validate_config(build_dir)

	def run_kernel(self, args: Optional[List[str]]=None, build_dir: str='', filter_glob: str='', timeout: Optional[int]=None) -> Iterator[str]:
		"""Runs the kernel and yields its output line by line.

		args are extra kernel parameters; 'kunit.filter_glob=<filter_glob>'
		(if filter_glob is set) and 'kunit.enable=1' are added after them.
		The caller's list is not modified.

		Every line is also written to the output file in build_dir. If the
		kernel has not exited after timeout seconds it is terminated; a
		timeout of None waits indefinitely.
		"""
		# Work on a copy: appending to the caller's list would make these
		# parameters pile up when the same list is reused for several runs.
		args = list(args) if args else []
		if filter_glob:
			args.append('kunit.filter_glob='+filter_glob)
		args.append('kunit.enable=1')

		process = self._ops.start(args, build_dir)
		assert process.stdout is not None  # tell mypy it's set

		# Enforce the timeout in a background thread.
		def _wait_proc() -> None:
			try:
				process.wait(timeout=timeout)
			except Exception as e:
				print(e)
				process.terminate()
				process.wait()
		waiter = threading.Thread(target=_wait_proc)
		waiter.start()

		output = open(get_outfile_path(build_dir), 'w')
		try:
			# Tee the output to the file and to our caller in real time.
			for line in process.stdout:
				output.write(line)
				yield line
		# This runs even if our caller doesn't consume every line.
		finally:
			# Flush any leftover output to the file
			output.write(process.stdout.read())
			output.close()
			process.stdout.close()

			waiter.join()
			subprocess.call(['stty', 'sane'])

	def signal_handler(self, unused_sig: int, unused_frame: Optional[FrameType]) -> None:
		"""SIGINT handler: logs the interruption and runs 'stty sane'."""
		logging.error('Build interruption occurred. Cleaning console.')
		subprocess.call(['stty', 'sane'])
372