1##
2# Copyright (c) 2023 Apple Inc. All rights reserved.
3#
4# @APPLE_OSREFERENCE_LICENSE_HEADER_START@
5#
6# This file contains Original Code and/or Modifications of Original Code
7# as defined in and that are subject to the Apple Public Source License
8# Version 2.0 (the 'License'). You may not use this file except in
9# compliance with the License. The rights granted to you under the License
10# may not be used to create, or enable the creation or redistribution of,
11# unlawful or unlicensed copies of an Apple operating system, or to
12# circumvent, violate, or enable the circumvention or violation of, any
13# terms of an Apple operating system software license agreement.
14#
15# Please obtain a copy of the License at
16# http://www.opensource.apple.com/apsl/ and read it before using this file.
17#
18# The Original Code and all software distributed under the License are
19# distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
20# EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
21# INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
22# FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
23# Please see the License for the specific language governing rights and
24# limitations under the License.
25#
26# @APPLE_OSREFERENCE_LICENSE_HEADER_END@
27##
28
29# pylint: disable=invalid-name
30# pylint: disable=protected-access
31
32""" Test process.py """
33
import contextlib
import io
import unittest
import unittest.mock

from lldbtest.testcase import LLDBTestCase
from lldbmock.utils import lookup_type
from lldbmock.valuemock import ValueMock

import process as tst_process
import utils as tst_utils
from process import ShowTask, ShowProc, INVALID_PROC_SUMMARY, INVALID_TASK_SUMMARY
44
class ProcessTest(LLDBTestCase):
    """ Tests for process.py module """

    # Value patched over the 'proc_struct_size' kernel global in
    # test_ProcWithoutTask; the mocked task is placed this many bytes
    # after the mocked proc.
    ROUNDED_UP_PROC_SIZE = 2048

    def test_GetProcPid(self):
        """ Test a pid gets returned (and -1 when no proc is given). """

        proc = ValueMock.createFromType('proc')
        proc.p_pid = 12345

        self.assertEqual(tst_process.GetProcPID(proc), 12345)
        self.assertEqual(tst_process.GetProcPID(None), -1)

    def test_GetNameShort(self):
        """ Test fallback to short name. """

        proc = ValueMock.createFromType('proc')
        # Empty long name forces the fallback to p_comm.
        proc.p_name = ""
        proc.p_comm = "short-proc"

        self.assertEqual(tst_process.GetProcName(proc), "short-proc")

    def test_GetNameLong(self):
        """ Test that long name is preferred. """

        proc = ValueMock.createFromType('proc')
        proc.p_name = "long-proc"
        proc.p_comm = "short-proc"

        self.assertEqual(tst_process.GetProcName(proc), "long-proc")

    def test_GetNameInvalid(self):
        """ Test that invalid proc returns default name. """

        self.assertEqual(
            tst_process.GetProcName(None),
            tst_process.NO_PROC_NAME
        )

    def test_ASTValuesInSync(self):
        """ Test that AST chars cover all ast_t values defined in kernel. """

        # Flags the macro has a character for.
        macro = set(tst_process._AST_CHARS)

        # Flags defined by the ast_t enum in kernel.
        enum = lookup_type('ast_t')
        self.assertTrue(enum.IsValid())

        kernel = {
            k.GetValueAsUnsigned()
            for k in enum.get_enum_members_array()
        }

        # Assert that both sides handle identical set of flags.
        self.assertSetEqual(macro, kernel,
                            "thread state chars mismatch")

    def test_GetAstSummary(self):
        """ Test AST string generation. """

        ast = tst_utils.GetEnumValue('ast_t', 'AST_DTRACE')
        ast |= tst_utils.GetEnumValue('ast_t', 'AST_TELEMETRY_KERNEL')

        # Check valid AST
        self.assertEqual(tst_process.GetASTSummary(ast), 'TD')

        # Check that we never touch unsupported bits in an invalid value:
        # with every bit set, the summary must be exactly the known chars
        # (in _AST_CHARS order) and nothing else.
        ast = 0xffffffff
        aststr = ''.join(tst_process._AST_CHARS.values())

        self.assertEqual(tst_process.GetASTSummary(ast), aststr)

    # Mock global variable value (accessed by the macro being used)
    @unittest.mock.patch('xnu.kern.globals.proc_struct_size', ROUNDED_UP_PROC_SIZE)
    def test_ProcWithoutTask(self):
        """ Test that ShowTask handles process-less task gracefully, and vice-versa for ShowProc """
        self.reset_mocks()

        PROC_ADDR = 0xffffffff90909090
        TASK_ADDR = PROC_ADDR + self.ROUNDED_UP_PROC_SIZE
        PROC_RO_ADDR = 0xffffff0040404040

        # Create fake proc_t instance at PROC_ADDR
        proc = self.create_mock('proc', PROC_ADDR).fromDict({
            'p_pid': 12345,
            'p_lflag': 0, # P_LHASTASK not set
            'p_comm': b'test-proc\0'
        })

        # Create task which is expected to be placed + sizeof(proc)
        task = self.create_mock('task', TASK_ADDR).fromDict({
            'effective_policy': {
                'tep_sup_active': 0,
                'tep_darwinbg': 0,
                'tep_lowpri_cpu': 1
            },
            't_flags': 0 # TF_HASPROC not set
        })

        # Create shared proc_ro referenced from both task/proc
        self.create_mock('proc_ro', PROC_RO_ADDR)
        proc.p_proc_ro = PROC_RO_ADDR
        task.bsd_info_ro = PROC_RO_ADDR

        # Capture stdout of both macros and check expected output
        stdout = io.StringIO()
        with contextlib.redirect_stdout(stdout):
            ShowTask([f'{TASK_ADDR:#x}'])
            ShowProc([f'{PROC_ADDR:#x}'])

        output = stdout.getvalue()
        self.assertIn(INVALID_PROC_SUMMARY, output)
        self.assertIn(INVALID_TASK_SUMMARY, output)
159