1# SPDX-License-Identifier: GPL-2.0
2#
3# Runs UML kernel, collects output, and handles errors.
4#
5# Copyright (C) 2019, Google LLC.
6# Author: Felix Guo <[email protected]>
7# Author: Brendan Higgins <[email protected]>
8
9import importlib.abc
10import importlib.util
11import logging
12import subprocess
13import os
14import shlex
15import shutil
16import signal
17import threading
18from typing import Iterator, List, Optional, Tuple
19
20import kunit_config
21import qemu_config
22
# File names that the get_*_path() helpers in this file join onto the build
# directory.
KCONFIG_PATH = '.config'
KUNITCONFIG_PATH = '.kunitconfig'
OLD_KUNITCONFIG_PATH = 'last_used_kunitconfig'
OUTFILE_PATH = 'test.log'

# Config fragments addressed by relative path.
# NOTE(review): presumably relative to the kernel source root, i.e. the
# expected working directory -- confirm against the caller.
DEFAULT_KUNITCONFIG_PATH = 'tools/testing/kunit/configs/default.config'
ALL_TESTS_CONFIG_PATH = 'tools/testing/kunit/configs/all_tests.config'
UML_KCONFIG_PATH = 'tools/testing/kunit/configs/arch_uml.config'

# Absolute directory holding this file, and the QEMU configs stored next to it.
ABS_TOOL_PATH = os.path.dirname(os.path.abspath(__file__))
QEMU_CONFIGS_DIR = os.path.join(ABS_TOOL_PATH, 'qemu_configs')
32
class ConfigError(Exception):
	"""Raised when configuring the Linux kernel fails."""
35
36
class BuildError(Exception):
	"""Raised when building the Linux kernel fails."""
39
40
class LinuxSourceTreeOperations:
	"""An abstraction over command line operations performed on a source tree.

	This base class knows how to invoke make. make_arch_config() and start()
	are the hooks that subclasses override.
	"""

	def __init__(self, linux_arch: str, cross_compile: Optional[str]):
		"""Records the values later handed to make.

		Args:
			linux_arch: value for make's ARCH= variable.
			cross_compile: value for make's CROSS_COMPILE= variable; the
				variable is left off the command line when this is falsy.
		"""
		self._linux_arch = linux_arch
		self._cross_compile = cross_compile

	def make_mrproper(self) -> None:
		"""Runs `make mrproper`.

		Raises:
			ConfigError: if make cannot be executed, or exits non-zero (the
				message is then make's combined stdout/stderr).
		"""
		try:
			subprocess.check_output(['make', 'mrproper'], stderr=subprocess.STDOUT)
		except OSError as e:
			raise ConfigError('Could not call make command: ' + str(e)) from e
		except subprocess.CalledProcessError as e:
			raise ConfigError(e.output.decode()) from e

	def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
		"""Returns the Kconfig to build with; the base class adds nothing to it."""
		return base_kunitconfig

	def make_olddefconfig(self, build_dir: str, make_options) -> None:
		"""Runs `make olddefconfig` against build_dir.

		Args:
			build_dir: passed to make as O=.
			make_options: extra arguments appended to the make command line;
				may be None or empty.

		Raises:
			ConfigError: if make cannot be executed, or exits non-zero (the
				message is then make's combined stdout/stderr).
		"""
		command = ['make', 'ARCH=' + self._linux_arch, 'O=' + build_dir, 'olddefconfig']
		if self._cross_compile:
			command += ['CROSS_COMPILE=' + self._cross_compile]
		if make_options:
			command.extend(make_options)
		print('Populating config with:\n$', ' '.join(command))
		try:
			subprocess.check_output(command, stderr=subprocess.STDOUT)
		except OSError as e:
			raise ConfigError('Could not call make command: ' + str(e)) from e
		except subprocess.CalledProcessError as e:
			raise ConfigError(e.output.decode()) from e

	def make(self, jobs, build_dir: str, make_options) -> None:
		"""Builds the kernel with make.

		Args:
			jobs: passed to make as --jobs=.
			build_dir: passed to make as O=.
			make_options: extra arguments appended to the make command line;
				may be None or empty.

		Raises:
			BuildError: if make cannot be started, or exits non-zero (the
				message is then make's stderr).
		"""
		command = ['make', 'ARCH=' + self._linux_arch, 'O=' + build_dir, '--jobs=' + str(jobs)]
		if make_options:
			command.extend(make_options)
		if self._cross_compile:
			command += ['CROSS_COMPILE=' + self._cross_compile]
		print('Building with:\n$', ' '.join(command))
		# Popen() itself never raises CalledProcessError, so only a failure
		# to launch make needs handling here; a failing build is detected
		# via the return code below.
		try:
			proc = subprocess.Popen(command,
						stderr=subprocess.PIPE,
						stdout=subprocess.DEVNULL)
		except OSError as e:
			raise BuildError('Could not call execute make: ' + str(e)) from e
		_, stderr = proc.communicate()
		if proc.returncode != 0:
			raise BuildError(stderr.decode())
		if stderr:  # likely only due to build warnings
			print(stderr.decode())

	# The return annotation is quoted because subscripting subprocess.Popen
	# at runtime only works on Python 3.9+; as a string it is never
	# evaluated, yet type checkers still understand it.
	def start(self, params: List[str], build_dir: str) -> 'subprocess.Popen[str]':
		"""Starts the built kernel; must be overridden by subclasses.

		Raises:
			RuntimeError: always, in this base class.
		"""
		raise RuntimeError('not implemented!')
