1 //===-- Communication.cpp ---------------------------------------*- C++ -*-===// 2 // 3 // The LLVM Compiler Infrastructure 4 // 5 // This file is distributed under the University of Illinois Open Source 6 // License. See LICENSE.TXT for details. 7 // 8 //===----------------------------------------------------------------------===// 9 10 // C Includes 11 // C++ Includes 12 #include <cstring> 13 14 // Other libraries and framework includes 15 // Project includes 16 #include "lldb/Core/Communication.h" 17 #include "lldb/Core/Connection.h" 18 #include "lldb/Core/Listener.h" 19 #include "lldb/Core/Log.h" 20 #include "lldb/Core/Timer.h" 21 #include "lldb/Core/Event.h" 22 #include "lldb/Host/Host.h" 23 #include "lldb/Host/HostThread.h" 24 #include "lldb/Host/ThreadLauncher.h" 25 26 using namespace lldb; 27 using namespace lldb_private; 28 29 ConstString & 30 Communication::GetStaticBroadcasterClass () 31 { 32 static ConstString class_name ("lldb.communication"); 33 return class_name; 34 } 35 36 Communication::Communication(const char *name) 37 : Broadcaster(nullptr, name), 38 m_connection_sp(), 39 m_read_thread_enabled(false), 40 m_read_thread_did_exit(false), 41 m_bytes(), 42 m_bytes_mutex(), 43 m_write_mutex(), 44 m_synchronize_mutex(), 45 m_callback(nullptr), 46 m_callback_baton(nullptr), 47 m_close_on_eof(true) 48 49 { 50 lldb_private::LogIfAnyCategoriesSet(LIBLLDB_LOG_OBJECT | LIBLLDB_LOG_COMMUNICATION, 51 "%p Communication::Communication (name = %s)", this, name); 52 53 SetEventName(eBroadcastBitDisconnected, "disconnected"); 54 SetEventName(eBroadcastBitReadThreadGotBytes, "got bytes"); 55 SetEventName(eBroadcastBitReadThreadDidExit, "read thread did exit"); 56 SetEventName(eBroadcastBitReadThreadShouldExit, "read thread should exit"); 57 SetEventName(eBroadcastBitPacketAvailable, "packet available"); 58 SetEventName(eBroadcastBitNoMorePendingInput, "no more pending input"); 59 60 CheckInWithManager(); 61 } 62 63 Communication::~Communication() 64 { 65 lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_OBJECT | LIBLLDB_LOG_COMMUNICATION, 66 "%p Communication::~Communication (name = %s)", 67 this, GetBroadcasterName().AsCString()); 68 Clear(); 69 } 70 71 void 72 Communication::Clear() 73 { 74 SetReadThreadBytesReceivedCallback(nullptr, nullptr); 75 Disconnect(nullptr); 76 StopReadThread(nullptr); 77 } 78 79 ConnectionStatus 80 Communication::Connect (const char *url, Error *error_ptr) 81 { 82 Clear(); 83 84 lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_COMMUNICATION, "%p Communication::Connect (url = %s)", this, url); 85 86 lldb::ConnectionSP connection_sp (m_connection_sp); 87 if (connection_sp) 88 return connection_sp->Connect (url, error_ptr); 89 if (error_ptr) 90 error_ptr->SetErrorString("Invalid connection."); 91 return eConnectionStatusNoConnection; 92 } 93 94 ConnectionStatus 95 Communication::Disconnect (Error *error_ptr) 96 { 97 lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_COMMUNICATION, "%p Communication::Disconnect ()", this); 98 99 lldb::ConnectionSP connection_sp (m_connection_sp); 100 if (connection_sp) 101 { 102 ConnectionStatus status = connection_sp->Disconnect (error_ptr); 103 // We currently don't protect connection_sp with any mutex for 104 // multi-threaded environments. So lets not nuke our connection class 105 // without putting some multi-threaded protections in. We also probably 106 // don't want to pay for the overhead it might cause if every time we 107 // access the connection we have to take a lock. 108 // 109 // This unique pointer will cleanup after itself when this object goes away, 110 // so there is no need to currently have it destroy itself immediately 111 // upon disconnnect. 112 //connection_sp.reset(); 113 return status; 114 } 115 return eConnectionStatusNoConnection; 116 } 117 118 bool 119 Communication::IsConnected () const 120 { 121 lldb::ConnectionSP connection_sp(m_connection_sp); 122 return (connection_sp ? connection_sp->IsConnected() : false); 123 } 124 125 bool 126 Communication::HasConnection () const 127 { 128 return m_connection_sp.get() != nullptr; 129 } 130 131 size_t 132 Communication::Read (void *dst, size_t dst_len, uint32_t timeout_usec, ConnectionStatus &status, Error *error_ptr) 133 { 134 lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_COMMUNICATION, 135 "%p Communication::Read (dst = %p, dst_len = %" PRIu64 ", timeout = %u usec) connection = %p", 136 this, 137 dst, 138 (uint64_t)dst_len, 139 timeout_usec, 140 m_connection_sp.get()); 141 142 if (m_read_thread_enabled) 143 { 144 // We have a dedicated read thread that is getting data for us 145 size_t cached_bytes = GetCachedBytes (dst, dst_len); 146 if (cached_bytes > 0 || timeout_usec == 0) 147 { 148 status = eConnectionStatusSuccess; 149 return cached_bytes; 150 } 151 152 if (!m_connection_sp) 153 { 154 if (error_ptr) 155 error_ptr->SetErrorString("Invalid connection."); 156 status = eConnectionStatusNoConnection; 157 return 0; 158 } 159 // Set the timeout appropriately 160 TimeValue timeout_time; 161 if (timeout_usec != UINT32_MAX) 162 { 163 timeout_time = TimeValue::Now(); 164 timeout_time.OffsetWithMicroSeconds (timeout_usec); 165 } 166 167 ListenerSP listener_sp(Listener::MakeListener("Communication::Read")); 168 listener_sp->StartListeningForEvents (this, eBroadcastBitReadThreadGotBytes | eBroadcastBitReadThreadDidExit); 169 EventSP event_sp; 170 while (listener_sp->WaitForEvent (timeout_time.IsValid() ? &timeout_time : nullptr, event_sp)) 171 { 172 const uint32_t event_type = event_sp->GetType(); 173 if (event_type & eBroadcastBitReadThreadGotBytes) 174 { 175 return GetCachedBytes (dst, dst_len); 176 } 177 178 if (event_type & eBroadcastBitReadThreadDidExit) 179 { 180 if (GetCloseOnEOF ()) 181 Disconnect(nullptr); 182 break; 183 } 184 } 185 return 0; 186 } 187 188 // We aren't using a read thread, just read the data synchronously in this 189 // thread. 190 lldb::ConnectionSP connection_sp (m_connection_sp); 191 if (connection_sp) 192 { 193 return connection_sp->Read (dst, dst_len, timeout_usec, status, error_ptr); 194 } 195 196 if (error_ptr) 197 error_ptr->SetErrorString("Invalid connection."); 198 status = eConnectionStatusNoConnection; 199 return 0; 200 } 201 202 size_t 203 Communication::Write (const void *src, size_t src_len, ConnectionStatus &status, Error *error_ptr) 204 { 205 lldb::ConnectionSP connection_sp (m_connection_sp); 206 207 std::lock_guard<std::mutex> guard(m_write_mutex); 208 lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_COMMUNICATION, 209 "%p Communication::Write (src = %p, src_len = %" PRIu64 ") connection = %p", 210 this, 211 src, 212 (uint64_t)src_len, 213 connection_sp.get()); 214 215 if (connection_sp) 216 return connection_sp->Write (src, src_len, status, error_ptr); 217 218 if (error_ptr) 219 error_ptr->SetErrorString("Invalid connection."); 220 status = eConnectionStatusNoConnection; 221 return 0; 222 } 223 224 bool 225 Communication::StartReadThread (Error *error_ptr) 226 { 227 if (error_ptr) 228 error_ptr->Clear(); 229 230 if (m_read_thread.IsJoinable()) 231 return true; 232 233 lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_COMMUNICATION, 234 "%p Communication::StartReadThread ()", this); 235 236 char thread_name[1024]; 237 snprintf(thread_name, sizeof(thread_name), "<lldb.comm.%s>", GetBroadcasterName().AsCString()); 238 239 m_read_thread_enabled = true; 240 m_read_thread_did_exit = false; 241 m_read_thread = ThreadLauncher::LaunchThread(thread_name, Communication::ReadThread, this, error_ptr); 242 if (!m_read_thread.IsJoinable()) 243 m_read_thread_enabled = false; 244 return m_read_thread_enabled; 245 } 246 247 bool 248 Communication::StopReadThread (Error *error_ptr) 249 { 250 if (!m_read_thread.IsJoinable()) 251 return true; 252 253 lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_COMMUNICATION, 254 "%p Communication::StopReadThread ()", this); 255 256 m_read_thread_enabled = false; 257 258 BroadcastEvent(eBroadcastBitReadThreadShouldExit, nullptr); 259 260 // error = m_read_thread.Cancel(); 261 262 Error error = m_read_thread.Join(nullptr); 263 return error.Success(); 264 } 265 266 bool 267 Communication::JoinReadThread (Error *error_ptr) 268 { 269 if (!m_read_thread.IsJoinable()) 270 return true; 271 272 Error error = m_read_thread.Join(nullptr); 273 return error.Success(); 274 } 275 276 size_t 277 Communication::GetCachedBytes (void *dst, size_t dst_len) 278 { 279 std::lock_guard<std::recursive_mutex> guard(m_bytes_mutex); 280 if (!m_bytes.empty()) 281 { 282 // If DST is nullptr and we have a thread, then return the number 283 // of bytes that are available so the caller can call again 284 if (dst == nullptr) 285 return m_bytes.size(); 286 287 const size_t len = std::min<size_t>(dst_len, m_bytes.size()); 288 289 ::memcpy (dst, m_bytes.c_str(), len); 290 m_bytes.erase(m_bytes.begin(), m_bytes.begin() + len); 291 292 return len; 293 } 294 return 0; 295 } 296 297 void 298 Communication::AppendBytesToCache (const uint8_t * bytes, size_t len, bool broadcast, ConnectionStatus status) 299 { 300 lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_COMMUNICATION, 301 "%p Communication::AppendBytesToCache (src = %p, src_len = %" PRIu64 ", broadcast = %i)", 302 this, bytes, (uint64_t)len, broadcast); 303 if ((bytes == nullptr || len == 0) 304 && (status != lldb::eConnectionStatusEndOfFile)) 305 return; 306 if (m_callback) 307 { 308 // If the user registered a callback, then call it and do not broadcast 309 m_callback (m_callback_baton, bytes, len); 310 } 311 else if (bytes != nullptr && len > 0) 312 { 313 std::lock_guard<std::recursive_mutex> guard(m_bytes_mutex); 314 m_bytes.append ((const char *)bytes, len); 315 if (broadcast) 316 BroadcastEventIfUnique (eBroadcastBitReadThreadGotBytes); 317 } 318 } 319 320 size_t 321 Communication::ReadFromConnection (void *dst, 322 size_t dst_len, 323 uint32_t timeout_usec, 324 ConnectionStatus &status, 325 Error *error_ptr) 326 { 327 lldb::ConnectionSP connection_sp(m_connection_sp); 328 return (connection_sp ? connection_sp->Read(dst, dst_len, timeout_usec, status, error_ptr) : 0); 329 } 330 331 bool 332 Communication::ReadThreadIsRunning () 333 { 334 return m_read_thread_enabled; 335 } 336 337 lldb::thread_result_t 338 Communication::ReadThread (lldb::thread_arg_t p) 339 { 340 Communication *comm = (Communication *)p; 341 342 Log *log(lldb_private::GetLogIfAllCategoriesSet (LIBLLDB_LOG_COMMUNICATION)); 343 344 if (log) 345 log->Printf ("%p Communication::ReadThread () thread starting...", p); 346 347 uint8_t buf[1024]; 348 349 Error error; 350 ConnectionStatus status = eConnectionStatusSuccess; 351 bool done = false; 352 while (!done && comm->m_read_thread_enabled) 353 { 354 size_t bytes_read = comm->ReadFromConnection (buf, sizeof(buf), 5 * TimeValue::MicroSecPerSec, status, &error); 355 if (bytes_read > 0) 356 comm->AppendBytesToCache (buf, bytes_read, true, status); 357 else if ((bytes_read == 0) 358 && status == eConnectionStatusEndOfFile) 359 { 360 if (comm->GetCloseOnEOF ()) 361 comm->Disconnect (); 362 comm->AppendBytesToCache (buf, bytes_read, true, status); 363 } 364 365 switch (status) 366 { 367 case eConnectionStatusSuccess: 368 break; 369 370 case eConnectionStatusEndOfFile: 371 done = true; 372 break; 373 case eConnectionStatusError: // Check GetError() for details 374 if (error.GetType() == eErrorTypePOSIX && error.GetError() == EIO) 375 { 376 // EIO on a pipe is usually caused by remote shutdown 377 comm->Disconnect (); 378 done = true; 379 } 380 if (log) 381 error.LogIfError (log, 382 "%p Communication::ReadFromConnection () => status = %s", 383 p, 384 Communication::ConnectionStatusAsCString (status)); 385 break; 386 case eConnectionStatusInterrupted: // Synchronization signal from SynchronizeWithReadThread() 387 // The connection returns eConnectionStatusInterrupted only when there is no 388 // input pending to be read, so we can signal that. 389 comm->BroadcastEvent (eBroadcastBitNoMorePendingInput); 390 break; 391 case eConnectionStatusNoConnection: // No connection 392 case eConnectionStatusLostConnection: // Lost connection while connected to a valid connection 393 done = true; 394 LLVM_FALLTHROUGH; 395 case eConnectionStatusTimedOut: // Request timed out 396 if (log) 397 error.LogIfError (log, 398 "%p Communication::ReadFromConnection () => status = %s", 399 p, 400 Communication::ConnectionStatusAsCString (status)); 401 break; 402 } 403 } 404 log = lldb_private::GetLogIfAllCategoriesSet (LIBLLDB_LOG_COMMUNICATION); 405 if (log) 406 log->Printf ("%p Communication::ReadThread () thread exiting...", p); 407 408 comm->m_read_thread_did_exit = true; 409 // Let clients know that this thread is exiting 410 comm->BroadcastEvent (eBroadcastBitNoMorePendingInput); 411 comm->BroadcastEvent (eBroadcastBitReadThreadDidExit); 412 return NULL; 413 } 414 415 void 416 Communication::SetReadThreadBytesReceivedCallback(ReadThreadBytesReceived callback, 417 void *callback_baton) 418 { 419 m_callback = callback; 420 m_callback_baton = callback_baton; 421 } 422 423 void 424 Communication::SynchronizeWithReadThread () 425 { 426 // Only one thread can do the synchronization dance at a time. 427 std::lock_guard<std::mutex> guard(m_synchronize_mutex); 428 429 // First start listening for the synchronization event. 430 ListenerSP listener_sp(Listener::MakeListener("Communication::SyncronizeWithReadThread")); 431 listener_sp->StartListeningForEvents(this, eBroadcastBitNoMorePendingInput); 432 433 // If the thread is not running, there is no point in synchronizing. 434 if (!m_read_thread_enabled || m_read_thread_did_exit) 435 return; 436 437 // Notify the read thread. 438 m_connection_sp->InterruptRead(); 439 440 // Wait for the synchronization event. 441 EventSP event_sp; 442 listener_sp->WaitForEvent(nullptr, event_sp); 443 } 444 445 void 446 Communication::SetConnection (Connection *connection) 447 { 448 Disconnect(nullptr); 449 StopReadThread(nullptr); 450 m_connection_sp.reset(connection); 451 } 452 453 const char * 454 Communication::ConnectionStatusAsCString (lldb::ConnectionStatus status) 455 { 456 switch (status) 457 { 458 case eConnectionStatusSuccess: return "success"; 459 case eConnectionStatusError: return "error"; 460 case eConnectionStatusTimedOut: return "timed out"; 461 case eConnectionStatusNoConnection: return "no connection"; 462 case eConnectionStatusLostConnection: return "lost connection"; 463 case eConnectionStatusEndOfFile: return "end of file"; 464 case eConnectionStatusInterrupted: return "interrupted"; 465 } 466 467 static char unknown_state_string[64]; 468 snprintf(unknown_state_string, sizeof (unknown_state_string), "ConnectionStatus = %i", status); 469 return unknown_state_string; 470 } 471