1# SPDX-License-Identifier: GPL-2.0
2#
3# Parses KTAP test results from a kernel dmesg log and incrementally prints
4# results with reader-friendly format. Stores and returns test results in a
5# Test object.
6#
7# Copyright (C) 2019, Google LLC.
8# Author: Felix Guo <[email protected]>
9# Author: Brendan Higgins <[email protected]>
10# Author: Rae Moar <[email protected]>
11
12from __future__ import annotations
13import re
14import sys
15
16import datetime
17from enum import Enum, auto
18from functools import reduce
19from typing import Iterable, Iterator, List, Optional, Tuple
20
class Test:
	"""
	A class to represent a test parsed from KTAP results. All KTAP
	results within a test log are stored in a main Test object as
	subtests.

	Attributes:
	status : TestStatus - status of the test
	name : str - name of the test
	expected_count : int - expected number of subtests (0 if single
		test case and None if unknown expected number of subtests)
	subtests : List[Test] - list of subtests
	log : List[str] - log of KTAP lines that correspond to the test
	counts : TestCounts - counts of the test statuses and errors of
		subtests or of the test itself if the test is a single
		test case.
	"""
	def __init__(self) -> None:
		"""Creates Test object with default attributes."""
		# Tests are assumed crashed until a result line proves
		# otherwise (see parse_test_result()).
		self.status = TestStatus.TEST_CRASHED
		self.name = ''
		self.expected_count: Optional[int] = 0
		self.subtests: List[Test] = []
		self.log: List[str] = []
		self.counts = TestCounts()

	def __str__(self) -> str:
		"""Returns string representation of a Test class object."""
		return (f'Test({self.status}, {self.name}, {self.expected_count}, '
			f'{self.subtests}, {self.log}, {self.counts})')

	def __repr__(self) -> str:
		"""Returns string representation of a Test class object."""
		return str(self)

	def add_error(self, error_message: str) -> None:
		"""Records an error that occurred while parsing this test."""
		self.counts.errors += 1
		print_with_timestamp(red('[ERROR]') + f' Test: {self.name}: {error_message}')
60
class TestStatus(Enum):
	"""An enumeration class to represent the status of a test."""
	SUCCESS = auto()  # 'ok' result line without a SKIP directive
	FAILURE = auto()  # 'not ok' result line without a SKIP directive
	SKIPPED = auto()  # result line carried a '# SKIP' directive
	TEST_CRASHED = auto()  # default status; also used for missing subtests
	NO_TESTS = auto()  # a parent test ran zero test cases
	FAILURE_TO_PARSE_TESTS = auto()  # no KTAP output could be found at all
69
class TestCounts:
	"""
	Tracks how many test cases within a Test finished with each
	status, plus the number of parse errors encountered.

	Attributes:
	passed : int - the number of tests that have passed
	failed : int - the number of tests that have failed
	crashed : int - the number of tests that have crashed
	skipped : int - the number of tests that have skipped
	errors : int - the number of errors in the test and subtests
	"""
	def __init__(self):
		"""Initializes every status count and the error count to 0."""
		self.passed = 0
		self.failed = 0
		self.crashed = 0
		self.skipped = 0
		self.errors = 0

	def __str__(self) -> str:
		"""Returns the string representation of a TestCounts object."""
		fields = (('passed', self.passed), ('failed', self.failed),
			('crashed', self.crashed), ('skipped', self.skipped),
			('errors', self.errors))
		# Only non-zero counts are shown.
		shown = [f'{label}: {count}' for label, count in fields if count > 0]
		return f'Ran {self.total()} tests: ' + ', '.join(shown)

	def total(self) -> int:
		"""Returns the total number of test cases within a test
		object, where a test case is a test with no subtests.
		"""
		return self.passed + self.failed + self.crashed + self.skipped

	def add_subtest_counts(self, counts: TestCounts) -> None:
		"""
		Adds the counts of another TestCounts object to this one.
		Used to roll a subtest's counts up into its parent test.

		Parameters:
		counts - a different TestCounts object whose counts
			will be added to the counts of the TestCounts object
		"""
		for field in ('passed', 'failed', 'crashed', 'skipped', 'errors'):
			setattr(self, field, getattr(self, field) + getattr(counts, field))

	def get_status(self) -> TestStatus:
		"""Returns the aggregated status of a Test using test
		counts.
		"""
		if self.total() == 0:
			return TestStatus.NO_TESTS
		if self.crashed:
			# Crashes take priority over every other status.
			return TestStatus.TEST_CRASHED
		if self.failed:
			return TestStatus.FAILURE
		if self.passed:
			# No failures or crashes, looks good!
			return TestStatus.SUCCESS
		# Only skipped tests remain.
		return TestStatus.SKIPPED

	def add_status(self, status: TestStatus) -> None:
		"""Increments the count for `status`."""
		if status == TestStatus.NO_TESTS:
			return
		if status == TestStatus.SUCCESS:
			self.passed += 1
		elif status == TestStatus.FAILURE:
			self.failed += 1
		elif status == TestStatus.SKIPPED:
			self.skipped += 1
		else:
			# Crashes and parse failures both count as crashed.
			self.crashed += 1