96
97
class LinuxSourceTreeOperationsQemu(LinuxSourceTreeOperations):
	"""Source tree operations for kernels that are run under QEMU."""

	def __init__(self, qemu_arch_params: qemu_config.QemuArchParams, cross_compile: Optional[str]):
		"""Copies the per-architecture settings out of qemu_arch_params.

		Args:
			qemu_arch_params: supplies linux_arch, kconfig, qemu_arch,
				kernel_path, kernel_command_line and extra_qemu_params.
			cross_compile: forwarded to the base class.
		"""
		super().__init__(linux_arch=qemu_arch_params.linux_arch,
				 cross_compile=cross_compile)
		self._kconfig = qemu_arch_params.kconfig
		self._qemu_arch = qemu_arch_params.qemu_arch
		self._kernel_path = qemu_arch_params.kernel_path
		self._kernel_command_line = qemu_arch_params.kernel_command_line + ' kunit_shutdown=reboot'
		self._extra_qemu_params = qemu_arch_params.extra_qemu_params

	def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
		"""Returns the arch's own Kconfig with base_kunitconfig merged into it."""
		kconfig = kunit_config.parse_from_string(self._kconfig)
		kconfig.merge_in_entries(base_kunitconfig)
		return kconfig

	# The return annotation is quoted because subscripting subprocess.Popen
	# at runtime only works on Python 3.9+; as a string it is never
	# evaluated, yet type checkers still understand it.
	def start(self, params: List[str], build_dir: str) -> 'subprocess.Popen[str]':
		"""Launches qemu-system-<qemu_arch> on the kernel image in build_dir.

		Args:
			params: kernel command line arguments; they are placed ahead of
				the architecture's own command line in QEMU's -append value.
			build_dir: directory that the configured kernel_path is
				relative to.

		Returns:
			The running QEMU process, opened in text mode with stderr
			merged into stdout.
		"""
		kernel_path = os.path.join(build_dir, self._kernel_path)
		qemu_command = ['qemu-system-' + self._qemu_arch,
				'-nodefaults',
				'-m', '1024',
				'-kernel', kernel_path,
				'-append', ' '.join(params + [self._kernel_command_line]),
				'-no-reboot',
				'-nographic',
				'-serial', 'stdio'] + self._extra_qemu_params
		# Note: shlex.join() does what we want, but requires python 3.8+.
		print('Running tests with:\n$', ' '.join(shlex.quote(arg) for arg in qemu_command))
		return subprocess.Popen(qemu_command,
					stdin=subprocess.PIPE,
					stdout=subprocess.PIPE,
					stderr=subprocess.STDOUT,
					text=True, errors='backslashreplace')
