1 //===-- Communication.cpp ---------------------------------------*- C++ -*-===//
2 //
3 //                     The LLVM Compiler Infrastructure
4 //
5 // This file is distributed under the University of Illinois Open Source
6 // License. See LICENSE.TXT for details.
7 //
8 //===----------------------------------------------------------------------===//
9 
10 // C Includes
11 // C++ Includes
12 // Other libraries and framework includes
13 // Project includes
14 #include "lldb/lldb-private-log.h"
15 #include "lldb/Core/Communication.h"
16 #include "lldb/Core/Connection.h"
17 #include "lldb/Core/Log.h"
18 #include "lldb/Core/Timer.h"
19 #include "lldb/Core/Event.h"
20 
21 using namespace lldb;
22 using namespace lldb_private;
23 
24 //----------------------------------------------------------------------
25 // Constructor
26 //----------------------------------------------------------------------
27 Communication::Communication(const char *name) :
28     Broadcaster (name),
29     m_connection_ap (),
30     m_read_thread (NULL),
31     m_read_thread_enabled (false),
32     m_bytes(),
33     m_bytes_mutex (Mutex::eMutexTypeRecursive),
34     m_callback (NULL),
35     m_callback_baton (NULL)
36 
37 {
38     lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_OBJECT | LIBLLDB_LOG_COMMUNICATION,
39                                  "%p Communication::Communication (name = %s)",
40                                  this, name);
41 }
42 
43 //----------------------------------------------------------------------
44 // Destructor
45 //----------------------------------------------------------------------
46 Communication::~Communication()
47 {
48     lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_OBJECT | LIBLLDB_LOG_COMMUNICATION,
49                                  "%p Communication::~Communication (name = %s)",
50                                  this, m_broadcaster_name.AsCString(""));
51     Clear();
52 }
53 
54 void
55 Communication::Clear()
56 {
57     StopReadThread (NULL);
58     Disconnect (NULL);
59 }
60 
61 ConnectionStatus
62 Communication::BytesAvailable (uint32_t timeout_usec, Error *error_ptr)
63 {
64     lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_COMMUNICATION, "%p Communication::BytesAvailable (timeout_usec = %u)", this, timeout_usec);
65 
66     if (m_connection_ap.get())
67         return m_connection_ap->BytesAvailable (timeout_usec, error_ptr);
68     if (error_ptr)
69         error_ptr->SetErrorString("Invalid connection.");
70     return eConnectionStatusNoConnection;
71 }
72 
73 ConnectionStatus
74 Communication::Connect (const char *url, Error *error_ptr)
75 {
76     Clear();
77 
78     lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_COMMUNICATION, "%p Communication::Connect (url = %s)", this, url);
79 
80     if (m_connection_ap.get())
81         return m_connection_ap->Connect (url, error_ptr);
82     if (error_ptr)
83         error_ptr->SetErrorString("Invalid connection.");
84     return eConnectionStatusNoConnection;
85 }
86 
87 ConnectionStatus
88 Communication::Disconnect (Error *error_ptr)
89 {
90     lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_COMMUNICATION, "%p Communication::Disconnect ()", this);
91 
92     if (m_connection_ap.get())
93     {
94         ConnectionStatus status = m_connection_ap->Disconnect (error_ptr);
95         m_connection_ap.reset();
96         return status;
97     }
98     return eConnectionStatusNoConnection;
99 }
100 
101 bool
102 Communication::IsConnected () const
103 {
104     if (m_connection_ap.get())
105         return m_connection_ap->IsConnected ();
106     return false;
107 }
108 
109 bool
110 Communication::HasConnection () const
111 {
112     return m_connection_ap.get() != NULL;
113 }
114 
115 size_t
116 Communication::Read (void *dst, size_t dst_len, uint32_t timeout_usec, ConnectionStatus &status, Error *error_ptr)
117 {
118     lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_COMMUNICATION,
119                                  "%p Communication::Write (dst = %p, dst_len = %zu, timeout_usec = %u) connection = %p",
120                                  this, dst, dst_len, timeout_usec, m_connection_ap.get());
121 
122     if (m_read_thread != NULL)
123     {
124         // We have a dedicated read thread that is getting data for us
125         size_t cached_bytes = GetCachedBytes (dst, dst_len);
126         if (cached_bytes > 0 || timeout_usec == 0)
127         {
128             status = eConnectionStatusSuccess;
129             return cached_bytes;
130         }
131 
132         if (m_connection_ap.get() == NULL)
133         {
134             if (error_ptr)
135                 error_ptr->SetErrorString("Invalid connection.");
136             status = eConnectionStatusNoConnection;
137             return 0;
138         }
139         // Set the timeout appropriately
140         TimeValue timeout_time;
141         if (timeout_usec != UINT32_MAX)
142         {
143             timeout_time = TimeValue::Now();
144             timeout_time.OffsetWithMicroSeconds (timeout_usec);
145         }
146 
147         Listener listener ("Communication::Read");
148         listener.StartListeningForEvents (this, eBroadcastBitReadThreadGotBytes | eBroadcastBitReadThreadDidExit);
149         EventSP event_sp;
150         while (listener.WaitForEvent (timeout_time.IsValid() ? &timeout_time : NULL, event_sp))
151         {
152             const uint32_t event_type = event_sp->GetType();
153             if (event_type & eBroadcastBitReadThreadGotBytes)
154             {
155                 return GetCachedBytes (dst, dst_len);
156             }
157 
158             if (event_type & eBroadcastBitReadThreadDidExit)
159             {
160                 Disconnect (NULL);
161                 break;
162             }
163         }
164         return 0;
165     }
166 
167     // We aren't using a read thread, just read the data synchronously in this
168     // thread.
169     if (m_connection_ap.get())
170     {
171         status = m_connection_ap->BytesAvailable (timeout_usec, error_ptr);
172         if (status == eConnectionStatusSuccess)
173             return m_connection_ap->Read (dst, dst_len, status, error_ptr);
174     }
175 
176     if (error_ptr)
177         error_ptr->SetErrorString("Invalid connection.");
178     status = eConnectionStatusNoConnection;
179     return 0;
180 }
181 
182 
183 size_t
184 Communication::Write (const void *src, size_t src_len, ConnectionStatus &status, Error *error_ptr)
185 {
186     lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_COMMUNICATION,
187                                  "%p Communication::Write (src = %p, src_len = %zu) connection = %p",
188                                  this, src, src_len, m_connection_ap.get());
189 
190     if (m_connection_ap.get())
191         return m_connection_ap->Write (src, src_len, status, error_ptr);
192 
193     if (error_ptr)
194         error_ptr->SetErrorString("Invalid connection.");
195     status = eConnectionStatusNoConnection;
196     return 0;
197 }
198 
199 
200 bool
201 Communication::StartReadThread (Error *error_ptr)
202 {
203     if (m_read_thread != LLDB_INVALID_HOST_THREAD)
204         return true;
205 
206     lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_COMMUNICATION,
207                                  "%p Communication::StartReadThread ()", this);
208 
209 
210     char thread_name[1024];
211     snprintf(thread_name, sizeof(thread_name), "<lldb.comm.%s>", m_broadcaster_name.AsCString());
212 
213     m_read_thread_enabled = true;
214     m_read_thread = Host::ThreadCreate (thread_name, Communication::ReadThread, this, error_ptr);
215     return m_read_thread != LLDB_INVALID_HOST_THREAD;
216 }
217 
218 bool
219 Communication::StopReadThread (Error *error_ptr)
220 {
221     if (m_read_thread == NULL)
222         return true;
223 
224     lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_COMMUNICATION,
225                                  "%p Communication::StopReadThread ()", this);
226 
227     m_read_thread_enabled = false;
228 
229     BroadcastEvent (eBroadcastBitReadThreadShouldExit, NULL);
230 
231     Host::ThreadCancel (m_read_thread, error_ptr);
232 
233     return Host::ThreadJoin (m_read_thread, NULL, error_ptr);
234 }
235 
236 
237 size_t
238 Communication::GetCachedBytes (void *dst, size_t dst_len)
239 {
240     Mutex::Locker locker(m_bytes_mutex);
241     if (m_bytes.size() > 0)
242     {
243         // If DST is NULL and we have a thread, then return the number
244         // of bytes that are available so the caller can call again
245         if (dst == NULL)
246             return m_bytes.size();
247 
248         const size_t len = std::min<size_t>(dst_len, m_bytes.size());
249 
250         ::memcpy (dst, m_bytes.data(), len);
251         m_bytes.erase(m_bytes.begin(), m_bytes.begin() + len);
252 
253         return len;
254     }
255     return 0;
256 }
257 
258 void
259 Communication::AppendBytesToCache (const uint8_t * bytes, size_t len, bool broadcast)
260 {
261     lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_COMMUNICATION,
262                                  "%p Communication::AppendBytesToCache (src = %p, src_len = %zu, broadcast = %i)",
263                                  this, bytes, len, broadcast);
264     if (bytes == NULL || len == 0)
265         return;
266     if (m_callback)
267     {
268         // If the user registered a callback, then call it and do not broadcast
269         m_callback (m_callback_baton, bytes, len);
270     }
271     else
272     {
273         Mutex::Locker locker(m_bytes_mutex);
274         m_bytes.append ((const char *)bytes, len);
275         if (broadcast)
276             BroadcastEventIfUnique (eBroadcastBitReadThreadGotBytes);
277     }
278 }
279 
280 size_t
281 Communication::ReadFromConnection (void *dst, size_t dst_len, ConnectionStatus &status, Error *error_ptr)
282 {
283     if (m_connection_ap.get())
284         return m_connection_ap->Read (dst, dst_len, status, error_ptr);
285     return 0;
286 }
287 
288 
289 bool
290 Communication::ReadThreadIsRunning ()
291 {
292     return m_read_thread != NULL;
293 }
294 
295 void *
296 Communication::ReadThread (void *p)
297 {
298     Communication *comm = (Communication *)p;
299 
300     Log *log = lldb_private::GetLogIfAllCategoriesSet (LIBLLDB_LOG_COMMUNICATION);
301 
302     if (log)
303         log->Printf ("%p Communication::ReadThread () thread starting...", p);
304 
305     uint8_t buf[1024];
306 
307     Error error;
308     ConnectionStatus status = eConnectionStatusSuccess;
309     bool done = false;
310     while (!done && comm->m_read_thread_enabled)
311     {
312         status = comm->BytesAvailable (UINT32_MAX, &error);
313 
314         if (status == eConnectionStatusSuccess)
315         {
316             size_t bytes_read = comm->ReadFromConnection (buf, sizeof(buf), status, &error);
317             if (bytes_read > 0)
318                     comm->AppendBytesToCache (buf, bytes_read, true);
319         }
320 
321         switch (status)
322         {
323         case eConnectionStatusSuccess:
324             break;
325 
326         case eConnectionStatusNoConnection:     // No connection
327         case eConnectionStatusLostConnection:   // Lost connection while connected to a valid connection
328             done = true;
329             // Fall through...
330         default:
331         case eConnectionStatusError:            // Check GetError() for details
332         case eConnectionStatusTimedOut:         // Request timed out
333             error.LogIfError(log, "%p Communication::BytesAvailable () => status = %i", p, status);
334             break;
335         }
336     }
337     if (log)
338         log->Printf ("%p Communication::ReadThread () thread exiting...", p);
339 
340     // Let clients know that this thread is exiting
341     comm->m_read_thread = LLDB_INVALID_HOST_THREAD;
342     comm->BroadcastEvent (eBroadcastBitReadThreadDidExit);
343     return NULL;
344 }
345 
346 void
347 Communication::SetReadThreadBytesReceivedCallback
348 (
349     ReadThreadBytesReceived callback,
350     void *callback_baton
351 )
352 {
353     m_callback = callback;
354     m_callback_baton = callback_baton;
355 }
356 
357 void
358 Communication::SetConnection (Connection *connection)
359 {
360     StopReadThread(NULL);
361     Disconnect (NULL);
362     m_connection_ap.reset(connection);
363 }
364