1import lldb
2import json
3from intelpt_testcase import *
4from lldbsuite.test.lldbtest import *
5from lldbsuite.test import lldbutil
6from lldbsuite.test.decorators import *
7
def find(predicate, seq):
    """Return the first item of `seq` for which `predicate(item)` is truthy.

    Returns None when no item matches (including when `seq` is empty).
    """
    matches = (candidate for candidate in seq if predicate(candidate))
    return next(matches, None)
12
class TestTraceSave(TraceIntelPTTestCaseBase):
    """Tests for the `trace save` command using intel-pt traces."""

    def testErrorMessages(self):
        """Check the errors reported by `trace save` when nothing is traced."""
        # We first check the output when there are no targets
        self.expect("trace save",
            substrs=["error: invalid target, create a target using the 'target create' command"],
            error=True)

        # We now check the output when there's a non-running target
        self.expect("target create " +
            os.path.join(self.getSourceDir(), "intelpt-trace", "a.out"))

        self.expect("trace save",
            substrs=["error: Command requires a current process."],
            error=True)

        # Now we check the output when there's a running target without a trace
        self.expect("b main")
        self.expect("run")

        self.expect("trace save",
            substrs=["error: Process is not being traced"],
            error=True)

    def testSaveToInvalidDir(self):
        """Check the errors of `trace save` for a missing or invalid directory."""
        self.expect("target create " +
            os.path.join(self.getSourceDir(), "intelpt-trace", "a.out"))
        self.expect("b main")
        self.expect("r")
        self.expect("thread trace start")
        self.expect("n")

        # Check the output when saving without providing the directory argument
        self.expect("trace save ",
            substrs=["error: a single path to a directory where the trace bundle will be created is required"],
            error=True)

        # Check the output when saving to an invalid directory
        self.expect("trace save /",
            substrs=["error: couldn't write to the file"],
            error=True)

    def testSaveWhenNotLiveTrace(self):
        """Check that a trace loaded from disk (not live) can be saved."""
        self.expect("trace load -v " +
            os.path.join(self.getSourceDir(), "intelpt-trace", "trace.json"),
            substrs=["intel-pt"])

        # Check the output when not doing live tracing
        self.expect("trace save " +
            os.path.join(self.getBuildDir(), "intelpt-trace", "trace_not_live_dir"))

    def testSaveMultiCpuTrace(self):
        """
            This test starts a per-cpu tracing session, then saves the session to disk, and
            finally it loads it again.
        """
        self.skipIfPerCpuTracingIsNotSupported()

        self.expect("target create " +
            os.path.join(self.getSourceDir(), "intelpt-trace", "a.out"))
        self.expect("b main")
        self.expect("r")
        self.expect("process trace start --per-cpu-tracing")
        self.expect("b 7")

        output_dir = os.path.join(self.getBuildDir(), "intelpt-trace", "trace_save")
        self.expect("trace save " + output_dir)

        def checkSessionBundle(session_file_path):
            # Resolve the trace files relative to the bundle that is being
            # checked, so that each bundle is validated against its own
            # directory rather than against the first one that was saved.
            bundle_dir = os.path.dirname(session_file_path)

            with open(session_file_path) as session_file:
                session = json.load(session_file)

            # We expect tsc conversion info
            self.assertIn("tscPerfZeroConversion", session)
            # We expect at least one cpu
            self.assertGreater(len(session["cpus"]), 0)

            # We expect the required trace files to be created
            for cpu in session["cpus"]:
                cpu_files_prefix = os.path.join(bundle_dir, "cpus", str(cpu["id"]))
                for extension in (".intelpt_trace", ".perf_context_switch_trace"):
                    cpu_file = cpu_files_prefix + extension
                    self.assertTrue(os.path.exists(cpu_file),
                                    "missing trace file: " + cpu_file)

            # We expect at least one process
            self.assertGreater(len(session["processes"]), 0)
            for process in session["processes"]:
                # We expect at least one thread
                self.assertGreater(len(process["threads"]), 0)
                # We don't expect thread traces
                for thread in process["threads"]:
                    self.assertIsNone(thread.get("iptTrace"))

        original_trace_session_file = os.path.join(output_dir, "trace.json")
        checkSessionBundle(original_trace_session_file)

        # Load the bundle just saved and save it again to a second directory
        self.expect("trace load " + original_trace_session_file)
        output_copy_dir = os.path.join(self.getBuildDir(), "intelpt-trace", "copy_trace_save")
        self.expect("trace save " + output_copy_dir)

        # We now check that the new bundle is correct on its own
        copied_trace_session_file = os.path.join(output_copy_dir, "trace.json")
        checkSessionBundle(copied_trace_session_file)

        # We finally check that the new bundle has the same information as the original one
        with open(original_trace_session_file) as original_file:
            original = json.load(original_file)
        with open(copied_trace_session_file) as copy_file:
            copy = json.load(copy_file)

        self.assertEqual(len(original["processes"]), len(copy["processes"]))

        for process in original["processes"]:
            copied_process = find(lambda proc: proc["pid"] == process["pid"], copy["processes"])
            self.assertIsNotNone(copied_process)

            for thread in process["threads"]:
                copied_thread = find(lambda thr: thr["tid"] == thread["tid"], copied_process["threads"])
                self.assertIsNotNone(copied_thread)

        for cpu in original["cpus"]:
            copied_cpu = find(lambda cor: cor["id"] == cpu["id"], copy["cpus"])
            self.assertIsNotNone(copied_cpu)

    def testSaveTrace(self):
        """Check that a saved-then-loaded trace dumps the same instructions as the live one."""
        self.expect("target create " +
            os.path.join(self.getSourceDir(), "intelpt-trace", "a.out"))
        self.expect("b main")
        self.expect("r")
        self.expect("thread trace start")
        self.expect("b 7")

        ci = self.dbg.GetCommandInterpreter()
        res = lldb.SBCommandReturnObject()

        dump_first_ten = "thread trace dump instructions -c 10 --forwards"
        dump_last_ten = "thread trace dump instructions -c 10"

        ci.HandleCommand(dump_first_ten, res)
        self.assertTrue(res.Succeeded(), res.GetError())
        first_ten_instructions = res.GetOutput()

        ci.HandleCommand(dump_last_ten, res)
        self.assertTrue(res.Succeeded(), res.GetError())
        last_ten_instructions = res.GetOutput()

        # Now, save the trace to <trace_copy_dir>
        trace_copy_dir = os.path.join(self.getBuildDir(), "intelpt-trace", "trace_copy_dir")
        self.expect("trace save " + trace_copy_dir)

        # Load the trace just saved
        self.expect("trace load -v " + os.path.join(trace_copy_dir, "trace.json"),
            substrs=["intel-pt"])

        # Compare with instructions saved at the first time
        ci.HandleCommand(dump_first_ten, res)
        self.assertTrue(res.Succeeded(), res.GetError())
        self.assertEqual(res.GetOutput(), first_ten_instructions)

        ci.HandleCommand(dump_last_ten, res)
        self.assertTrue(res.Succeeded(), res.GetError())
        self.assertEqual(res.GetOutput(), last_ten_instructions)
172