131
class LinuxSourceTreeOperationsUml(LinuxSourceTreeOperations):
	"""Source tree operations for User Mode Linux (ARCH=um)."""

	def __init__(self, cross_compile: Optional[str]=None):
		"""Fixes the architecture to 'um'; cross_compile goes to the base class."""
		super().__init__(linux_arch='um', cross_compile=cross_compile)

	def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
		"""Returns the UML Kconfig fragment with base_kunitconfig merged into it."""
		kconfig = kunit_config.parse_file(UML_KCONFIG_PATH)
		kconfig.merge_in_entries(base_kunitconfig)
		return kconfig

	# The return annotation is quoted because subscripting subprocess.Popen
	# at runtime only works on Python 3.9+; as a string it is never
	# evaluated, yet type checkers still understand it.
	def start(self, params: List[str], build_dir: str) -> 'subprocess.Popen[str]':
		"""Runs the Linux UML binary. Must be named 'linux'.

		Args:
			params: kernel command line arguments; the list is not modified.
			build_dir: directory containing the 'linux' binary.

		Returns:
			The running UML process, opened in text mode with stderr
			merged into stdout.
		"""
		linux_bin = os.path.join(build_dir, 'linux')
		# Build a fresh list instead of extending `params` in place, so a
		# caller that reuses its list does not accumulate these flags.
		command = [linux_bin] + params + ['mem=1G', 'console=tty', 'kunit_shutdown=halt']
		return subprocess.Popen(command,
					   stdin=subprocess.PIPE,
					   stdout=subprocess.PIPE,
					   stderr=subprocess.STDOUT,
					   text=True, errors='backslashreplace')
152
def get_kconfig_path(build_dir: str) -> str:
	"""Returns the path of the KCONFIG_PATH file inside build_dir."""
	kconfig_path = os.path.join(build_dir, KCONFIG_PATH)
	return kconfig_path
155
def get_kunitconfig_path(build_dir: str) -> str:
	"""Returns the path of the KUNITCONFIG_PATH file inside build_dir."""
	kunitconfig_path = os.path.join(build_dir, KUNITCONFIG_PATH)
	return kunitconfig_path
158
def get_old_kunitconfig_path(build_dir: str) -> str:
	"""Returns the path of the OLD_KUNITCONFIG_PATH file inside build_dir."""
	old_kunitconfig_path = os.path.join(build_dir, OLD_KUNITCONFIG_PATH)
	return old_kunitconfig_path
161
def get_parsed_kunitconfig(build_dir: str,
			   kunitconfig_paths: Optional[List[str]]=None) -> kunit_config.Kconfig:
	"""Parses the kunitconfig(s) to use and returns them as a single Kconfig.

	With no kunitconfig_paths, the .kunitconfig in build_dir is parsed; if
	that file does not exist yet it is first created as a copy of
	DEFAULT_KUNITCONFIG_PATH.

	Otherwise every given path is parsed and merged in order. A path naming
	a directory stands for the .kunitconfig file inside that directory.

	Raises:
		ConfigError: if a given path does not exist, or if a file assigns
			an option a value conflicting with one merged earlier.
	"""
	if not kunitconfig_paths:
		default_path = get_kunitconfig_path(build_dir)
		already_there = os.path.exists(default_path)
		if not already_there:
			shutil.copyfile(DEFAULT_KUNITCONFIG_PATH, default_path)
		return kunit_config.parse_file(default_path)

	merged = kunit_config.Kconfig()

	for given_path in kunitconfig_paths:
		if os.path.isdir(given_path):
			config_file = os.path.join(given_path, KUNITCONFIG_PATH)
		else:
			config_file = given_path
		if not os.path.exists(config_file):
			raise ConfigError(f'Specified kunitconfig ({config_file}) does not exist')

		partial = kunit_config.parse_file(config_file)
		conflicts = merged.conflicting_options(partial)
		if conflicts:
			details = '\n\n'.join(f'{ours}\n  vs from {config_file}\n{theirs}' for ours, theirs in conflicts)
			raise ConfigError(f'Multiple values specified for {len(conflicts)} options in kunitconfig:\n{details}')
		merged.merge_in_entries(partial)
	return merged
