1 //===-- Communication.cpp ---------------------------------------*- C++ -*-===//
2 //
3 //                     The LLVM Compiler Infrastructure
4 //
5 // This file is distributed under the University of Illinois Open Source
6 // License. See LICENSE.TXT for details.
7 //
8 //===----------------------------------------------------------------------===//
9 
10 // C Includes
11 // C++ Includes
12 #include <cstring>
13 
14 // Other libraries and framework includes
15 // Project includes
16 #include "lldb/Core/Communication.h"
17 #include "lldb/Core/Connection.h"
18 #include "lldb/Core/Listener.h"
19 #include "lldb/Core/Log.h"
20 #include "lldb/Core/Timer.h"
21 #include "lldb/Core/Event.h"
22 #include "lldb/Host/Host.h"
23 #include "lldb/Host/HostThread.h"
24 #include "lldb/Host/ThreadLauncher.h"
25 
26 using namespace lldb;
27 using namespace lldb_private;
28 
29 ConstString &
30 Communication::GetStaticBroadcasterClass ()
31 {
32     static ConstString class_name ("lldb.communication");
33     return class_name;
34 }
35 
36 Communication::Communication(const char *name)
37     : Broadcaster(nullptr, name),
38       m_connection_sp(),
39       m_read_thread_enabled(false),
40       m_read_thread_did_exit(false),
41       m_bytes(),
42       m_bytes_mutex(),
43       m_write_mutex(),
44       m_synchronize_mutex(),
45       m_callback(nullptr),
46       m_callback_baton(nullptr),
47       m_close_on_eof(true)
48 
49 {
50     lldb_private::LogIfAnyCategoriesSet(LIBLLDB_LOG_OBJECT | LIBLLDB_LOG_COMMUNICATION,
51                                         "%p Communication::Communication (name = %s)", this, name);
52 
53     SetEventName(eBroadcastBitDisconnected, "disconnected");
54     SetEventName(eBroadcastBitReadThreadGotBytes, "got bytes");
55     SetEventName(eBroadcastBitReadThreadDidExit, "read thread did exit");
56     SetEventName(eBroadcastBitReadThreadShouldExit, "read thread should exit");
57     SetEventName(eBroadcastBitPacketAvailable, "packet available");
58     SetEventName(eBroadcastBitNoMorePendingInput, "no more pending input");
59 
60     CheckInWithManager();
61 }
62 
63 Communication::~Communication()
64 {
65     lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_OBJECT | LIBLLDB_LOG_COMMUNICATION,
66                                  "%p Communication::~Communication (name = %s)",
67                                  this, GetBroadcasterName().AsCString());
68     Clear();
69 }
70 
71 void
72 Communication::Clear()
73 {
74     SetReadThreadBytesReceivedCallback(nullptr, nullptr);
75     Disconnect(nullptr);
76     StopReadThread(nullptr);
77 }
78 
79 ConnectionStatus
80 Communication::Connect (const char *url, Error *error_ptr)
81 {
82     Clear();
83 
84     lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_COMMUNICATION, "%p Communication::Connect (url = %s)", this, url);
85 
86     lldb::ConnectionSP connection_sp (m_connection_sp);
87     if (connection_sp)
88         return connection_sp->Connect (url, error_ptr);
89     if (error_ptr)
90         error_ptr->SetErrorString("Invalid connection.");
91     return eConnectionStatusNoConnection;
92 }
93 
94 ConnectionStatus
95 Communication::Disconnect (Error *error_ptr)
96 {
97     lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_COMMUNICATION, "%p Communication::Disconnect ()", this);
98 
99     lldb::ConnectionSP connection_sp (m_connection_sp);
100     if (connection_sp)
101     {
102         ConnectionStatus status = connection_sp->Disconnect (error_ptr);
103         // We currently don't protect connection_sp with any mutex for
104         // multi-threaded environments. So lets not nuke our connection class
105         // without putting some multi-threaded protections in. We also probably
106         // don't want to pay for the overhead it might cause if every time we
107         // access the connection we have to take a lock.
108         //
109         // This unique pointer will cleanup after itself when this object goes away,
110         // so there is no need to currently have it destroy itself immediately
111         // upon disconnnect.
112         //connection_sp.reset();
113         return status;
114     }
115     return eConnectionStatusNoConnection;
116 }
117 
118 bool
119 Communication::IsConnected () const
120 {
121     lldb::ConnectionSP connection_sp(m_connection_sp);
122     return (connection_sp ? connection_sp->IsConnected() : false);
123 }
124 
125 bool
126 Communication::HasConnection () const
127 {
128     return m_connection_sp.get() != nullptr;
129 }
130 
131 size_t
132 Communication::Read (void *dst, size_t dst_len, uint32_t timeout_usec, ConnectionStatus &status, Error *error_ptr)
133 {
134     lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_COMMUNICATION,
135                                          "%p Communication::Read (dst = %p, dst_len = %" PRIu64 ", timeout = %u usec) connection = %p",
136                                          this,
137                                          dst,
138                                          (uint64_t)dst_len,
139                                          timeout_usec,
140                                          m_connection_sp.get());
141 
142     if (m_read_thread_enabled)
143     {
144         // We have a dedicated read thread that is getting data for us
145         size_t cached_bytes = GetCachedBytes (dst, dst_len);
146         if (cached_bytes > 0 || timeout_usec == 0)
147         {
148             status = eConnectionStatusSuccess;
149             return cached_bytes;
150         }
151 
152         if (!m_connection_sp)
153         {
154             if (error_ptr)
155                 error_ptr->SetErrorString("Invalid connection.");
156             status = eConnectionStatusNoConnection;
157             return 0;
158         }
159 
160         ListenerSP listener_sp(Listener::MakeListener("Communication::Read"));
161         listener_sp->StartListeningForEvents (this, eBroadcastBitReadThreadGotBytes | eBroadcastBitReadThreadDidExit);
162         EventSP event_sp;
163         std::chrono::microseconds timeout = std::chrono::microseconds(0);
164         if (timeout_usec != UINT32_MAX)
165             timeout = std::chrono::microseconds(timeout_usec);
166         while (listener_sp->WaitForEvent(timeout, event_sp))
167         {
168             const uint32_t event_type = event_sp->GetType();
169             if (event_type & eBroadcastBitReadThreadGotBytes)
170             {
171                 return GetCachedBytes (dst, dst_len);
172             }
173 
174             if (event_type & eBroadcastBitReadThreadDidExit)
175             {
176                 if (GetCloseOnEOF ())
177                     Disconnect(nullptr);
178                 break;
179             }
180         }
181         return 0;
182     }
183 
184     // We aren't using a read thread, just read the data synchronously in this
185     // thread.
186     lldb::ConnectionSP connection_sp (m_connection_sp);
187     if (connection_sp)
188     {
189         return connection_sp->Read (dst, dst_len, timeout_usec, status, error_ptr);
190     }
191 
192     if (error_ptr)
193         error_ptr->SetErrorString("Invalid connection.");
194     status = eConnectionStatusNoConnection;
195     return 0;
196 }
197 
198 size_t
199 Communication::Write (const void *src, size_t src_len, ConnectionStatus &status, Error *error_ptr)
200 {
201     lldb::ConnectionSP connection_sp (m_connection_sp);
202 
203     std::lock_guard<std::mutex> guard(m_write_mutex);
204     lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_COMMUNICATION,
205                                          "%p Communication::Write (src = %p, src_len = %" PRIu64 ") connection = %p",
206                                          this,
207                                          src,
208                                          (uint64_t)src_len,
209                                          connection_sp.get());
210 
211     if (connection_sp)
212         return connection_sp->Write (src, src_len, status, error_ptr);
213 
214     if (error_ptr)
215         error_ptr->SetErrorString("Invalid connection.");
216     status = eConnectionStatusNoConnection;
217     return 0;
218 }
219 
220 bool
221 Communication::StartReadThread (Error *error_ptr)
222 {
223     if (error_ptr)
224         error_ptr->Clear();
225 
226     if (m_read_thread.IsJoinable())
227         return true;
228 
229     lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_COMMUNICATION,
230                                  "%p Communication::StartReadThread ()", this);
231 
232     char thread_name[1024];
233     snprintf(thread_name, sizeof(thread_name), "<lldb.comm.%s>", GetBroadcasterName().AsCString());
234 
235     m_read_thread_enabled = true;
236     m_read_thread_did_exit = false;
237     m_read_thread = ThreadLauncher::LaunchThread(thread_name, Communication::ReadThread, this, error_ptr);
238     if (!m_read_thread.IsJoinable())
239         m_read_thread_enabled = false;
240     return m_read_thread_enabled;
241 }
242 
243 bool
244 Communication::StopReadThread (Error *error_ptr)
245 {
246     if (!m_read_thread.IsJoinable())
247         return true;
248 
249     lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_COMMUNICATION,
250                                  "%p Communication::StopReadThread ()", this);
251 
252     m_read_thread_enabled = false;
253 
254     BroadcastEvent(eBroadcastBitReadThreadShouldExit, nullptr);
255 
256     // error = m_read_thread.Cancel();
257 
258     Error error = m_read_thread.Join(nullptr);
259     return error.Success();
260 }
261 
262 bool
263 Communication::JoinReadThread (Error *error_ptr)
264 {
265     if (!m_read_thread.IsJoinable())
266         return true;
267 
268     Error error = m_read_thread.Join(nullptr);
269     return error.Success();
270 }
271 
272 size_t
273 Communication::GetCachedBytes (void *dst, size_t dst_len)
274 {
275     std::lock_guard<std::recursive_mutex> guard(m_bytes_mutex);
276     if (!m_bytes.empty())
277     {
278         // If DST is nullptr and we have a thread, then return the number
279         // of bytes that are available so the caller can call again
280         if (dst == nullptr)
281             return m_bytes.size();
282 
283         const size_t len = std::min<size_t>(dst_len, m_bytes.size());
284 
285         ::memcpy (dst, m_bytes.c_str(), len);
286         m_bytes.erase(m_bytes.begin(), m_bytes.begin() + len);
287 
288         return len;
289     }
290     return 0;
291 }
292 
293 void
294 Communication::AppendBytesToCache (const uint8_t * bytes, size_t len, bool broadcast, ConnectionStatus status)
295 {
296     lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_COMMUNICATION,
297                                  "%p Communication::AppendBytesToCache (src = %p, src_len = %" PRIu64 ", broadcast = %i)",
298                                  this, bytes, (uint64_t)len, broadcast);
299     if ((bytes == nullptr || len == 0)
300         && (status != lldb::eConnectionStatusEndOfFile))
301         return;
302     if (m_callback)
303     {
304         // If the user registered a callback, then call it and do not broadcast
305         m_callback (m_callback_baton, bytes, len);
306     }
307     else if (bytes != nullptr && len > 0)
308     {
309         std::lock_guard<std::recursive_mutex> guard(m_bytes_mutex);
310         m_bytes.append ((const char *)bytes, len);
311         if (broadcast)
312             BroadcastEventIfUnique (eBroadcastBitReadThreadGotBytes);
313     }
314 }
315 
316 size_t
317 Communication::ReadFromConnection (void *dst,
318                                    size_t dst_len,
319                                    uint32_t timeout_usec,
320                                    ConnectionStatus &status,
321                                    Error *error_ptr)
322 {
323     lldb::ConnectionSP connection_sp(m_connection_sp);
324     return (connection_sp ? connection_sp->Read(dst, dst_len, timeout_usec, status, error_ptr) : 0);
325 }
326 
327 bool
328 Communication::ReadThreadIsRunning ()
329 {
330     return m_read_thread_enabled;
331 }
332 
333 lldb::thread_result_t
334 Communication::ReadThread (lldb::thread_arg_t p)
335 {
336     Communication *comm = (Communication *)p;
337 
338     Log *log(lldb_private::GetLogIfAllCategoriesSet (LIBLLDB_LOG_COMMUNICATION));
339 
340     if (log)
341         log->Printf ("%p Communication::ReadThread () thread starting...", p);
342 
343     uint8_t buf[1024];
344 
345     Error error;
346     ConnectionStatus status = eConnectionStatusSuccess;
347     bool done = false;
348     while (!done && comm->m_read_thread_enabled)
349     {
350         size_t bytes_read = comm->ReadFromConnection (buf, sizeof(buf), 5 * TimeValue::MicroSecPerSec, status, &error);
351         if (bytes_read > 0)
352             comm->AppendBytesToCache (buf, bytes_read, true, status);
353         else if ((bytes_read == 0)
354                 && status == eConnectionStatusEndOfFile)
355         {
356             if (comm->GetCloseOnEOF ())
357                 comm->Disconnect ();
358             comm->AppendBytesToCache (buf, bytes_read, true, status);
359         }
360 
361         switch (status)
362         {
363         case eConnectionStatusSuccess:
364             break;
365 
366         case eConnectionStatusEndOfFile:
367             done = true;
368             break;
369         case eConnectionStatusError:            // Check GetError() for details
370             if (error.GetType() == eErrorTypePOSIX && error.GetError() == EIO)
371             {
372                 // EIO on a pipe is usually caused by remote shutdown
373                 comm->Disconnect ();
374                 done = true;
375             }
376             if (log)
377                 error.LogIfError (log,
378                                   "%p Communication::ReadFromConnection () => status = %s",
379                                   p,
380                                   Communication::ConnectionStatusAsCString (status));
381             break;
382         case eConnectionStatusInterrupted:      // Synchronization signal from SynchronizeWithReadThread()
383             // The connection returns eConnectionStatusInterrupted only when there is no
384             // input pending to be read, so we can signal that.
385             comm->BroadcastEvent (eBroadcastBitNoMorePendingInput);
386             break;
387         case eConnectionStatusNoConnection:     // No connection
388         case eConnectionStatusLostConnection:   // Lost connection while connected to a valid connection
389             done = true;
390             LLVM_FALLTHROUGH;
391         case eConnectionStatusTimedOut:         // Request timed out
392             if (log)
393                 error.LogIfError (log,
394                                   "%p Communication::ReadFromConnection () => status = %s",
395                                   p,
396                                   Communication::ConnectionStatusAsCString (status));
397             break;
398         }
399     }
400     log = lldb_private::GetLogIfAllCategoriesSet (LIBLLDB_LOG_COMMUNICATION);
401     if (log)
402         log->Printf ("%p Communication::ReadThread () thread exiting...", p);
403 
404     comm->m_read_thread_did_exit = true;
405     // Let clients know that this thread is exiting
406     comm->BroadcastEvent (eBroadcastBitNoMorePendingInput);
407     comm->BroadcastEvent (eBroadcastBitReadThreadDidExit);
408     return NULL;
409 }
410 
411 void
412 Communication::SetReadThreadBytesReceivedCallback(ReadThreadBytesReceived callback,
413                                                   void *callback_baton)
414 {
415     m_callback = callback;
416     m_callback_baton = callback_baton;
417 }
418 
419 void
420 Communication::SynchronizeWithReadThread ()
421 {
422     // Only one thread can do the synchronization dance at a time.
423     std::lock_guard<std::mutex> guard(m_synchronize_mutex);
424 
425     // First start listening for the synchronization event.
426     ListenerSP listener_sp(Listener::MakeListener("Communication::SyncronizeWithReadThread"));
427     listener_sp->StartListeningForEvents(this, eBroadcastBitNoMorePendingInput);
428 
429     // If the thread is not running, there is no point in synchronizing.
430     if (!m_read_thread_enabled || m_read_thread_did_exit)
431         return;
432 
433     // Notify the read thread.
434     m_connection_sp->InterruptRead();
435 
436     // Wait for the synchronization event.
437     EventSP event_sp;
438     listener_sp->WaitForEvent(std::chrono::microseconds(0), event_sp);
439 }
440 
441 void
442 Communication::SetConnection (Connection *connection)
443 {
444     Disconnect(nullptr);
445     StopReadThread(nullptr);
446     m_connection_sp.reset(connection);
447 }
448 
449 const char *
450 Communication::ConnectionStatusAsCString (lldb::ConnectionStatus status)
451 {
452     switch (status)
453     {
454     case eConnectionStatusSuccess:        return "success";
455     case eConnectionStatusError:          return "error";
456     case eConnectionStatusTimedOut:       return "timed out";
457     case eConnectionStatusNoConnection:   return "no connection";
458     case eConnectionStatusLostConnection: return "lost connection";
459     case eConnectionStatusEndOfFile:      return "end of file";
460     case eConnectionStatusInterrupted:    return "interrupted";
461     }
462 
463     static char unknown_state_string[64];
464     snprintf(unknown_state_string, sizeof (unknown_state_string), "ConnectionStatus = %i", status);
465     return unknown_state_string;
466 }
467