151
class LineStream:
	"""
	A class to represent the lines of kernel output.
	Provides a lazy peek()/pop() interface over an iterator of
	(line#, text).
	"""
	_lines: Iterator[Tuple[int, str]]
	_next: Tuple[int, str]
	_need_next: bool
	_done: bool

	def __init__(self, lines: Iterator[Tuple[int, str]]):
		"""Wraps the given (line number, text) iterator."""
		self._lines = lines
		self._next = (0, '')
		self._need_next = True
		self._done = False

	def _get_next(self) -> None:
		"""Fetches the next line from the iterator, at most once
		per pop().
		"""
		if not self._need_next:
			return
		self._need_next = False
		try:
			self._next = next(self._lines)
		except StopIteration:
			self._done = True

	def peek(self) -> str:
		"""Returns the current line without consuming it."""
		self._get_next()
		return self._next[1]

	def pop(self) -> str:
		"""Consumes the current line and returns it."""
		line = self.peek()
		if self._done:
			raise ValueError(f'LineStream: going past EOF, last line was {line}')
		self._need_next = True
		return line

	def __bool__(self) -> bool:
		"""Returns True if stream has more lines."""
		self._get_next()
		return not self._done

	# Only used by kunit_tool_test.py.
	def __iter__(self) -> Iterator[str]:
		"""Yields and consumes every remaining line in the stream."""
		while bool(self):
			yield self.pop()

	def line_number(self) -> int:
		"""Returns the line number of the current line."""
		self._get_next()
		return self._next[0]
214
215# Parsing helper methods:
216
# Matches the KTAP output start line, e.g. 'KTAP version 1'.
KTAP_START = re.compile(r'KTAP version ([0-9]+)$')
# Matches the (older) TAP output start line, e.g. 'TAP version 14'.
TAP_START = re.compile(r'TAP version ([0-9]+)$')
# Kernel messages that indicate a crash or shutdown, i.e. that no
# further KTAP output will follow.
KTAP_END = re.compile('(List of all partitions:|'
	'Kernel panic - not syncing: VFS:|reboot: System halted)')
221
def extract_tap_lines(kernel_output: Iterable[str]) -> LineStream:
	"""
	Extracts KTAP lines from the kernel output.

	Extraction begins at the first KTAP/TAP version line; whatever
	precedes that line on it (e.g. console timestamps) is treated as a
	prefix and stripped from every subsequent line. Extraction stops
	when a crash/shutdown message (KTAP_END) is seen.

	Parameters:
	kernel_output - iterable of raw kernel output lines

	Return:
	LineStream over (line number, KTAP line) pairs
	"""
	def isolate_ktap_output(kernel_output: Iterable[str]) \
			-> Iterator[Tuple[int, str]]:
		prefix_len = 0
		started = False
		for line_num, line in enumerate(kernel_output, start=1):
			line = line.rstrip()  # remove trailing \n
			if not started:
				# Look for a KTAP or TAP version line; the
				# characters before it form the prefix to
				# strip from all following lines.
				for regex, marker in ((KTAP_START, 'KTAP version'),
						(TAP_START, 'TAP version')):
					if regex.search(line):
						prefix_len = len(line.split(marker)[0])
						started = True
						yield line_num, line[prefix_len:]
						break
			elif KTAP_END.search(line):
				# stop extracting KTAP lines
				break
			else:
				# remove prefix and any indentation and yield
				# line with line number
				yield line_num, line[prefix_len:].lstrip()
	return LineStream(lines=isolate_ktap_output(kernel_output))
