1 //===-- Communication.cpp ---------------------------------------*- C++ -*-===//
2 //
3 //                     The LLVM Compiler Infrastructure
4 //
5 // This file is distributed under the University of Illinois Open Source
6 // License. See LICENSE.TXT for details.
7 //
8 //===----------------------------------------------------------------------===//
9 
10 // C Includes
11 // C++ Includes
12 // Other libraries and framework includes
13 // Project includes
14 #include "lldb/Core/Communication.h"
15 #include "lldb/Core/Connection.h"
16 #include "lldb/Core/Log.h"
17 #include "lldb/Core/Timer.h"
18 #include "lldb/Core/Event.h"
19 #include "lldb/Host/Host.h"
20 #include "lldb/Host/HostThread.h"
21 #include "lldb/Host/ThreadLauncher.h"
22 #include <string.h>
23 
24 using namespace lldb;
25 using namespace lldb_private;
26 
27 ConstString &
28 Communication::GetStaticBroadcasterClass ()
29 {
30     static ConstString class_name ("lldb.communication");
31     return class_name;
32 }
33 
34 //----------------------------------------------------------------------
35 // Constructor
36 //----------------------------------------------------------------------
37 Communication::Communication(const char *name) :
38     Broadcaster (NULL, name),
39     m_connection_sp (),
40     m_read_thread_enabled (false),
41     m_read_thread_did_exit (false),
42     m_bytes(),
43     m_bytes_mutex (Mutex::eMutexTypeRecursive),
44     m_write_mutex (Mutex::eMutexTypeNormal),
45     m_synchronize_mutex (Mutex::eMutexTypeNormal),
46     m_callback (NULL),
47     m_callback_baton (NULL),
48     m_close_on_eof (true)
49 
50 {
51     lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_OBJECT | LIBLLDB_LOG_COMMUNICATION,
52                                  "%p Communication::Communication (name = %s)",
53                                  this, name);
54 
55     SetEventName (eBroadcastBitDisconnected, "disconnected");
56     SetEventName (eBroadcastBitReadThreadGotBytes, "got bytes");
57     SetEventName (eBroadcastBitReadThreadDidExit, "read thread did exit");
58     SetEventName (eBroadcastBitReadThreadShouldExit, "read thread should exit");
59     SetEventName (eBroadcastBitPacketAvailable, "packet available");
60     SetEventName (eBroadcastBitNoMorePendingInput, "no more pending input");
61 
62     CheckInWithManager();
63 }
64 
65 //----------------------------------------------------------------------
66 // Destructor
67 //----------------------------------------------------------------------
68 Communication::~Communication()
69 {
70     lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_OBJECT | LIBLLDB_LOG_COMMUNICATION,
71                                  "%p Communication::~Communication (name = %s)",
72                                  this, m_broadcaster_name.AsCString(""));
73     Clear();
74 }
75 
76 void
77 Communication::Clear()
78 {
79     SetReadThreadBytesReceivedCallback (NULL, NULL);
80     Disconnect (NULL);
81     StopReadThread (NULL);
82 }
83 
84 ConnectionStatus
85 Communication::Connect (const char *url, Error *error_ptr)
86 {
87     Clear();
88 
89     lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_COMMUNICATION, "%p Communication::Connect (url = %s)", this, url);
90 
91     lldb::ConnectionSP connection_sp (m_connection_sp);
92     if (connection_sp.get())
93         return connection_sp->Connect (url, error_ptr);
94     if (error_ptr)
95         error_ptr->SetErrorString("Invalid connection.");
96     return eConnectionStatusNoConnection;
97 }
98 
99 ConnectionStatus
100 Communication::Disconnect (Error *error_ptr)
101 {
102     lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_COMMUNICATION, "%p Communication::Disconnect ()", this);
103 
104     lldb::ConnectionSP connection_sp (m_connection_sp);
105     if (connection_sp.get())
106     {
107         ConnectionStatus status = connection_sp->Disconnect (error_ptr);
108         // We currently don't protect connection_sp with any mutex for
109         // multi-threaded environments. So lets not nuke our connection class
110         // without putting some multi-threaded protections in. We also probably
111         // don't want to pay for the overhead it might cause if every time we
112         // access the connection we have to take a lock.
113         //
114         // This unique pointer will cleanup after itself when this object goes away,
115         // so there is no need to currently have it destroy itself immediately
116         // upon disconnnect.
117         //connection_sp.reset();
118         return status;
119     }
120     return eConnectionStatusNoConnection;
121 }
122 
123 bool
124 Communication::IsConnected () const
125 {
126     lldb::ConnectionSP connection_sp (m_connection_sp);
127     if (connection_sp.get())
128         return connection_sp->IsConnected ();
129     return false;
130 }
131 
132 bool
133 Communication::HasConnection () const
134 {
135     return m_connection_sp.get() != NULL;
136 }
137 
138 size_t
139 Communication::Read (void *dst, size_t dst_len, uint32_t timeout_usec, ConnectionStatus &status, Error *error_ptr)
140 {
141     lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_COMMUNICATION,
142                                          "%p Communication::Read (dst = %p, dst_len = %" PRIu64 ", timeout = %u usec) connection = %p",
143                                          this,
144                                          dst,
145                                          (uint64_t)dst_len,
146                                          timeout_usec,
147                                          m_connection_sp.get());
148 
149     if (m_read_thread_enabled)
150     {
151         // We have a dedicated read thread that is getting data for us
152         size_t cached_bytes = GetCachedBytes (dst, dst_len);
153         if (cached_bytes > 0 || timeout_usec == 0)
154         {
155             status = eConnectionStatusSuccess;
156             return cached_bytes;
157         }
158 
159         if (m_connection_sp.get() == NULL)
160         {
161             if (error_ptr)
162                 error_ptr->SetErrorString("Invalid connection.");
163             status = eConnectionStatusNoConnection;
164             return 0;
165         }
166         // Set the timeout appropriately
167         TimeValue timeout_time;
168         if (timeout_usec != UINT32_MAX)
169         {
170             timeout_time = TimeValue::Now();
171             timeout_time.OffsetWithMicroSeconds (timeout_usec);
172         }
173 
174         Listener listener ("Communication::Read");
175         listener.StartListeningForEvents (this, eBroadcastBitReadThreadGotBytes | eBroadcastBitReadThreadDidExit);
176         EventSP event_sp;
177         while (listener.WaitForEvent (timeout_time.IsValid() ? &timeout_time : NULL, event_sp))
178         {
179             const uint32_t event_type = event_sp->GetType();
180             if (event_type & eBroadcastBitReadThreadGotBytes)
181             {
182                 return GetCachedBytes (dst, dst_len);
183             }
184 
185             if (event_type & eBroadcastBitReadThreadDidExit)
186             {
187                 Disconnect (NULL);
188                 break;
189             }
190         }
191         return 0;
192     }
193 
194     // We aren't using a read thread, just read the data synchronously in this
195     // thread.
196     lldb::ConnectionSP connection_sp (m_connection_sp);
197     if (connection_sp.get())
198     {
199         return connection_sp->Read (dst, dst_len, timeout_usec, status, error_ptr);
200     }
201 
202     if (error_ptr)
203         error_ptr->SetErrorString("Invalid connection.");
204     status = eConnectionStatusNoConnection;
205     return 0;
206 }
207 
208 
209 size_t
210 Communication::Write (const void *src, size_t src_len, ConnectionStatus &status, Error *error_ptr)
211 {
212     lldb::ConnectionSP connection_sp (m_connection_sp);
213 
214     Mutex::Locker locker(m_write_mutex);
215     lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_COMMUNICATION,
216                                          "%p Communication::Write (src = %p, src_len = %" PRIu64 ") connection = %p",
217                                          this,
218                                          src,
219                                          (uint64_t)src_len,
220                                          connection_sp.get());
221 
222     if (connection_sp.get())
223         return connection_sp->Write (src, src_len, status, error_ptr);
224 
225     if (error_ptr)
226         error_ptr->SetErrorString("Invalid connection.");
227     status = eConnectionStatusNoConnection;
228     return 0;
229 }
230 
231 
232 bool
233 Communication::StartReadThread (Error *error_ptr)
234 {
235     if (error_ptr)
236         error_ptr->Clear();
237 
238     if (m_read_thread.IsJoinable())
239         return true;
240 
241     lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_COMMUNICATION,
242                                  "%p Communication::StartReadThread ()", this);
243 
244 
245     char thread_name[1024];
246     snprintf(thread_name, sizeof(thread_name), "<lldb.comm.%s>", m_broadcaster_name.AsCString());
247 
248     m_read_thread_enabled = true;
249     m_read_thread_did_exit = false;
250     m_read_thread = ThreadLauncher::LaunchThread(thread_name, Communication::ReadThread, this, error_ptr);
251     if (!m_read_thread.IsJoinable())
252         m_read_thread_enabled = false;
253     return m_read_thread_enabled;
254 }
255 
256 bool
257 Communication::StopReadThread (Error *error_ptr)
258 {
259     if (!m_read_thread.IsJoinable())
260         return true;
261 
262     lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_COMMUNICATION,
263                                  "%p Communication::StopReadThread ()", this);
264 
265     m_read_thread_enabled = false;
266 
267     BroadcastEvent (eBroadcastBitReadThreadShouldExit, NULL);
268 
269     // error = m_read_thread.Cancel();
270 
271     Error error = m_read_thread.Join(nullptr);
272     return error.Success();
273 }
274 
275 bool
276 Communication::JoinReadThread (Error *error_ptr)
277 {
278     if (!m_read_thread.IsJoinable())
279         return true;
280 
281     Error error = m_read_thread.Join(nullptr);
282     return error.Success();
283 }
284 
285 size_t
286 Communication::GetCachedBytes (void *dst, size_t dst_len)
287 {
288     Mutex::Locker locker(m_bytes_mutex);
289     if (m_bytes.size() > 0)
290     {
291         // If DST is NULL and we have a thread, then return the number
292         // of bytes that are available so the caller can call again
293         if (dst == NULL)
294             return m_bytes.size();
295 
296         const size_t len = std::min<size_t>(dst_len, m_bytes.size());
297 
298         ::memcpy (dst, m_bytes.c_str(), len);
299         m_bytes.erase(m_bytes.begin(), m_bytes.begin() + len);
300 
301         return len;
302     }
303     return 0;
304 }
305 
306 void
307 Communication::AppendBytesToCache (const uint8_t * bytes, size_t len, bool broadcast, ConnectionStatus status)
308 {
309     lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_COMMUNICATION,
310                                  "%p Communication::AppendBytesToCache (src = %p, src_len = %" PRIu64 ", broadcast = %i)",
311                                  this, bytes, (uint64_t)len, broadcast);
312     if ((bytes == NULL || len == 0)
313         && (status != lldb::eConnectionStatusEndOfFile))
314         return;
315     if (m_callback)
316     {
317         // If the user registered a callback, then call it and do not broadcast
318         m_callback (m_callback_baton, bytes, len);
319     }
320     else if (bytes != NULL && len > 0)
321     {
322         Mutex::Locker locker(m_bytes_mutex);
323         m_bytes.append ((const char *)bytes, len);
324         if (broadcast)
325             BroadcastEventIfUnique (eBroadcastBitReadThreadGotBytes);
326     }
327 }
328 
329 size_t
330 Communication::ReadFromConnection (void *dst,
331                                    size_t dst_len,
332                                    uint32_t timeout_usec,
333                                    ConnectionStatus &status,
334                                    Error *error_ptr)
335 {
336     lldb::ConnectionSP connection_sp (m_connection_sp);
337     if (connection_sp.get())
338         return connection_sp->Read (dst, dst_len, timeout_usec, status, error_ptr);
339     return 0;
340 }
341 
342 bool
343 Communication::ReadThreadIsRunning ()
344 {
345     return m_read_thread_enabled;
346 }
347 
348 lldb::thread_result_t
349 Communication::ReadThread (lldb::thread_arg_t p)
350 {
351     Communication *comm = (Communication *)p;
352 
353     Log *log(lldb_private::GetLogIfAllCategoriesSet (LIBLLDB_LOG_COMMUNICATION));
354 
355     if (log)
356         log->Printf ("%p Communication::ReadThread () thread starting...", p);
357 
358     uint8_t buf[1024];
359 
360     Error error;
361     ConnectionStatus status = eConnectionStatusSuccess;
362     bool done = false;
363     while (!done && comm->m_read_thread_enabled)
364     {
365         size_t bytes_read = comm->ReadFromConnection (buf, sizeof(buf), 5 * TimeValue::MicroSecPerSec, status, &error);
366         if (bytes_read > 0)
367             comm->AppendBytesToCache (buf, bytes_read, true, status);
368         else if ((bytes_read == 0)
369                 && status == eConnectionStatusEndOfFile)
370         {
371             if (comm->GetCloseOnEOF ())
372                 comm->Disconnect ();
373             comm->AppendBytesToCache (buf, bytes_read, true, status);
374         }
375 
376         switch (status)
377         {
378         case eConnectionStatusSuccess:
379             break;
380 
381         case eConnectionStatusEndOfFile:
382             if (comm->GetCloseOnEOF())
383                  done = true;
384              break;
385         case eConnectionStatusError:            // Check GetError() for details
386             if (error.GetType() == eErrorTypePOSIX && error.GetError() == EIO)
387             {
388                 // EIO on a pipe is usually caused by remote shutdown
389                 comm->Disconnect ();
390                 done = true;
391             }
392             if (log)
393                 error.LogIfError (log,
394                                   "%p Communication::ReadFromConnection () => status = %s",
395                                   p,
396                                   Communication::ConnectionStatusAsCString (status));
397             break;
398         case eConnectionStatusInterrupted:      // Synchronization signal from SynchronizeWithReadThread()
399             // The connection returns eConnectionStatusInterrupted only when there is no
400             // input pending to be read, so we can signal that.
401             comm->BroadcastEvent (eBroadcastBitNoMorePendingInput);
402             break;
403         case eConnectionStatusNoConnection:     // No connection
404         case eConnectionStatusLostConnection:   // Lost connection while connected to a valid connection
405             done = true;
406             // Fall through...
407         case eConnectionStatusTimedOut:         // Request timed out
408             if (log)
409                 error.LogIfError (log,
410                                   "%p Communication::ReadFromConnection () => status = %s",
411                                   p,
412                                   Communication::ConnectionStatusAsCString (status));
413             break;
414         }
415     }
416     log = lldb_private::GetLogIfAllCategoriesSet (LIBLLDB_LOG_COMMUNICATION);
417     if (log)
418         log->Printf ("%p Communication::ReadThread () thread exiting...", p);
419 
420     comm->m_read_thread_did_exit = true;
421     // Let clients know that this thread is exiting
422     comm->BroadcastEvent (eBroadcastBitNoMorePendingInput);
423     comm->BroadcastEvent (eBroadcastBitReadThreadDidExit);
424     return NULL;
425 }
426 
427 void
428 Communication::SetReadThreadBytesReceivedCallback
429 (
430     ReadThreadBytesReceived callback,
431     void *callback_baton
432 )
433 {
434     m_callback = callback;
435     m_callback_baton = callback_baton;
436 }
437 
438 void
439 Communication::SynchronizeWithReadThread ()
440 {
441     // Only one thread can do the synchronization dance at a time.
442     Mutex::Locker locker(m_synchronize_mutex);
443 
444     // First start listening for the synchronization event.
445     Listener listener("Communication::SyncronizeWithReadThread");
446     listener.StartListeningForEvents(this, eBroadcastBitNoMorePendingInput);
447 
448     // If the thread is not running, there is no point in synchronizing.
449     if (!m_read_thread_enabled || m_read_thread_did_exit)
450         return;
451 
452     // Notify the read thread.
453     m_connection_sp->InterruptRead();
454 
455     // Wait for the synchronization event.
456     EventSP event_sp;
457     listener.WaitForEvent(NULL, event_sp);
458 }
459 
460 void
461 Communication::SetConnection (Connection *connection)
462 {
463     Disconnect (NULL);
464     StopReadThread(NULL);
465     m_connection_sp.reset(connection);
466 }
467 
468 const char *
469 Communication::ConnectionStatusAsCString (lldb::ConnectionStatus status)
470 {
471     switch (status)
472     {
473     case eConnectionStatusSuccess:        return "success";
474     case eConnectionStatusError:          return "error";
475     case eConnectionStatusTimedOut:       return "timed out";
476     case eConnectionStatusNoConnection:   return "no connection";
477     case eConnectionStatusLostConnection: return "lost connection";
478     case eConnectionStatusEndOfFile:      return "end of file";
479     case eConnectionStatusInterrupted:    return "interrupted";
480     }
481 
482     static char unknown_state_string[64];
483     snprintf(unknown_state_string, sizeof (unknown_state_string), "ConnectionStatus = %i", status);
484     return unknown_state_string;
485 }
486