1 //===-- Communication.cpp ---------------------------------------*- C++ -*-===//
2 //
3 //                     The LLVM Compiler Infrastructure
4 //
5 // This file is distributed under the University of Illinois Open Source
6 // License. See LICENSE.TXT for details.
7 //
8 //===----------------------------------------------------------------------===//
9 
10 // C Includes
11 // C++ Includes
12 #include <cstring>
13 
14 // Other libraries and framework includes
15 // Project includes
16 #include "lldb/Core/Communication.h"
17 #include "lldb/Core/Connection.h"
18 #include "lldb/Core/Listener.h"
19 #include "lldb/Core/Log.h"
20 #include "lldb/Core/Timer.h"
21 #include "lldb/Core/Event.h"
22 #include "lldb/Host/Host.h"
23 #include "lldb/Host/HostThread.h"
24 #include "lldb/Host/ThreadLauncher.h"
25 
26 using namespace lldb;
27 using namespace lldb_private;
28 
29 ConstString &
30 Communication::GetStaticBroadcasterClass ()
31 {
32     static ConstString class_name ("lldb.communication");
33     return class_name;
34 }
35 
36 Communication::Communication(const char *name) :
37     Broadcaster(nullptr, name),
38     m_connection_sp(),
39     m_read_thread_enabled(false),
40     m_read_thread_did_exit(false),
41     m_bytes(),
42     m_bytes_mutex(Mutex::eMutexTypeRecursive),
43     m_write_mutex(Mutex::eMutexTypeNormal),
44     m_synchronize_mutex(Mutex::eMutexTypeNormal),
45     m_callback(nullptr),
46     m_callback_baton(nullptr),
47     m_close_on_eof(true)
48 
49 {
50     lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_OBJECT | LIBLLDB_LOG_COMMUNICATION,
51                                  "%p Communication::Communication (name = %s)",
52                                  this, name);
53 
54     SetEventName (eBroadcastBitDisconnected, "disconnected");
55     SetEventName (eBroadcastBitReadThreadGotBytes, "got bytes");
56     SetEventName (eBroadcastBitReadThreadDidExit, "read thread did exit");
57     SetEventName (eBroadcastBitReadThreadShouldExit, "read thread should exit");
58     SetEventName (eBroadcastBitPacketAvailable, "packet available");
59     SetEventName (eBroadcastBitNoMorePendingInput, "no more pending input");
60 
61     CheckInWithManager();
62 }
63 
64 Communication::~Communication()
65 {
66     lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_OBJECT | LIBLLDB_LOG_COMMUNICATION,
67                                  "%p Communication::~Communication (name = %s)",
68                                  this, GetBroadcasterName().AsCString());
69     Clear();
70 }
71 
72 void
73 Communication::Clear()
74 {
75     SetReadThreadBytesReceivedCallback(nullptr, nullptr);
76     Disconnect(nullptr);
77     StopReadThread(nullptr);
78 }
79 
80 ConnectionStatus
81 Communication::Connect (const char *url, Error *error_ptr)
82 {
83     Clear();
84 
85     lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_COMMUNICATION, "%p Communication::Connect (url = %s)", this, url);
86 
87     lldb::ConnectionSP connection_sp (m_connection_sp);
88     if (connection_sp)
89         return connection_sp->Connect (url, error_ptr);
90     if (error_ptr)
91         error_ptr->SetErrorString("Invalid connection.");
92     return eConnectionStatusNoConnection;
93 }
94 
95 ConnectionStatus
96 Communication::Disconnect (Error *error_ptr)
97 {
98     lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_COMMUNICATION, "%p Communication::Disconnect ()", this);
99 
100     lldb::ConnectionSP connection_sp (m_connection_sp);
101     if (connection_sp)
102     {
103         ConnectionStatus status = connection_sp->Disconnect (error_ptr);
104         // We currently don't protect connection_sp with any mutex for
105         // multi-threaded environments. So lets not nuke our connection class
106         // without putting some multi-threaded protections in. We also probably
107         // don't want to pay for the overhead it might cause if every time we
108         // access the connection we have to take a lock.
109         //
110         // This unique pointer will cleanup after itself when this object goes away,
111         // so there is no need to currently have it destroy itself immediately
112         // upon disconnnect.
113         //connection_sp.reset();
114         return status;
115     }
116     return eConnectionStatusNoConnection;
117 }
118 
119 bool
120 Communication::IsConnected () const
121 {
122     lldb::ConnectionSP connection_sp(m_connection_sp);
123     return (connection_sp ? connection_sp->IsConnected() : false);
124 }
125 
126 bool
127 Communication::HasConnection () const
128 {
129     return m_connection_sp.get() != nullptr;
130 }
131 
132 size_t
133 Communication::Read (void *dst, size_t dst_len, uint32_t timeout_usec, ConnectionStatus &status, Error *error_ptr)
134 {
135     lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_COMMUNICATION,
136                                          "%p Communication::Read (dst = %p, dst_len = %" PRIu64 ", timeout = %u usec) connection = %p",
137                                          this,
138                                          dst,
139                                          (uint64_t)dst_len,
140                                          timeout_usec,
141                                          m_connection_sp.get());
142 
143     if (m_read_thread_enabled)
144     {
145         // We have a dedicated read thread that is getting data for us
146         size_t cached_bytes = GetCachedBytes (dst, dst_len);
147         if (cached_bytes > 0 || timeout_usec == 0)
148         {
149             status = eConnectionStatusSuccess;
150             return cached_bytes;
151         }
152 
153         if (!m_connection_sp)
154         {
155             if (error_ptr)
156                 error_ptr->SetErrorString("Invalid connection.");
157             status = eConnectionStatusNoConnection;
158             return 0;
159         }
160         // Set the timeout appropriately
161         TimeValue timeout_time;
162         if (timeout_usec != UINT32_MAX)
163         {
164             timeout_time = TimeValue::Now();
165             timeout_time.OffsetWithMicroSeconds (timeout_usec);
166         }
167 
168         ListenerSP listener_sp(Listener::MakeListener("Communication::Read"));
169         listener_sp->StartListeningForEvents (this, eBroadcastBitReadThreadGotBytes | eBroadcastBitReadThreadDidExit);
170         EventSP event_sp;
171         while (listener_sp->WaitForEvent (timeout_time.IsValid() ? &timeout_time : nullptr, event_sp))
172         {
173             const uint32_t event_type = event_sp->GetType();
174             if (event_type & eBroadcastBitReadThreadGotBytes)
175             {
176                 return GetCachedBytes (dst, dst_len);
177             }
178 
179             if (event_type & eBroadcastBitReadThreadDidExit)
180             {
181                 if (GetCloseOnEOF ())
182                     Disconnect(nullptr);
183                 break;
184             }
185         }
186         return 0;
187     }
188 
189     // We aren't using a read thread, just read the data synchronously in this
190     // thread.
191     lldb::ConnectionSP connection_sp (m_connection_sp);
192     if (connection_sp)
193     {
194         return connection_sp->Read (dst, dst_len, timeout_usec, status, error_ptr);
195     }
196 
197     if (error_ptr)
198         error_ptr->SetErrorString("Invalid connection.");
199     status = eConnectionStatusNoConnection;
200     return 0;
201 }
202 
203 size_t
204 Communication::Write (const void *src, size_t src_len, ConnectionStatus &status, Error *error_ptr)
205 {
206     lldb::ConnectionSP connection_sp (m_connection_sp);
207 
208     Mutex::Locker locker(m_write_mutex);
209     lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_COMMUNICATION,
210                                          "%p Communication::Write (src = %p, src_len = %" PRIu64 ") connection = %p",
211                                          this,
212                                          src,
213                                          (uint64_t)src_len,
214                                          connection_sp.get());
215 
216     if (connection_sp)
217         return connection_sp->Write (src, src_len, status, error_ptr);
218 
219     if (error_ptr)
220         error_ptr->SetErrorString("Invalid connection.");
221     status = eConnectionStatusNoConnection;
222     return 0;
223 }
224 
225 bool
226 Communication::StartReadThread (Error *error_ptr)
227 {
228     if (error_ptr)
229         error_ptr->Clear();
230 
231     if (m_read_thread.IsJoinable())
232         return true;
233 
234     lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_COMMUNICATION,
235                                  "%p Communication::StartReadThread ()", this);
236 
237     char thread_name[1024];
238     snprintf(thread_name, sizeof(thread_name), "<lldb.comm.%s>", GetBroadcasterName().AsCString());
239 
240     m_read_thread_enabled = true;
241     m_read_thread_did_exit = false;
242     m_read_thread = ThreadLauncher::LaunchThread(thread_name, Communication::ReadThread, this, error_ptr);
243     if (!m_read_thread.IsJoinable())
244         m_read_thread_enabled = false;
245     return m_read_thread_enabled;
246 }
247 
248 bool
249 Communication::StopReadThread (Error *error_ptr)
250 {
251     if (!m_read_thread.IsJoinable())
252         return true;
253 
254     lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_COMMUNICATION,
255                                  "%p Communication::StopReadThread ()", this);
256 
257     m_read_thread_enabled = false;
258 
259     BroadcastEvent(eBroadcastBitReadThreadShouldExit, nullptr);
260 
261     // error = m_read_thread.Cancel();
262 
263     Error error = m_read_thread.Join(nullptr);
264     return error.Success();
265 }
266 
267 bool
268 Communication::JoinReadThread (Error *error_ptr)
269 {
270     if (!m_read_thread.IsJoinable())
271         return true;
272 
273     Error error = m_read_thread.Join(nullptr);
274     return error.Success();
275 }
276 
277 size_t
278 Communication::GetCachedBytes (void *dst, size_t dst_len)
279 {
280     Mutex::Locker locker(m_bytes_mutex);
281     if (!m_bytes.empty())
282     {
283         // If DST is nullptr and we have a thread, then return the number
284         // of bytes that are available so the caller can call again
285         if (dst == nullptr)
286             return m_bytes.size();
287 
288         const size_t len = std::min<size_t>(dst_len, m_bytes.size());
289 
290         ::memcpy (dst, m_bytes.c_str(), len);
291         m_bytes.erase(m_bytes.begin(), m_bytes.begin() + len);
292 
293         return len;
294     }
295     return 0;
296 }
297 
298 void
299 Communication::AppendBytesToCache (const uint8_t * bytes, size_t len, bool broadcast, ConnectionStatus status)
300 {
301     lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_COMMUNICATION,
302                                  "%p Communication::AppendBytesToCache (src = %p, src_len = %" PRIu64 ", broadcast = %i)",
303                                  this, bytes, (uint64_t)len, broadcast);
304     if ((bytes == nullptr || len == 0)
305         && (status != lldb::eConnectionStatusEndOfFile))
306         return;
307     if (m_callback)
308     {
309         // If the user registered a callback, then call it and do not broadcast
310         m_callback (m_callback_baton, bytes, len);
311     }
312     else if (bytes != nullptr && len > 0)
313     {
314         Mutex::Locker locker(m_bytes_mutex);
315         m_bytes.append ((const char *)bytes, len);
316         if (broadcast)
317             BroadcastEventIfUnique (eBroadcastBitReadThreadGotBytes);
318     }
319 }
320 
321 size_t
322 Communication::ReadFromConnection (void *dst,
323                                    size_t dst_len,
324                                    uint32_t timeout_usec,
325                                    ConnectionStatus &status,
326                                    Error *error_ptr)
327 {
328     lldb::ConnectionSP connection_sp(m_connection_sp);
329     return (connection_sp ? connection_sp->Read(dst, dst_len, timeout_usec, status, error_ptr) : 0);
330 }
331 
332 bool
333 Communication::ReadThreadIsRunning ()
334 {
335     return m_read_thread_enabled;
336 }
337 
338 lldb::thread_result_t
339 Communication::ReadThread (lldb::thread_arg_t p)
340 {
341     Communication *comm = (Communication *)p;
342 
343     Log *log(lldb_private::GetLogIfAllCategoriesSet (LIBLLDB_LOG_COMMUNICATION));
344 
345     if (log)
346         log->Printf ("%p Communication::ReadThread () thread starting...", p);
347 
348     uint8_t buf[1024];
349 
350     Error error;
351     ConnectionStatus status = eConnectionStatusSuccess;
352     bool done = false;
353     while (!done && comm->m_read_thread_enabled)
354     {
355         size_t bytes_read = comm->ReadFromConnection (buf, sizeof(buf), 5 * TimeValue::MicroSecPerSec, status, &error);
356         if (bytes_read > 0)
357             comm->AppendBytesToCache (buf, bytes_read, true, status);
358         else if ((bytes_read == 0)
359                 && status == eConnectionStatusEndOfFile)
360         {
361             if (comm->GetCloseOnEOF ())
362                 comm->Disconnect ();
363             comm->AppendBytesToCache (buf, bytes_read, true, status);
364         }
365 
366         switch (status)
367         {
368         case eConnectionStatusSuccess:
369             break;
370 
371         case eConnectionStatusEndOfFile:
372             done = true;
373             break;
374         case eConnectionStatusError:            // Check GetError() for details
375             if (error.GetType() == eErrorTypePOSIX && error.GetError() == EIO)
376             {
377                 // EIO on a pipe is usually caused by remote shutdown
378                 comm->Disconnect ();
379                 done = true;
380             }
381             if (log)
382                 error.LogIfError (log,
383                                   "%p Communication::ReadFromConnection () => status = %s",
384                                   p,
385                                   Communication::ConnectionStatusAsCString (status));
386             break;
387         case eConnectionStatusInterrupted:      // Synchronization signal from SynchronizeWithReadThread()
388             // The connection returns eConnectionStatusInterrupted only when there is no
389             // input pending to be read, so we can signal that.
390             comm->BroadcastEvent (eBroadcastBitNoMorePendingInput);
391             break;
392         case eConnectionStatusNoConnection:     // No connection
393         case eConnectionStatusLostConnection:   // Lost connection while connected to a valid connection
394             done = true;
395             LLVM_FALLTHROUGH;
396         case eConnectionStatusTimedOut:         // Request timed out
397             if (log)
398                 error.LogIfError (log,
399                                   "%p Communication::ReadFromConnection () => status = %s",
400                                   p,
401                                   Communication::ConnectionStatusAsCString (status));
402             break;
403         }
404     }
405     log = lldb_private::GetLogIfAllCategoriesSet (LIBLLDB_LOG_COMMUNICATION);
406     if (log)
407         log->Printf ("%p Communication::ReadThread () thread exiting...", p);
408 
409     comm->m_read_thread_did_exit = true;
410     // Let clients know that this thread is exiting
411     comm->BroadcastEvent (eBroadcastBitNoMorePendingInput);
412     comm->BroadcastEvent (eBroadcastBitReadThreadDidExit);
413     return NULL;
414 }
415 
416 void
417 Communication::SetReadThreadBytesReceivedCallback(ReadThreadBytesReceived callback,
418                                                   void *callback_baton)
419 {
420     m_callback = callback;
421     m_callback_baton = callback_baton;
422 }
423 
424 void
425 Communication::SynchronizeWithReadThread ()
426 {
427     // Only one thread can do the synchronization dance at a time.
428     Mutex::Locker locker(m_synchronize_mutex);
429 
430     // First start listening for the synchronization event.
431     ListenerSP listener_sp(Listener::MakeListener("Communication::SyncronizeWithReadThread"));
432     listener_sp->StartListeningForEvents(this, eBroadcastBitNoMorePendingInput);
433 
434     // If the thread is not running, there is no point in synchronizing.
435     if (!m_read_thread_enabled || m_read_thread_did_exit)
436         return;
437 
438     // Notify the read thread.
439     m_connection_sp->InterruptRead();
440 
441     // Wait for the synchronization event.
442     EventSP event_sp;
443     listener_sp->WaitForEvent(nullptr, event_sp);
444 }
445 
446 void
447 Communication::SetConnection (Connection *connection)
448 {
449     Disconnect(nullptr);
450     StopReadThread(nullptr);
451     m_connection_sp.reset(connection);
452 }
453 
454 const char *
455 Communication::ConnectionStatusAsCString (lldb::ConnectionStatus status)
456 {
457     switch (status)
458     {
459     case eConnectionStatusSuccess:        return "success";
460     case eConnectionStatusError:          return "error";
461     case eConnectionStatusTimedOut:       return "timed out";
462     case eConnectionStatusNoConnection:   return "no connection";
463     case eConnectionStatusLostConnection: return "lost connection";
464     case eConnectionStatusEndOfFile:      return "end of file";
465     case eConnectionStatusInterrupted:    return "interrupted";
466     }
467 
468     static char unknown_state_string[64];
469     snprintf(unknown_state_string, sizeof (unknown_state_string), "ConnectionStatus = %i", status);
470     return unknown_state_string;
471 }
472