253
# KTAP versions accepted by this parser without reporting an error.
KTAP_VERSIONS = [1]
# TAP versions accepted by this parser without reporting an error.
TAP_VERSIONS = [13, 14]
256
def check_version(version_num: int, accepted_versions: List[int],
			version_type: str, test: Test) -> None:
	"""
	Adds error to test object if version number is too high or too
	low.

	Parameters:
	version_num - The inputted version number from the parsed KTAP or TAP
		header line
	accepted_versions - List of accepted KTAP or TAP versions
	version_type - 'KTAP' or 'TAP' depending on the type of
		version line.
	test - Test object for current test being parsed
	"""
	if version_num < min(accepted_versions):
		test.add_error(f'{version_type} version lower than expected!')
	elif version_num > max(accepted_versions):
		# Note: fixed typo in user-visible message ('higer').
		test.add_error(f'{version_type} version higher than expected!')
275
def parse_ktap_header(lines: LineStream, test: Test) -> bool:
	"""
	Parses the KTAP/TAP header line, checking its version number.

	Accepted formats:
	- 'KTAP version [version number]'
	- 'TAP version [version number]'

	Parameters:
	lines - LineStream of KTAP output to parse
	test - Test object for current test being parsed

	Return:
	True if a KTAP/TAP header line was found and consumed
	"""
	line = lines.peek()
	match = KTAP_START.match(line)
	if match:
		check_version(int(match.group(1)), KTAP_VERSIONS, 'KTAP', test)
	else:
		match = TAP_START.match(line)
		if not match:
			return False
		check_version(int(match.group(1)), TAP_VERSIONS, 'TAP', test)
	test.log.append(lines.pop())
	return True
304
305TEST_HEADER = re.compile(r'^# Subtest: (.*)$')
306
def parse_test_header(lines: LineStream, test: Test) -> bool:
	"""
	Parses a subtest header line and stores the test name in the
	test object.

	Accepted format:
	- '# Subtest: [test name]'

	Parameters:
	lines - LineStream of KTAP output to parse
	test - Test object for current test being parsed

	Return:
	True if a subtest header line was found and consumed
	"""
	match = TEST_HEADER.match(lines.peek())
	if match:
		test.log.append(lines.pop())
		test.name = match.group(1)
		return True
	return False
328
329TEST_PLAN = re.compile(r'1\.\.([0-9]+)')
330
def parse_test_plan(lines: LineStream, test: Test) -> bool:
	"""
	Parses a test plan line and stores the expected number of subtests
	in the test object. If no valid test plan is found on the current
	line, expected_count is set to None instead.

	Accepted format:
	- '1..[number of subtests]'

	Parameters:
	lines - LineStream of KTAP output to parse
	test - Test object for current test being parsed

	Return:
	True if a test plan line was found and consumed
	"""
	match = TEST_PLAN.match(lines.peek())
	if match is None:
		test.expected_count = None
		return False
	test.log.append(lines.pop())
	test.expected_count = int(match.group(1))
	return True
356
# Matches a test result line, e.g. 'not ok 2 - name # comment'.
# Groups: 1 = 'ok'/'not ok', 2 = test number, 4 = test name,
# 5 = optional '# ...' directive/comment.
TEST_RESULT = re.compile(r'^(ok|not ok) ([0-9]+) (- )?([^#]*)( # .*)?$')

