1# SPDX-License-Identifier: GPL-2.0 2# 3# Parses KTAP test results from a kernel dmesg log and incrementally prints 4# results with reader-friendly format. Stores and returns test results in a 5# Test object. 6# 7# Copyright (C) 2019, Google LLC. 8# Author: Felix Guo <[email protected]> 9# Author: Brendan Higgins <[email protected]> 10# Author: Rae Moar <[email protected]> 11 12from __future__ import annotations 13import re 14import sys 15 16import datetime 17from enum import Enum, auto 18from functools import reduce 19from typing import Iterable, Iterator, List, Optional, Tuple 20 21class Test(object): 22 """ 23 A class to represent a test parsed from KTAP results. All KTAP 24 results within a test log are stored in a main Test object as 25 subtests. 26 27 Attributes: 28 status : TestStatus - status of the test 29 name : str - name of the test 30 expected_count : int - expected number of subtests (0 if single 31 test case and None if unknown expected number of subtests) 32 subtests : List[Test] - list of subtests 33 log : List[str] - log of KTAP lines that correspond to the test 34 counts : TestCounts - counts of the test statuses and errors of 35 subtests or of the test itself if the test is a single 36 test case. 37 """ 38 def __init__(self) -> None: 39 """Creates Test object with default attributes.""" 40 self.status = TestStatus.TEST_CRASHED 41 self.name = '' 42 self.expected_count = 0 # type: Optional[int] 43 self.subtests = [] # type: List[Test] 44 self.log = [] # type: List[str] 45 self.counts = TestCounts() 46 47 def __str__(self) -> str: 48 """Returns string representation of a Test class object.""" 49 return (f'Test({self.status}, {self.name}, {self.expected_count}, ' 50 f'{self.subtests}, {self.log}, {self.counts})') 51 52 def __repr__(self) -> str: 53 """Returns string representation of a Test class object.""" 54 return str(self) 55 56 def add_error(self, error_message: str) -> None: 57 """Records an error that occurred while parsing this test.""" 58 self.counts.errors += 1 59 print_with_timestamp(red('[ERROR]') + f' Test: {self.name}: {error_message}') 60 61class TestStatus(Enum): 62 """An enumeration class to represent the status of a test.""" 63 SUCCESS = auto() 64 FAILURE = auto() 65 SKIPPED = auto() 66 TEST_CRASHED = auto() 67 NO_TESTS = auto() 68 FAILURE_TO_PARSE_TESTS = auto() 69 70class TestCounts: 71 """ 72 Tracks the counts of statuses of all test cases and any errors within 73 a Test. 74 75 Attributes: 76 passed : int - the number of tests that have passed 77 failed : int - the number of tests that have failed 78 crashed : int - the number of tests that have crashed 79 skipped : int - the number of tests that have skipped 80 errors : int - the number of errors in the test and subtests 81 """ 82 def __init__(self): 83 """Creates TestCounts object with counts of all test 84 statuses and test errors set to 0. 85 """ 86 self.passed = 0 87 self.failed = 0 88 self.crashed = 0 89 self.skipped = 0 90 self.errors = 0 91 92 def __str__(self) -> str: 93 """Returns the string representation of a TestCounts object.""" 94 statuses = [('passed', self.passed), ('failed', self.failed), 95 ('crashed', self.crashed), ('skipped', self.skipped), 96 ('errors', self.errors)] 97 return f'Ran {self.total()} tests: ' + \ 98 ', '.join(f'{s}: {n}' for s, n in statuses if n > 0) 99 100 def total(self) -> int: 101 """Returns the total number of test cases within a test 102 object, where a test case is a test with no subtests. 103 """ 104 return (self.passed + self.failed + self.crashed + 105 self.skipped) 106 107 def add_subtest_counts(self, counts: TestCounts) -> None: 108 """ 109 Adds the counts of another TestCounts object to the current 110 TestCounts object. Used to add the counts of a subtest to the 111 parent test. 112 113 Parameters: 114 counts - a different TestCounts object whose counts 115 will be added to the counts of the TestCounts object 116 """ 117 self.passed += counts.passed 118 self.failed += counts.failed 119 self.crashed += counts.crashed 120 self.skipped += counts.skipped 121 self.errors += counts.errors 122 123 def get_status(self) -> TestStatus: 124 """Returns the aggregated status of a Test using test 125 counts. 126 """ 127 if self.total() == 0: 128 return TestStatus.NO_TESTS 129 elif self.crashed: 130 # Crashes should take priority. 131 return TestStatus.TEST_CRASHED 132 elif self.failed: 133 return TestStatus.FAILURE 134 elif self.passed: 135 # No failures or crashes, looks good! 136 return TestStatus.SUCCESS 137 else: 138 # We have only skipped tests. 139 return TestStatus.SKIPPED 140 141 def add_status(self, status: TestStatus) -> None: 142 """Increments the count for `status`.""" 143 if status == TestStatus.SUCCESS: 144 self.passed += 1 145 elif status == TestStatus.FAILURE: 146 self.failed += 1 147 elif status == TestStatus.SKIPPED: 148 self.skipped += 1 149 elif status != TestStatus.NO_TESTS: 150 self.crashed += 1 151 152class LineStream: 153 """ 154 A class to represent the lines of kernel output. 155 Provides a lazy peek()/pop() interface over an iterator of 156 (line#, text). 157 """ 158 _lines: Iterator[Tuple[int, str]] 159 _next: Tuple[int, str] 160 _need_next: bool 161 _done: bool 162 163 def __init__(self, lines: Iterator[Tuple[int, str]]): 164 """Creates a new LineStream that wraps the given iterator.""" 165 self._lines = lines 166 self._done = False 167 self._need_next = True 168 self._next = (0, '') 169 170 def _get_next(self) -> None: 171 """Advances the LineSteam to the next line, if necessary.""" 172 if not self._need_next: 173 return 174 try: 175 self._next = next(self._lines) 176 except StopIteration: 177 self._done = True 178 finally: 179 self._need_next = False 180 181 def peek(self) -> str: 182 """Returns the current line, without advancing the LineStream. 183 """ 184 self._get_next() 185 return self._next[1] 186 187 def pop(self) -> str: 188 """Returns the current line and advances the LineStream to 189 the next line. 190 """ 191 s = self.peek() 192 if self._done: 193 raise ValueError(f'LineStream: going past EOF, last line was {s}') 194 self._need_next = True 195 return s 196 197 def __bool__(self) -> bool: 198 """Returns True if stream has more lines.""" 199 self._get_next() 200 return not self._done 201 202 # Only used by kunit_tool_test.py. 203 def __iter__(self) -> Iterator[str]: 204 """Empties all lines stored in LineStream object into 205 Iterator object and returns the Iterator object. 206 """ 207 while bool(self): 208 yield self.pop() 209 210 def line_number(self) -> int: 211 """Returns the line number of the current line.""" 212 self._get_next() 213 return self._next[0] 214 215# Parsing helper methods: 216 217KTAP_START = re.compile(r'KTAP version ([0-9]+)$') 218TAP_START = re.compile(r'TAP version ([0-9]+)$') 219KTAP_END = re.compile('(List of all partitions:|' 220 'Kernel panic - not syncing: VFS:|reboot: System halted)') 221 222def extract_tap_lines(kernel_output: Iterable[str]) -> LineStream: 223 """Extracts KTAP lines from the kernel output.""" 224 def isolate_ktap_output(kernel_output: Iterable[str]) \ 225 -> Iterator[Tuple[int, str]]: 226 line_num = 0 227 started = False 228 for line in kernel_output: 229 line_num += 1 230 line = line.rstrip() # remove trailing \n 231 if not started and KTAP_START.search(line): 232 # start extracting KTAP lines and set prefix 233 # to number of characters before version line 234 prefix_len = len( 235 line.split('KTAP version')[0]) 236 started = True 237 yield line_num, line[prefix_len:] 238 elif not started and TAP_START.search(line): 239 # start extracting KTAP lines and set prefix 240 # to number of characters before version line 241 prefix_len = len(line.split('TAP version')[0]) 242 started = True 243 yield line_num, line[prefix_len:] 244 elif started and KTAP_END.search(line): 245 # stop extracting KTAP lines 246 break 247 elif started: 248 # remove prefix and any indention and yield 249 # line with line number 250 line = line[prefix_len:].lstrip() 251 yield line_num, line 252 return LineStream(lines=isolate_ktap_output(kernel_output)) 253 254KTAP_VERSIONS = [1] 255TAP_VERSIONS = [13, 14] 256 257def check_version(version_num: int, accepted_versions: List[int], 258 version_type: str, test: Test) -> None: 259 """ 260 Adds error to test object if version number is too high or too 261 low. 262 263 Parameters: 264 version_num - The inputted version number from the parsed KTAP or TAP 265 header line 266 accepted_version - List of accepted KTAP or TAP versions 267 version_type - 'KTAP' or 'TAP' depending on the type of 268 version line. 269 test - Test object for current test being parsed 270 """ 271 if version_num < min(accepted_versions): 272 test.add_error(f'{version_type} version lower than expected!') 273 elif version_num > max(accepted_versions): 274 test.add_error(f'{version_type} version higer than expected!') 275 276def parse_ktap_header(lines: LineStream, test: Test) -> bool: 277 """ 278 Parses KTAP/TAP header line and checks version number. 279 Returns False if fails to parse KTAP/TAP header line. 280 281 Accepted formats: 282 - 'KTAP version [version number]' 283 - 'TAP version [version number]' 284 285 Parameters: 286 lines - LineStream of KTAP output to parse 287 test - Test object for current test being parsed 288 289 Return: 290 True if successfully parsed KTAP/TAP header line 291 """ 292 ktap_match = KTAP_START.match(lines.peek()) 293 tap_match = TAP_START.match(lines.peek()) 294 if ktap_match: 295 version_num = int(ktap_match.group(1)) 296 check_version(version_num, KTAP_VERSIONS, 'KTAP', test) 297 elif tap_match: 298 version_num = int(tap_match.group(1)) 299 check_version(version_num, TAP_VERSIONS, 'TAP', test) 300 else: 301 return False 302 test.log.append(lines.pop()) 303 return True 304 305TEST_HEADER = re.compile(r'^# Subtest: (.*)$') 306 307def parse_test_header(lines: LineStream, test: Test) -> bool: 308 """ 309 Parses test header and stores test name in test object. 310 Returns False if fails to parse test header line. 311 312 Accepted format: 313 - '# Subtest: [test name]' 314 315 Parameters: 316 lines - LineStream of KTAP output to parse 317 test - Test object for current test being parsed 318 319 Return: 320 True if successfully parsed test header line 321 """ 322 match = TEST_HEADER.match(lines.peek()) 323 if not match: 324 return False 325 test.log.append(lines.pop()) 326 test.name = match.group(1) 327 return True 328 329TEST_PLAN = re.compile(r'1\.\.([0-9]+)') 330 331def parse_test_plan(lines: LineStream, test: Test) -> bool: 332 """ 333 Parses test plan line and stores the expected number of subtests in 334 test object. Reports an error if expected count is 0. 335 Returns False and sets expected_count to None if there is no valid test 336 plan. 337 338 Accepted format: 339 - '1..[number of subtests]' 340 341 Parameters: 342 lines - LineStream of KTAP output to parse 343 test - Test object for current test being parsed 344 345 Return: 346 True if successfully parsed test plan line 347 """ 348 match = TEST_PLAN.match(lines.peek()) 349 if not match: 350 test.expected_count = None 351 return False 352 test.log.append(lines.pop()) 353 expected_count = int(match.group(1)) 354 test.expected_count = expected_count 355 return True 356 357TEST_RESULT = re.compile(r'^(ok|not ok) ([0-9]+) (- )?([^#]*)( # .*)?$') 358 359TEST_RESULT_SKIP = re.compile(r'^(ok|not ok) ([0-9]+) (- )?(.*) # SKIP(.*)$') 360 361def peek_test_name_match(lines: LineStream, test: Test) -> bool: 362 """ 363 Matches current line with the format of a test result line and checks 364 if the name matches the name of the current test. 365 Returns False if fails to match format or name. 366 367 Accepted format: 368 - '[ok|not ok] [test number] [-] [test name] [optional skip 369 directive]' 370 371 Parameters: 372 lines - LineStream of KTAP output to parse 373 test - Test object for current test being parsed 374 375 Return: 376 True if matched a test result line and the name matching the 377 expected test name 378 """ 379 line = lines.peek() 380 match = TEST_RESULT.match(line) 381 if not match: 382 return False 383 name = match.group(4) 384 return (name == test.name) 385 386def parse_test_result(lines: LineStream, test: Test, 387 expected_num: int) -> bool: 388 """ 389 Parses test result line and stores the status and name in the test 390 object. Reports an error if the test number does not match expected 391 test number. 392 Returns False if fails to parse test result line. 393 394 Note that the SKIP directive is the only direction that causes a 395 change in status. 396 397 Accepted format: 398 - '[ok|not ok] [test number] [-] [test name] [optional skip 399 directive]' 400 401 Parameters: 402 lines - LineStream of KTAP output to parse 403 test - Test object for current test being parsed 404 expected_num - expected test number for current test 405 406 Return: 407 True if successfully parsed a test result line. 408 """ 409 line = lines.peek() 410 match = TEST_RESULT.match(line) 411 skip_match = TEST_RESULT_SKIP.match(line) 412 413 # Check if line matches test result line format 414 if not match: 415 return False 416 test.log.append(lines.pop()) 417 418 # Set name of test object 419 if skip_match: 420 test.name = skip_match.group(4) 421 else: 422 test.name = match.group(4) 423 424 # Check test num 425 num = int(match.group(2)) 426 if num != expected_num: 427 test.add_error(f'Expected test number {expected_num} but found {num}') 428 429 # Set status of test object 430 status = match.group(1) 431 if skip_match: 432 test.status = TestStatus.SKIPPED 433 elif status == 'ok': 434 test.status = TestStatus.SUCCESS 435 else: 436 test.status = TestStatus.FAILURE 437 return True 438 439def parse_diagnostic(lines: LineStream) -> List[str]: 440 """ 441 Parse lines that do not match the format of a test result line or 442 test header line and returns them in list. 443 444 Line formats that are not parsed: 445 - '# Subtest: [test name]' 446 - '[ok|not ok] [test number] [-] [test name] [optional skip 447 directive]' 448 449 Parameters: 450 lines - LineStream of KTAP output to parse 451 452 Return: 453 Log of diagnostic lines 454 """ 455 log = [] # type: List[str] 456 while lines and not TEST_RESULT.match(lines.peek()) and not \ 457 TEST_HEADER.match(lines.peek()): 458 log.append(lines.pop()) 459 return log 460 461 462# Printing helper methods: 463 464DIVIDER = '=' * 60 465 466RESET = '\033[0;0m' 467 468def red(text: str) -> str: 469 """Returns inputted string with red color code.""" 470 if not sys.stdout.isatty(): 471 return text 472 return '\033[1;31m' + text + RESET 473 474def yellow(text: str) -> str: 475 """Returns inputted string with yellow color code.""" 476 if not sys.stdout.isatty(): 477 return text 478 return '\033[1;33m' + text + RESET 479 480def green(text: str) -> str: 481 """Returns inputted string with green color code.""" 482 if not sys.stdout.isatty(): 483 return text 484 return '\033[1;32m' + text + RESET 485 486ANSI_LEN = len(red('')) 487 488def print_with_timestamp(message: str) -> None: 489 """Prints message with timestamp at beginning.""" 490 print('[%s] %s' % (datetime.datetime.now().strftime('%H:%M:%S'), message)) 491 492def format_test_divider(message: str, len_message: int) -> str: 493 """ 494 Returns string with message centered in fixed width divider. 495 496 Example: 497 '===================== message example =====================' 498 499 Parameters: 500 message - message to be centered in divider line 501 len_message - length of the message to be printed such that 502 any characters of the color codes are not counted 503 504 Return: 505 String containing message centered in fixed width divider 506 """ 507 default_count = 3 # default number of dashes 508 len_1 = default_count 509 len_2 = default_count 510 difference = len(DIVIDER) - len_message - 2 # 2 spaces added 511 if difference > 0: 512 # calculate number of dashes for each side of the divider 513 len_1 = int(difference / 2) 514 len_2 = difference - len_1 515 return ('=' * len_1) + f' {message} ' + ('=' * len_2) 516 517def print_test_header(test: Test) -> None: 518 """ 519 Prints test header with test name and optionally the expected number 520 of subtests. 521 522 Example: 523 '=================== example (2 subtests) ===================' 524 525 Parameters: 526 test - Test object representing current test being printed 527 """ 528 message = test.name 529 if test.expected_count: 530 if test.expected_count == 1: 531 message += ' (1 subtest)' 532 else: 533 message += f' ({test.expected_count} subtests)' 534 print_with_timestamp(format_test_divider(message, len(message))) 535 536def print_log(log: Iterable[str]) -> None: 537 """Prints all strings in saved log for test in yellow.""" 538 for m in log: 539 print_with_timestamp(yellow(m)) 540 541def format_test_result(test: Test) -> str: 542 """ 543 Returns string with formatted test result with colored status and test 544 name. 545 546 Example: 547 '[PASSED] example' 548 549 Parameters: 550 test - Test object representing current test being printed 551 552 Return: 553 String containing formatted test result 554 """ 555 if test.status == TestStatus.SUCCESS: 556 return (green('[PASSED] ') + test.name) 557 elif test.status == TestStatus.SKIPPED: 558 return (yellow('[SKIPPED] ') + test.name) 559 elif test.status == TestStatus.NO_TESTS: 560 return (yellow('[NO TESTS RUN] ') + test.name) 561 elif test.status == TestStatus.TEST_CRASHED: 562 print_log(test.log) 563 return (red('[CRASHED] ') + test.name) 564 else: 565 print_log(test.log) 566 return (red('[FAILED] ') + test.name) 567 568def print_test_result(test: Test) -> None: 569 """ 570 Prints result line with status of test. 571 572 Example: 573 '[PASSED] example' 574 575 Parameters: 576 test - Test object representing current test being printed 577 """ 578 print_with_timestamp(format_test_result(test)) 579 580def print_test_footer(test: Test) -> None: 581 """ 582 Prints test footer with status of test. 583 584 Example: 585 '===================== [PASSED] example =====================' 586 587 Parameters: 588 test - Test object representing current test being printed 589 """ 590 message = format_test_result(test) 591 print_with_timestamp(format_test_divider(message, 592 len(message) - ANSI_LEN)) 593 594def print_summary_line(test: Test) -> None: 595 """ 596 Prints summary line of test object. Color of line is dependent on 597 status of test. Color is green if test passes, yellow if test is 598 skipped, and red if the test fails or crashes. Summary line contains 599 counts of the statuses of the tests subtests or the test itself if it 600 has no subtests. 601 602 Example: 603 "Testing complete. Passed: 2, Failed: 0, Crashed: 0, Skipped: 0, 604 Errors: 0" 605 606 test - Test object representing current test being printed 607 """ 608 if test.status == TestStatus.SUCCESS: 609 color = green 610 elif test.status == TestStatus.SKIPPED or test.status == TestStatus.NO_TESTS: 611 color = yellow 612 else: 613 color = red 614 print_with_timestamp(color(f'Testing complete. {test.counts}')) 615 616# Other methods: 617 618def bubble_up_test_results(test: Test) -> None: 619 """ 620 If the test has subtests, add the test counts of the subtests to the 621 test and check if any of the tests crashed and if so set the test 622 status to crashed. Otherwise if the test has no subtests add the 623 status of the test to the test counts. 624 625 Parameters: 626 test - Test object for current test being parsed 627 """ 628 subtests = test.subtests 629 counts = test.counts 630 status = test.status 631 for t in subtests: 632 counts.add_subtest_counts(t.counts) 633 if counts.total() == 0: 634 counts.add_status(status) 635 elif test.counts.get_status() == TestStatus.TEST_CRASHED: 636 test.status = TestStatus.TEST_CRASHED 637 638def parse_test(lines: LineStream, expected_num: int, log: List[str]) -> Test: 639 """ 640 Finds next test to parse in LineStream, creates new Test object, 641 parses any subtests of the test, populates Test object with all 642 information (status, name) about the test and the Test objects for 643 any subtests, and then returns the Test object. The method accepts 644 three formats of tests: 645 646 Accepted test formats: 647 648 - Main KTAP/TAP header 649 650 Example: 651 652 KTAP version 1 653 1..4 654 [subtests] 655 656 - Subtest header line 657 658 Example: 659 660 # Subtest: name 661 1..3 662 [subtests] 663 ok 1 name 664 665 - Test result line 666 667 Example: 668 669 ok 1 - test 670 671 Parameters: 672 lines - LineStream of KTAP output to parse 673 expected_num - expected test number for test to be parsed 674 log - list of strings containing any preceding diagnostic lines 675 corresponding to the current test 676 677 Return: 678 Test object populated with characteristics and any subtests 679 """ 680 test = Test() 681 test.log.extend(log) 682 parent_test = False 683 main = parse_ktap_header(lines, test) 684 if main: 685 # If KTAP/TAP header is found, attempt to parse 686 # test plan 687 test.name = "main" 688 parse_test_plan(lines, test) 689 parent_test = True 690 else: 691 # If KTAP/TAP header is not found, test must be subtest 692 # header or test result line so parse attempt to parser 693 # subtest header 694 parent_test = parse_test_header(lines, test) 695 if parent_test: 696 # If subtest header is found, attempt to parse 697 # test plan and print header 698 parse_test_plan(lines, test) 699 print_test_header(test) 700 expected_count = test.expected_count 701 subtests = [] 702 test_num = 1 703 while parent_test and (expected_count is None or test_num <= expected_count): 704 # Loop to parse any subtests. 705 # Break after parsing expected number of tests or 706 # if expected number of tests is unknown break when test 707 # result line with matching name to subtest header is found 708 # or no more lines in stream. 709 sub_log = parse_diagnostic(lines) 710 sub_test = Test() 711 if not lines or (peek_test_name_match(lines, test) and 712 not main): 713 if expected_count and test_num <= expected_count: 714 # If parser reaches end of test before 715 # parsing expected number of subtests, print 716 # crashed subtest and record error 717 test.add_error('missing expected subtest!') 718 sub_test.log.extend(sub_log) 719 test.counts.add_status( 720 TestStatus.TEST_CRASHED) 721 print_test_result(sub_test) 722 else: 723 test.log.extend(sub_log) 724 break 725 else: 726 sub_test = parse_test(lines, test_num, sub_log) 727 subtests.append(sub_test) 728 test_num += 1 729 test.subtests = subtests 730 if not main: 731 # If not main test, look for test result line 732 test.log.extend(parse_diagnostic(lines)) 733 if (parent_test and peek_test_name_match(lines, test)) or \ 734 not parent_test: 735 parse_test_result(lines, test, expected_num) 736 else: 737 test.add_error('missing subtest result line!') 738 739 # Check for there being no tests 740 if parent_test and len(subtests) == 0: 741 # Don't override a bad status if this test had one reported. 742 # Assumption: no subtests means CRASHED is from Test.__init__() 743 if test.status in (TestStatus.TEST_CRASHED, TestStatus.SUCCESS): 744 test.status = TestStatus.NO_TESTS 745 test.add_error('0 tests run!') 746 747 # Add statuses to TestCounts attribute in Test object 748 bubble_up_test_results(test) 749 if parent_test and not main: 750 # If test has subtests and is not the main test object, print 751 # footer. 752 print_test_footer(test) 753 elif not main: 754 print_test_result(test) 755 return test 756 757def parse_run_tests(kernel_output: Iterable[str]) -> Test: 758 """ 759 Using kernel output, extract KTAP lines, parse the lines for test 760 results and print condensed test results and summary line. 761 762 Parameters: 763 kernel_output - Iterable object contains lines of kernel output 764 765 Return: 766 Test - the main test object with all subtests. 767 """ 768 print_with_timestamp(DIVIDER) 769 lines = extract_tap_lines(kernel_output) 770 test = Test() 771 if not lines: 772 test.name = '<missing>' 773 test.add_error('could not find any KTAP output!') 774 test.status = TestStatus.FAILURE_TO_PARSE_TESTS 775 else: 776 test = parse_test(lines, 0, []) 777 if test.status != TestStatus.NO_TESTS: 778 test.status = test.counts.get_status() 779 print_with_timestamp(DIVIDER) 780 print_summary_line(test) 781 return test 782