1 //===-- Communication.cpp ---------------------------------------*- C++ -*-===//
2 //
3 //                     The LLVM Compiler Infrastructure
4 //
5 // This file is distributed under the University of Illinois Open Source
6 // License. See LICENSE.TXT for details.
7 //
8 //===----------------------------------------------------------------------===//
9 
10 // C Includes
11 // C++ Includes
12 #include <cstring>
13 
14 // Other libraries and framework includes
15 // Project includes
16 #include "lldb/Core/Communication.h"
17 #include "lldb/Core/Connection.h"
18 #include "lldb/Core/Listener.h"
19 #include "lldb/Core/Log.h"
20 #include "lldb/Core/Timer.h"
21 #include "lldb/Core/Event.h"
22 #include "lldb/Host/Host.h"
23 #include "lldb/Host/HostThread.h"
24 #include "lldb/Host/ThreadLauncher.h"
25 
26 using namespace lldb;
27 using namespace lldb_private;
28 
29 ConstString &
30 Communication::GetStaticBroadcasterClass ()
31 {
32     static ConstString class_name ("lldb.communication");
33     return class_name;
34 }
35 
36 Communication::Communication(const char *name)
37     : Broadcaster(nullptr, name),
38       m_connection_sp(),
39       m_read_thread_enabled(false),
40       m_read_thread_did_exit(false),
41       m_bytes(),
42       m_bytes_mutex(),
43       m_write_mutex(),
44       m_synchronize_mutex(),
45       m_callback(nullptr),
46       m_callback_baton(nullptr),
47       m_close_on_eof(true)
48 
49 {
50     lldb_private::LogIfAnyCategoriesSet(LIBLLDB_LOG_OBJECT | LIBLLDB_LOG_COMMUNICATION,
51                                         "%p Communication::Communication (name = %s)", this, name);
52 
53     SetEventName(eBroadcastBitDisconnected, "disconnected");
54     SetEventName(eBroadcastBitReadThreadGotBytes, "got bytes");
55     SetEventName(eBroadcastBitReadThreadDidExit, "read thread did exit");
56     SetEventName(eBroadcastBitReadThreadShouldExit, "read thread should exit");
57     SetEventName(eBroadcastBitPacketAvailable, "packet available");
58     SetEventName(eBroadcastBitNoMorePendingInput, "no more pending input");
59 
60     CheckInWithManager();
61 }
62 
63 Communication::~Communication()
64 {
65     lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_OBJECT | LIBLLDB_LOG_COMMUNICATION,
66                                  "%p Communication::~Communication (name = %s)",
67                                  this, GetBroadcasterName().AsCString());
68     Clear();
69 }
70 
71 void
72 Communication::Clear()
73 {
74     SetReadThreadBytesReceivedCallback(nullptr, nullptr);
75     Disconnect(nullptr);
76     StopReadThread(nullptr);
77 }
78 
79 ConnectionStatus
80 Communication::Connect (const char *url, Error *error_ptr)
81 {
82     Clear();
83 
84     lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_COMMUNICATION, "%p Communication::Connect (url = %s)", this, url);
85 
86     lldb::ConnectionSP connection_sp (m_connection_sp);
87     if (connection_sp)
88         return connection_sp->Connect (url, error_ptr);
89     if (error_ptr)
90         error_ptr->SetErrorString("Invalid connection.");
91     return eConnectionStatusNoConnection;
92 }
93 
94 ConnectionStatus
95 Communication::Disconnect (Error *error_ptr)
96 {
97     lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_COMMUNICATION, "%p Communication::Disconnect ()", this);
98 
99     lldb::ConnectionSP connection_sp (m_connection_sp);
100     if (connection_sp)
101     {
102         ConnectionStatus status = connection_sp->Disconnect (error_ptr);
103         // We currently don't protect connection_sp with any mutex for
104         // multi-threaded environments. So lets not nuke our connection class
105         // without putting some multi-threaded protections in. We also probably
106         // don't want to pay for the overhead it might cause if every time we
107         // access the connection we have to take a lock.
108         //
109         // This unique pointer will cleanup after itself when this object goes away,
110         // so there is no need to currently have it destroy itself immediately
111         // upon disconnnect.
112         //connection_sp.reset();
113         return status;
114     }
115     return eConnectionStatusNoConnection;
116 }
117 
118 bool
119 Communication::IsConnected () const
120 {
121     lldb::ConnectionSP connection_sp(m_connection_sp);
122     return (connection_sp ? connection_sp->IsConnected() : false);
123 }
124 
125 bool
126 Communication::HasConnection () const
127 {
128     return m_connection_sp.get() != nullptr;
129 }
130 
131 size_t
132 Communication::Read (void *dst, size_t dst_len, uint32_t timeout_usec, ConnectionStatus &status, Error *error_ptr)
133 {
134     lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_COMMUNICATION,
135                                          "%p Communication::Read (dst = %p, dst_len = %" PRIu64 ", timeout = %u usec) connection = %p",
136                                          this,
137                                          dst,
138                                          (uint64_t)dst_len,
139                                          timeout_usec,
140                                          m_connection_sp.get());
141 
142     if (m_read_thread_enabled)
143     {
144         // We have a dedicated read thread that is getting data for us
145         size_t cached_bytes = GetCachedBytes (dst, dst_len);
146         if (cached_bytes > 0 || timeout_usec == 0)
147         {
148             status = eConnectionStatusSuccess;
149             return cached_bytes;
150         }
151 
152         if (!m_connection_sp)
153         {
154             if (error_ptr)
155                 error_ptr->SetErrorString("Invalid connection.");
156             status = eConnectionStatusNoConnection;
157             return 0;
158         }
159         // Set the timeout appropriately
160         TimeValue timeout_time;
161         if (timeout_usec != UINT32_MAX)
162         {
163             timeout_time = TimeValue::Now();
164             timeout_time.OffsetWithMicroSeconds (timeout_usec);
165         }
166 
167         ListenerSP listener_sp(Listener::MakeListener("Communication::Read"));
168         listener_sp->StartListeningForEvents (this, eBroadcastBitReadThreadGotBytes | eBroadcastBitReadThreadDidExit);
169         EventSP event_sp;
170         while (listener_sp->WaitForEvent (timeout_time.IsValid() ? &timeout_time : nullptr, event_sp))
171         {
172             const uint32_t event_type = event_sp->GetType();
173             if (event_type & eBroadcastBitReadThreadGotBytes)
174             {
175                 return GetCachedBytes (dst, dst_len);
176             }
177 
178             if (event_type & eBroadcastBitReadThreadDidExit)
179             {
180                 if (GetCloseOnEOF ())
181                     Disconnect(nullptr);
182                 break;
183             }
184         }
185         return 0;
186     }
187 
188     // We aren't using a read thread, just read the data synchronously in this
189     // thread.
190     lldb::ConnectionSP connection_sp (m_connection_sp);
191     if (connection_sp)
192     {
193         return connection_sp->Read (dst, dst_len, timeout_usec, status, error_ptr);
194     }
195 
196     if (error_ptr)
197         error_ptr->SetErrorString("Invalid connection.");
198     status = eConnectionStatusNoConnection;
199     return 0;
200 }
201 
202 size_t
203 Communication::Write (const void *src, size_t src_len, ConnectionStatus &status, Error *error_ptr)
204 {
205     lldb::ConnectionSP connection_sp (m_connection_sp);
206 
207     std::lock_guard<std::mutex> guard(m_write_mutex);
208     lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_COMMUNICATION,
209                                          "%p Communication::Write (src = %p, src_len = %" PRIu64 ") connection = %p",
210                                          this,
211                                          src,
212                                          (uint64_t)src_len,
213                                          connection_sp.get());
214 
215     if (connection_sp)
216         return connection_sp->Write (src, src_len, status, error_ptr);
217 
218     if (error_ptr)
219         error_ptr->SetErrorString("Invalid connection.");
220     status = eConnectionStatusNoConnection;
221     return 0;
222 }
223 
224 bool
225 Communication::StartReadThread (Error *error_ptr)
226 {
227     if (error_ptr)
228         error_ptr->Clear();
229 
230     if (m_read_thread.IsJoinable())
231         return true;
232 
233     lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_COMMUNICATION,
234                                  "%p Communication::StartReadThread ()", this);
235 
236     char thread_name[1024];
237     snprintf(thread_name, sizeof(thread_name), "<lldb.comm.%s>", GetBroadcasterName().AsCString());
238 
239     m_read_thread_enabled = true;
240     m_read_thread_did_exit = false;
241     m_read_thread = ThreadLauncher::LaunchThread(thread_name, Communication::ReadThread, this, error_ptr);
242     if (!m_read_thread.IsJoinable())
243         m_read_thread_enabled = false;
244     return m_read_thread_enabled;
245 }
246 
247 bool
248 Communication::StopReadThread (Error *error_ptr)
249 {
250     if (!m_read_thread.IsJoinable())
251         return true;
252 
253     lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_COMMUNICATION,
254                                  "%p Communication::StopReadThread ()", this);
255 
256     m_read_thread_enabled = false;
257 
258     BroadcastEvent(eBroadcastBitReadThreadShouldExit, nullptr);
259 
260     // error = m_read_thread.Cancel();
261 
262     Error error = m_read_thread.Join(nullptr);
263     return error.Success();
264 }
265 
266 bool
267 Communication::JoinReadThread (Error *error_ptr)
268 {
269     if (!m_read_thread.IsJoinable())
270         return true;
271 
272     Error error = m_read_thread.Join(nullptr);
273     return error.Success();
274 }
275 
276 size_t
277 Communication::GetCachedBytes (void *dst, size_t dst_len)
278 {
279     std::lock_guard<std::recursive_mutex> guard(m_bytes_mutex);
280     if (!m_bytes.empty())
281     {
282         // If DST is nullptr and we have a thread, then return the number
283         // of bytes that are available so the caller can call again
284         if (dst == nullptr)
285             return m_bytes.size();
286 
287         const size_t len = std::min<size_t>(dst_len, m_bytes.size());
288 
289         ::memcpy (dst, m_bytes.c_str(), len);
290         m_bytes.erase(m_bytes.begin(), m_bytes.begin() + len);
291 
292         return len;
293     }
294     return 0;
295 }
296 
297 void
298 Communication::AppendBytesToCache (const uint8_t * bytes, size_t len, bool broadcast, ConnectionStatus status)
299 {
300     lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_COMMUNICATION,
301                                  "%p Communication::AppendBytesToCache (src = %p, src_len = %" PRIu64 ", broadcast = %i)",
302                                  this, bytes, (uint64_t)len, broadcast);
303     if ((bytes == nullptr || len == 0)
304         && (status != lldb::eConnectionStatusEndOfFile))
305         return;
306     if (m_callback)
307     {
308         // If the user registered a callback, then call it and do not broadcast
309         m_callback (m_callback_baton, bytes, len);
310     }
311     else if (bytes != nullptr && len > 0)
312     {
313         std::lock_guard<std::recursive_mutex> guard(m_bytes_mutex);
314         m_bytes.append ((const char *)bytes, len);
315         if (broadcast)
316             BroadcastEventIfUnique (eBroadcastBitReadThreadGotBytes);
317     }
318 }
319 
320 size_t
321 Communication::ReadFromConnection (void *dst,
322                                    size_t dst_len,
323                                    uint32_t timeout_usec,
324                                    ConnectionStatus &status,
325                                    Error *error_ptr)
326 {
327     lldb::ConnectionSP connection_sp(m_connection_sp);
328     return (connection_sp ? connection_sp->Read(dst, dst_len, timeout_usec, status, error_ptr) : 0);
329 }
330 
331 bool
332 Communication::ReadThreadIsRunning ()
333 {
334     return m_read_thread_enabled;
335 }
336 
337 lldb::thread_result_t
338 Communication::ReadThread (lldb::thread_arg_t p)
339 {
340     Communication *comm = (Communication *)p;
341 
342     Log *log(lldb_private::GetLogIfAllCategoriesSet (LIBLLDB_LOG_COMMUNICATION));
343 
344     if (log)
345         log->Printf ("%p Communication::ReadThread () thread starting...", p);
346 
347     uint8_t buf[1024];
348 
349     Error error;
350     ConnectionStatus status = eConnectionStatusSuccess;
351     bool done = false;
352     while (!done && comm->m_read_thread_enabled)
353     {
354         size_t bytes_read = comm->ReadFromConnection (buf, sizeof(buf), 5 * TimeValue::MicroSecPerSec, status, &error);
355         if (bytes_read > 0)
356             comm->AppendBytesToCache (buf, bytes_read, true, status);
357         else if ((bytes_read == 0)
358                 && status == eConnectionStatusEndOfFile)
359         {
360             if (comm->GetCloseOnEOF ())
361                 comm->Disconnect ();
362             comm->AppendBytesToCache (buf, bytes_read, true, status);
363         }
364 
365         switch (status)
366         {
367         case eConnectionStatusSuccess:
368             break;
369 
370         case eConnectionStatusEndOfFile:
371             done = true;
372             break;
373         case eConnectionStatusError:            // Check GetError() for details
374             if (error.GetType() == eErrorTypePOSIX && error.GetError() == EIO)
375             {
376                 // EIO on a pipe is usually caused by remote shutdown
377                 comm->Disconnect ();
378                 done = true;
379             }
380             if (log)
381                 error.LogIfError (log,
382                                   "%p Communication::ReadFromConnection () => status = %s",
383                                   p,
384                                   Communication::ConnectionStatusAsCString (status));
385             break;
386         case eConnectionStatusInterrupted:      // Synchronization signal from SynchronizeWithReadThread()
387             // The connection returns eConnectionStatusInterrupted only when there is no
388             // input pending to be read, so we can signal that.
389             comm->BroadcastEvent (eBroadcastBitNoMorePendingInput);
390             break;
391         case eConnectionStatusNoConnection:     // No connection
392         case eConnectionStatusLostConnection:   // Lost connection while connected to a valid connection
393             done = true;
394             LLVM_FALLTHROUGH;
395         case eConnectionStatusTimedOut:         // Request timed out
396             if (log)
397                 error.LogIfError (log,
398                                   "%p Communication::ReadFromConnection () => status = %s",
399                                   p,
400                                   Communication::ConnectionStatusAsCString (status));
401             break;
402         }
403     }
404     log = lldb_private::GetLogIfAllCategoriesSet (LIBLLDB_LOG_COMMUNICATION);
405     if (log)
406         log->Printf ("%p Communication::ReadThread () thread exiting...", p);
407 
408     comm->m_read_thread_did_exit = true;
409     // Let clients know that this thread is exiting
410     comm->BroadcastEvent (eBroadcastBitNoMorePendingInput);
411     comm->BroadcastEvent (eBroadcastBitReadThreadDidExit);
412     return NULL;
413 }
414 
415 void
416 Communication::SetReadThreadBytesReceivedCallback(ReadThreadBytesReceived callback,
417                                                   void *callback_baton)
418 {
419     m_callback = callback;
420     m_callback_baton = callback_baton;
421 }
422 
423 void
424 Communication::SynchronizeWithReadThread ()
425 {
426     // Only one thread can do the synchronization dance at a time.
427     std::lock_guard<std::mutex> guard(m_synchronize_mutex);
428 
429     // First start listening for the synchronization event.
430     ListenerSP listener_sp(Listener::MakeListener("Communication::SyncronizeWithReadThread"));
431     listener_sp->StartListeningForEvents(this, eBroadcastBitNoMorePendingInput);
432 
433     // If the thread is not running, there is no point in synchronizing.
434     if (!m_read_thread_enabled || m_read_thread_did_exit)
435         return;
436 
437     // Notify the read thread.
438     m_connection_sp->InterruptRead();
439 
440     // Wait for the synchronization event.
441     EventSP event_sp;
442     listener_sp->WaitForEvent(nullptr, event_sp);
443 }
444 
445 void
446 Communication::SetConnection (Connection *connection)
447 {
448     Disconnect(nullptr);
449     StopReadThread(nullptr);
450     m_connection_sp.reset(connection);
451 }
452 
453 const char *
454 Communication::ConnectionStatusAsCString (lldb::ConnectionStatus status)
455 {
456     switch (status)
457     {
458     case eConnectionStatusSuccess:        return "success";
459     case eConnectionStatusError:          return "error";
460     case eConnectionStatusTimedOut:       return "timed out";
461     case eConnectionStatusNoConnection:   return "no connection";
462     case eConnectionStatusLostConnection: return "lost connection";
463     case eConnectionStatusEndOfFile:      return "end of file";
464     case eConnectionStatusInterrupted:    return "interrupted";
465     }
466 
467     static char unknown_state_string[64];
468     snprintf(unknown_state_string, sizeof (unknown_state_string), "ConnectionStatus = %i", status);
469     return unknown_state_string;
470 }
471