# As TEST_RESULT, but only matches result lines with a '# SKIP' directive;
# here group 4 (the name) may itself contain '#'.
TEST_RESULT_SKIP = re.compile(r'^(ok|not ok) ([0-9]+) (- )?(.*) # SKIP(.*)$')
360
def peek_test_name_match(lines: LineStream, test: Test) -> bool:
	"""
	Checks, without consuming the line, whether the current line is a
	test result line whose name equals the current test's name.

	Accepted format:
	- '[ok|not ok] [test number] [-] [test name] [optional skip
		directive]'

	Parameters:
	lines - LineStream of KTAP output to parse
	test - Test object for current test being parsed

	Return:
	True if the current line is a result line for this test
	"""
	match = TEST_RESULT.match(lines.peek())
	if not match:
		return False
	return match.group(4) == test.name
385
def parse_test_result(lines: LineStream, test: Test,
			expected_num: int) -> bool:
	"""
	Parses a test result line, storing the status and name in the
	test object, and reports an error if the test number differs from
	the expected test number.

	Note that the SKIP directive is the only directive that causes a
	change in status.

	Accepted format:
	- '[ok|not ok] [test number] [-] [test name] [optional skip
		directive]'

	Parameters:
	lines - LineStream of KTAP output to parse
	test - Test object for current test being parsed
	expected_num - expected test number for current test

	Return:
	True if a test result line was found and consumed
	"""
	line = lines.peek()
	match = TEST_RESULT.match(line)
	if not match:
		return False
	skip_match = TEST_RESULT_SKIP.match(line)
	test.log.append(lines.pop())

	# A SKIP directive supplies both the name and the status.
	test.name = (skip_match or match).group(4)

	num = int(match.group(2))
	if num != expected_num:
		test.add_error(f'Expected test number {expected_num} but found {num}')

	if skip_match:
		test.status = TestStatus.SKIPPED
	elif match.group(1) == 'ok':
		test.status = TestStatus.SUCCESS
	else:
		test.status = TestStatus.FAILURE
	return True
438
def parse_diagnostic(lines: LineStream) -> List[str]:
	"""
	Collects and returns consecutive lines that are neither test
	result lines nor subtest header lines.

	Line formats that are NOT collected:
	- '# Subtest: [test name]'
	- '[ok|not ok] [test number] [-] [test name] [optional skip
		directive]'

	Parameters:
	lines - LineStream of KTAP output to parse

	Return:
	Log of diagnostic lines
	"""
	log: List[str] = []
	while lines:
		line = lines.peek()
		if TEST_RESULT.match(line) or TEST_HEADER.match(line):
			break
		log.append(lines.pop())
	return log
460
461
462# Printing helper methods:
463
DIVIDER = '=' * 60

# ANSI escape sequence that resets all text attributes.
RESET = '\033[0;0m'

def _colorize(code: str, text: str) -> str:
	"""Wraps text in the given ANSI color code, but only when stdout
	is a TTY; otherwise returns text unchanged.
	"""
	if not sys.stdout.isatty():
		return text
	return code + text + RESET

def red(text: str) -> str:
	"""Returns inputted string with red color code."""
	return _colorize('\033[1;31m', text)

def yellow(text: str) -> str:
	"""Returns inputted string with yellow color code."""
	return _colorize('\033[1;33m', text)

def green(text: str) -> str:
	"""Returns inputted string with green color code."""
	return _colorize('\033[1;32m', text)

# Length of the color-code overhead added to a colored string (0 when
# stdout is not a TTY).
ANSI_LEN = len(red(''))
487
def print_with_timestamp(message: str) -> None:
	"""Prints message prefixed with the current HH:MM:SS timestamp."""
	timestamp = datetime.datetime.now().strftime('%H:%M:%S')
	print(f'[{timestamp}] {message}')
491
def format_test_divider(message: str, len_message: int) -> str:
	"""
	Returns string with message centered in fixed width divider.

	Example:
	'===================== message example ====================='

	Parameters:
	message - message to be centered in divider line
	len_message - length of the message to be printed such that
		any characters of the color codes are not counted

	Return:
	String containing message centered in fixed width divider
	"""
	remaining = len(DIVIDER) - len_message - 2  # 2 for the spaces
	if remaining > 0:
		# Split the remaining width between the two sides.
		left = remaining // 2
		right = remaining - left
	else:
		# Message is too long to center; fall back to short sides.
		left = right = 3
	return f"{'=' * left} {message} {'=' * right}"