185
def get_outfile_path(build_dir: str) -> str:
	"""Returns the path of the OUTFILE_PATH file inside build_dir."""
	outfile_path = os.path.join(build_dir, OUTFILE_PATH)
	return outfile_path
188
def _default_qemu_config_path(arch: str) -> str:
	"""Returns the path of the bundled QEMU config for arch.

	Raises:
		ConfigError: if QEMU_CONFIGS_DIR holds no '<arch>.py' file; the
			message lists the architectures that do have one.
	"""
	candidate = os.path.join(QEMU_CONFIGS_DIR, arch + '.py')
	if not os.path.isfile(candidate):
		known = []
		for entry in os.listdir(QEMU_CONFIGS_DIR):
			if entry.endswith('.py'):
				# Drop the '.py' suffix to get the arch name.
				known.append(entry[:-3])
		raise ConfigError(arch + ' is not a valid arch, options are ' + str(sorted(known)))
	return candidate
196
def _get_qemu_ops(config_path: str,
		  extra_qemu_args: Optional[List[str]],
		  cross_compile: Optional[str]) -> Tuple[str, LinuxSourceTreeOperations]:
	"""Loads a QEMU config file and builds the operations object for it.

	Args:
		config_path: path of a Python file defining QEMU_ARCH.
		extra_qemu_args: additional QEMU arguments, appended to the
			config's extra_qemu_params; may be None or empty.
		cross_compile: forwarded to LinuxSourceTreeOperationsQemu.

	Returns:
		A (linux_arch, operations) tuple.

	Raises:
		ValueError: if config_path cannot be loaded as a Python module, or
			the loaded module does not define QEMU_ARCH.
	"""
	# The module name/path has very little to do with where the actual file
	# exists (I learned this through experimentation and could not find it
	# anywhere in the Python documentation).
	#
	# Basically, we completely ignore the actual file location of the config
	# we are loading and just tell Python that the module lives in the
	# QEMU_CONFIGS_DIR for import purposes regardless of where it actually
	# exists as a file.
	module_path = '.' + os.path.join(os.path.basename(QEMU_CONFIGS_DIR), os.path.basename(config_path))
	spec = importlib.util.spec_from_file_location(module_path, config_path)
	# config_path comes from the user, so report an unloadable file with a
	# real error rather than an assert (asserts vanish under `python -O`).
	# NOTE(review): a None spec presumably means no loader matched the
	# file's suffix -- confirm against the importlib documentation.
	# The isinstance() check also narrows the loader type for mypy; see
	# https://github.com/python/typeshed/pull/2626 for context.
	if spec is None or not isinstance(spec.loader, importlib.abc.Loader):
		raise ValueError('could not load qemu_config module: ' + config_path)
	config = importlib.util.module_from_spec(spec)
	spec.loader.exec_module(config)

	if not hasattr(config, 'QEMU_ARCH'):
		raise ValueError('qemu_config module missing "QEMU_ARCH": ' + config_path)
	params: qemu_config.QemuArchParams = config.QEMU_ARCH  # type: ignore
	if extra_qemu_args:
		params.extra_qemu_params.extend(extra_qemu_args)
	return params.linux_arch, LinuxSourceTreeOperationsQemu(
			params, cross_compile=cross_compile)
