1
2import os
3from time import sleep
4
5import gdbremote_testcase
6import lldbgdbserverutils
7from lldbsuite.test.decorators import *
8from lldbsuite.test.lldbtest import *
9from lldbsuite.test import lldbutil
10
11
12class TestGdbRemoteAttachWait(gdbremote_testcase.GdbRemoteTestCaseBase):
13
14    def _set_up_inferior(self):
15        self._exe_to_attach = '%s_%d' % (self.testMethodName, os.getpid())
16        self.build(dictionary={'EXE': self._exe_to_attach, 'CXX_SOURCES': 'main.cpp'})
17
18        if self.getPlatform() != "windows":
19            # Use a shim to ensure that the process is ready to be attached from
20            # the get-go.
21            self._exe_to_run = "shim"
22            self._run_args = [self.getBuildArtifact(self._exe_to_attach)]
23            self.build(dictionary={'EXE': self._exe_to_run, 'CXX_SOURCES':
24                'shim.cpp'})
25        else:
26            self._exe_to_run = self._exe_to_attach
27            self._run_args = []
28
29    def _launch_inferior(self, args):
30        inferior = self.spawnSubprocess(self.getBuildArtifact(self._exe_to_run),
31                args)
32        self.assertIsNotNone(inferior)
33        self.assertTrue(inferior.pid > 0)
34        self.assertTrue(
35            lldbgdbserverutils.process_is_running(
36                inferior.pid, True))
37        return inferior
38
39    def _launch_and_wait_for_init(self):
40        sync_file_path = lldbutil.append_to_process_working_directory(self,
41                "process_ready")
42        inferior = self._launch_inferior(self._run_args + [sync_file_path])
43        lldbutil.wait_for_file_on_target(self, sync_file_path)
44        return inferior
45
46    def _attach_packet(self, packet_type):
47        return "read packet: ${};{}#00".format(packet_type,
48                lldbgdbserverutils.gdbremote_hex_encode_string(self._exe_to_attach))
49
50
51    @skipIfWindows # This test is flaky on Windows
52    def test_attach_with_vAttachWait(self):
53        self._set_up_inferior()
54
55        self.set_inferior_startup_attach_manually()
56        server = self.connect_to_debug_monitor()
57        self.do_handshake()
58
59        # Launch the first inferior (we shouldn't attach to this one).
60        self._launch_and_wait_for_init()
61
62        self.test_sequence.add_log_lines([self._attach_packet("vAttachWait")],
63                True)
64        # Run the stream until attachWait.
65        context = self.expect_gdbremote_sequence()
66        self.assertIsNotNone(context)
67
68        # Sleep so we're sure that the inferior is launched after we ask for the attach.
69        sleep(1)
70
71        # Launch the second inferior (we SHOULD attach to this one).
72        inferior_to_attach = self._launch_inferior(self._run_args)
73
74        # Make sure the attach succeeded.
75        self.test_sequence.add_log_lines([
76            {"direction": "send",
77             "regex": r"^\$T([0-9a-fA-F]{2})[^#]*#[0-9a-fA-F]{2}$",
78             "capture": {1: "stop_signal_hex"}},
79        ], True)
80        self.add_process_info_collection_packets()
81
82
83        # Run the stream sending the response..
84        context = self.expect_gdbremote_sequence()
85        self.assertIsNotNone(context)
86
87        # Gather process info response.
88        process_info = self.parse_process_info_response(context)
89        self.assertIsNotNone(process_info)
90
91        # Ensure the process id matches what we expected.
92        pid_text = process_info.get('pid', None)
93        self.assertIsNotNone(pid_text)
94        reported_pid = int(pid_text, base=16)
95        self.assertEqual(reported_pid, inferior_to_attach.pid)
96
97    @skipIfWindows # This test is flaky on Windows
98    def test_launch_before_attach_with_vAttachOrWait(self):
99        self._set_up_inferior()
100
101        self.set_inferior_startup_attach_manually()
102        server = self.connect_to_debug_monitor()
103        self.do_handshake()
104
105        inferior = self._launch_and_wait_for_init()
106
107        # Add attach packets.
108        self.test_sequence.add_log_lines([
109            # Do the attach.
110            self._attach_packet("vAttachOrWait"),
111            # Expect a stop notification from the attach.
112            {"direction": "send",
113             "regex": r"^\$T([0-9a-fA-F]{2})[^#]*#[0-9a-fA-F]{2}$",
114             "capture": {1: "stop_signal_hex"}},
115        ], True)
116        self.add_process_info_collection_packets()
117
118        # Run the stream
119        context = self.expect_gdbremote_sequence()
120        self.assertIsNotNone(context)
121
122        # Gather process info response
123        process_info = self.parse_process_info_response(context)
124        self.assertIsNotNone(process_info)
125
126        # Ensure the process id matches what we expected.
127        pid_text = process_info.get('pid', None)
128        self.assertIsNotNone(pid_text)
129        reported_pid = int(pid_text, base=16)
130        self.assertEqual(reported_pid, inferior.pid)
131
132    @skipIfWindows # This test is flaky on Windows
133    def test_launch_after_attach_with_vAttachOrWait(self):
134        self._set_up_inferior()
135
136        self.set_inferior_startup_attach_manually()
137        server = self.connect_to_debug_monitor()
138        self.do_handshake()
139
140        self.test_sequence.add_log_lines([self._attach_packet("vAttachOrWait")],
141                True)
142        # Run the stream until attachWait.
143        context = self.expect_gdbremote_sequence()
144        self.assertIsNotNone(context)
145
146        # Sleep so we're sure that the inferior is launched after we ask for the attach.
147        sleep(1)
148
149        # Launch the inferior.
150        inferior = self._launch_inferior(self._run_args)
151
152        # Make sure the attach succeeded.
153        self.test_sequence.add_log_lines([
154            {"direction": "send",
155             "regex": r"^\$T([0-9a-fA-F]{2})[^#]*#[0-9a-fA-F]{2}$",
156             "capture": {1: "stop_signal_hex"}},
157        ], True)
158        self.add_process_info_collection_packets()
159
160
161        # Run the stream sending the response..
162        context = self.expect_gdbremote_sequence()
163        self.assertIsNotNone(context)
164
165        # Gather process info response.
166        process_info = self.parse_process_info_response(context)
167        self.assertIsNotNone(process_info)
168
169        # Ensure the process id matches what we expected.
170        pid_text = process_info.get('pid', None)
171        self.assertIsNotNone(pid_text)
172        reported_pid = int(pid_text, base=16)
173        self.assertEqual(reported_pid, inferior.pid)
174