516
def print_test_header(test: Test) -> None:
	"""
	Prints a divider containing the test name and, when known, the
	expected number of subtests.

	Example:
	'=================== example (2 subtests) ==================='

	Parameters:
	test - Test object representing current test being printed
	"""
	message = test.name
	if test.expected_count == 1:
		message += ' (1 subtest)'
	elif test.expected_count:
		message += f' ({test.expected_count} subtests)'
	print_with_timestamp(format_test_divider(message, len(message)))
535
def print_log(log: Iterable[str]) -> None:
	"""Prints each line of the given log in yellow, with timestamps."""
	for line in log:
		print_with_timestamp(yellow(line))
540
def format_test_result(test: Test) -> str:
	"""
	Returns string with formatted test result with colored status and
	test name. As a side effect, prints the test's saved log when the
	test crashed or failed.

	Example:
	'[PASSED] example'

	Parameters:
	test - Test object representing current test being printed

	Return:
	String containing formatted test result
	"""
	status = test.status
	if status == TestStatus.SUCCESS:
		return green('[PASSED] ') + test.name
	if status == TestStatus.SKIPPED:
		return yellow('[SKIPPED] ') + test.name
	if status == TestStatus.NO_TESTS:
		return yellow('[NO TESTS RUN] ') + test.name
	# Crashed and failed tests also dump their log.
	print_log(test.log)
	if status == TestStatus.TEST_CRASHED:
		return red('[CRASHED] ') + test.name
	return red('[FAILED] ') + test.name
567
def print_test_result(test: Test) -> None:
	"""
	Prints the formatted result line for the given test.

	Example:
	'[PASSED] example'

	Parameters:
	test - Test object representing current test being printed
	"""
	print_with_timestamp(format_test_result(test))
579
def print_test_footer(test: Test) -> None:
	"""
	Prints a fixed-width divider containing the status of the test.

	Example:
	'===================== [PASSED] example ====================='

	Parameters:
	test - Test object representing current test being printed
	"""
	result = format_test_result(test)
	# Subtract the invisible ANSI color codes when centering.
	print_with_timestamp(format_test_divider(result, len(result) - ANSI_LEN))
593
def print_summary_line(test: Test) -> None:
	"""
	Prints the summary line for the test object, colored by status:
	green when the test passed, yellow when it was skipped or ran no
	tests, and red when it failed or crashed. The line contains the
	counts of the statuses of the test's subtests (or of the test
	itself if it has no subtests).

	Example:
	'Testing complete. Ran 2 tests: passed: 2'

	Parameters:
	test - Test object representing current test being printed
	"""
	if test.status == TestStatus.SUCCESS:
		color = green
	elif test.status in (TestStatus.SKIPPED, TestStatus.NO_TESTS):
		color = yellow
	else:
		color = red
	print_with_timestamp(color(f'Testing complete. {test.counts}'))
615
616# Other methods:
617
def bubble_up_test_results(test: Test) -> None:
	"""
	Rolls subtest counts up into this test's counts. If the test has
	no counted test cases, its own status is added to its counts
	instead. If the aggregated counts indicate a crash, the test's
	status is set to crashed.

	Parameters:
	test - Test object for current test being parsed
	"""
	for subtest in test.subtests:
		test.counts.add_subtest_counts(subtest.counts)
	if test.counts.total() == 0:
		# Leaf test: count its own status.
		test.counts.add_status(test.status)
	elif test.counts.get_status() == TestStatus.TEST_CRASHED:
		# Crashes in subtests propagate upwards.
		test.status = TestStatus.TEST_CRASHED