223
class LinuxSourceTree:
	"""Represents a Linux kernel source tree with KUnit tests."""

	def __init__(
		self,
		build_dir: str,
		kunitconfig_paths: Optional[List[str]]=None,
		kconfig_add: Optional[List[str]]=None,
		arch: Optional[str]=None,
		cross_compile: Optional[str]=None,
		qemu_config_path: Optional[str]=None,
		extra_qemu_args: Optional[List[str]]=None) -> None:
		"""Selects the operations backend and loads the kunitconfig.

		Also installs self.signal_handler as the process's SIGINT handler.

		Args:
			build_dir: build directory used to locate the kunitconfig.
			kunitconfig_paths: kunitconfig files/directories to merge; when
				empty, the .kunitconfig in build_dir is used.
			kconfig_add: extra Kconfig lines merged on top of the kunitconfig.
			arch: architecture to run on; None means 'um'. Ignored when
				qemu_config_path is given.
			cross_compile: forwarded to the operations backend.
			qemu_config_path: explicit QEMU config file; takes precedence
				over arch.
			extra_qemu_args: additional QEMU arguments (QEMU backends only).
		"""
		signal.signal(signal.SIGINT, self.signal_handler)
		if qemu_config_path:
			self._arch, self._ops = _get_qemu_ops(qemu_config_path, extra_qemu_args, cross_compile)
		else:
			self._arch = 'um' if arch is None else arch
			if self._arch == 'um':
				self._ops = LinuxSourceTreeOperationsUml(cross_compile=cross_compile)
			else:
				qemu_config_path = _default_qemu_config_path(self._arch)
				_, self._ops = _get_qemu_ops(qemu_config_path, extra_qemu_args, cross_compile)

		self._kconfig = get_parsed_kunitconfig(build_dir, kunitconfig_paths)
		if kconfig_add:
			kconfig = kunit_config.parse_from_string('\n'.join(kconfig_add))
			self._kconfig.merge_in_entries(kconfig)

	def arch(self) -> str:
		"""Returns the architecture name selected at construction time."""
		return self._arch

	def clean(self) -> bool:
		"""Runs `make mrproper`; logs the error and returns False on failure."""
		try:
			self._ops.make_mrproper()
		except ConfigError as e:
			logging.error(e)
			return False
		return True

	def validate_config(self, build_dir: str) -> bool:
		"""Checks that the .config in build_dir contains every requested option.

		Returns:
			True if the requested Kconfig is a subset of the .config;
			otherwise the missing entries are logged and False is returned.
		"""
		kconfig_path = get_kconfig_path(build_dir)
		validated_kconfig = kunit_config.parse_file(kconfig_path)
		if self._kconfig.is_subset_of(validated_kconfig):
			return True
		missing = set(self._kconfig.as_entries()) - set(validated_kconfig.as_entries())
		# Sort so the message is stable from run to run (set order is not).
		message = 'Not all Kconfig options selected in kunitconfig were in the generated .config.\n' \
			  'This is probably due to unsatisfied dependencies.\n' \
			  'Missing: ' + ', '.join(sorted(str(e) for e in missing))
		if self._arch == 'um':
			message += '\nNote: many Kconfig options aren\'t available on UML. You can try running ' \
				   'on a different architecture with something like "--arch=x86_64".'
		logging.error(message)
		return False

	def build_config(self, build_dir: str, make_options) -> bool:
		"""Writes a .config into build_dir, runs olddefconfig and validates it.

		On success the Kconfig that was used is also saved in build_dir as
		the "last used" kunitconfig.

		Returns:
			False if configuring fails (the error is logged) or validation
			fails; True otherwise.
		"""
		kconfig_path = get_kconfig_path(build_dir)
		if build_dir:
			# Also creates missing parent directories, and does not fail
			# if the directory appears between a check and the creation.
			os.makedirs(build_dir, exist_ok=True)
		try:
			self._kconfig = self._ops.make_arch_config(self._kconfig)
			self._kconfig.write_to_file(kconfig_path)
			self._ops.make_olddefconfig(build_dir, make_options)
		except ConfigError as e:
			logging.error(e)
			return False
		if not self.validate_config(build_dir):
			return False

		old_path = get_old_kunitconfig_path(build_dir)
		if os.path.exists(old_path):
			os.remove(old_path)  # write_to_file appends to the file
		self._kconfig.write_to_file(old_path)
		return True

	def _kunitconfig_changed(self, build_dir: str) -> bool:
		"""Returns True if no "last used" kunitconfig is saved or it differs from ours."""
		old_path = get_old_kunitconfig_path(build_dir)
		if not os.path.exists(old_path):
			return True

		old_kconfig = kunit_config.parse_file(old_path)
		return old_kconfig != self._kconfig

	def build_reconfig(self, build_dir: str, make_options) -> bool:
		"""Makes sure build_dir has a .config matching the requested Kconfig.

		A .config is generated when none exists. An existing one is kept
		only if the requested Kconfig is a subset of it and the kunitconfig
		has not changed since the last configuration; otherwise it is
		deleted and regenerated.

		Returns:
			True if the existing .config was kept, else the result of
			build_config().
		"""
		kconfig_path = get_kconfig_path(build_dir)
		if not os.path.exists(kconfig_path):
			print('Generating .config ...')
			return self.build_config(build_dir, make_options)

		existing_kconfig = kunit_config.parse_file(kconfig_path)
		self._kconfig = self._ops.make_arch_config(self._kconfig)

		if self._kconfig.is_subset_of(existing_kconfig) and not self._kunitconfig_changed(build_dir):
			return True
		print('Regenerating .config ...')
		os.remove(kconfig_path)
		return self.build_config(build_dir, make_options)

	def build_kernel(self, jobs, build_dir: str, make_options) -> bool:
		"""Runs olddefconfig and make, then validates the resulting .config.

		Returns:
			False if either make step fails (the error is logged) or
			validation fails; True otherwise.
		"""
		try:
			self._ops.make_olddefconfig(build_dir, make_options)
			self._ops.make(jobs, build_dir, make_options)
		except (ConfigError, BuildError) as e:
			logging.error(e)
			return False
		return self.validate_config(build_dir)

	def run_kernel(self, args=None, build_dir='', filter_glob='', timeout=None) -> Iterator[str]:
		"""Runs the kernel and yields its output line by line.

		Every line is also written to the output log file in build_dir.

		Args:
			args: kernel command line arguments; the list is not modified.
			build_dir: directory holding the built kernel and the log file.
			filter_glob: if non-empty, passed on as kunit.filter_glob=.
			timeout: passed to Popen.wait(); when waiting raises, the
				exception is printed and the process is terminated.

		Yields:
			Each line of the kernel's output.
		"""
		# Work on a copy: the list is appended to below, and a caller may
		# reuse its own list across several runs.
		args = list(args) if args else []
		if filter_glob:
			args.append('kunit.filter_glob='+filter_glob)
		args.append('kunit.enable=1')

		process = self._ops.start(args, build_dir)
		assert process.stdout is not None  # tell mypy it's set

		# Enforce the timeout in a background thread.
		def _wait_proc():
			try:
				process.wait(timeout=timeout)
			except Exception as e:
				print(e)
				process.terminate()
				process.wait()
		waiter = threading.Thread(target=_wait_proc)
		waiter.start()

		output = open(get_outfile_path(build_dir), 'w')
		try:
			# Tee the output to the file and to our caller in real time.
			for line in process.stdout:
				output.write(line)
				yield line
		# This runs even if our caller doesn't consume every line.
		finally:
			# Flush any leftover output to the file
			output.write(process.stdout.read())
			output.close()
			process.stdout.close()

			waiter.join()
			subprocess.call(['stty', 'sane'])

	def signal_handler(self, unused_sig, unused_frame) -> None:
		"""SIGINT handler: logs the interruption and runs `stty sane`."""
		logging.error('Build interruption occurred. Cleaning console.')
		subprocess.call(['stty', 'sane'])
371