637
def parse_test(lines: LineStream, expected_num: int, log: List[str]) -> Test:
	"""
	Finds next test to parse in LineStream, creates new Test object,
	parses any subtests of the test, populates Test object with all
	information (status, name) about the test and the Test objects for
	any subtests, and then returns the Test object. The method accepts
	three formats of tests:

	Accepted test formats:

	- Main KTAP/TAP header

	Example:

	KTAP version 1
	1..4
	[subtests]

	- Subtest header line

	Example:

	# Subtest: name
	1..3
	[subtests]
	ok 1 name

	- Test result line

	Example:

	ok 1 - test

	Parameters:
	lines - LineStream of KTAP output to parse
	expected_num - expected test number for test to be parsed
	log - list of strings containing any preceding diagnostic lines
		corresponding to the current test

	Return:
	Test object populated with characteristics and any subtests
	"""
	test = Test()
	test.log.extend(log)
	# parent_test: this test contains subtests (main header or
	# subtest header was found).
	parent_test = False
	# main: this test is the top-level test (KTAP/TAP header found).
	main = parse_ktap_header(lines, test)
	if main:
		# If KTAP/TAP header is found, attempt to parse
		# test plan
		test.name = "main"
		parse_test_plan(lines, test)
		parent_test = True
	else:
		# If KTAP/TAP header is not found, test must be a subtest
		# header or a test result line, so attempt to parse a
		# subtest header
		parent_test = parse_test_header(lines, test)
		if parent_test:
			# If subtest header is found, attempt to parse
			# test plan and print header
			parse_test_plan(lines, test)
			print_test_header(test)
	expected_count = test.expected_count
	subtests = []  # type: List[Test]
	test_num = 1
	while parent_test and (expected_count is None or test_num <= expected_count):
		# Loop to parse any subtests.
		# Break after parsing expected number of tests or
		# if expected number of tests is unknown break when test
		# result line with matching name to subtest header is found
		# or no more lines in stream.
		sub_log = parse_diagnostic(lines)
		sub_test = Test()
		if not lines or (peek_test_name_match(lines, test) and
				not main):
			if expected_count and test_num <= expected_count:
				# If parser reaches end of test before
				# parsing expected number of subtests, print
				# crashed subtest (default status from
				# Test.__init__()) and record error
				test.add_error('missing expected subtest!')
				sub_test.log.extend(sub_log)
				test.counts.add_status(
					TestStatus.TEST_CRASHED)
				print_test_result(sub_test)
			else:
				# Unknown plan: stop at the parent's own
				# result line and keep the diagnostics.
				test.log.extend(sub_log)
				break
		else:
			# Recurse to parse the next subtest.
			sub_test = parse_test(lines, test_num, sub_log)
		subtests.append(sub_test)
		test_num += 1
	test.subtests = subtests
	if not main:
		# If not main test, look for test result line
		test.log.extend(parse_diagnostic(lines))
		if (parent_test and peek_test_name_match(lines, test)) or \
				not parent_test:
			parse_test_result(lines, test, expected_num)
		else:
			test.add_error('missing subtest result line!')

	# Check for there being no tests
	if parent_test and len(subtests) == 0:
		# Don't override a bad status if this test had one reported.
		# Assumption: no subtests means CRASHED is from Test.__init__()
		if test.status in (TestStatus.TEST_CRASHED, TestStatus.SUCCESS):
			test.status = TestStatus.NO_TESTS
			test.add_error('0 tests run!')

	# Add statuses to TestCounts attribute in Test object
	bubble_up_test_results(test)
	if parent_test and not main:
		# If test has subtests and is not the main test object, print
		# footer.
		print_test_footer(test)
	elif not main:
		print_test_result(test)
	return test
756
def parse_run_tests(kernel_output: Iterable[str]) -> Test:
	"""
	Extracts KTAP lines from the kernel output, parses them for test
	results, and prints condensed test results and a summary line.

	Parameters:
	kernel_output - Iterable object containing lines of kernel output

	Return:
	Test - the main test object with all subtests.
	"""
	print_with_timestamp(DIVIDER)
	lines = extract_tap_lines(kernel_output)
	test = Test()
	if lines:
		test = parse_test(lines, 0, [])
		if test.status != TestStatus.NO_TESTS:
			test.status = test.counts.get_status()
	else:
		# No KTAP output at all is itself a failure.
		test.name = '<missing>'
		test.add_error('could not find any KTAP output!')
		test.status = TestStatus.FAILURE_TO_PARSE_TESTS
	print_with_timestamp(DIVIDER)
	print_summary_line(test)
	return test
782