1 //===-- Communication.cpp ---------------------------------------*- C++ -*-===// 2 // 3 // The LLVM Compiler Infrastructure 4 // 5 // This file is distributed under the University of Illinois Open Source 6 // License. See LICENSE.TXT for details. 7 // 8 //===----------------------------------------------------------------------===// 9 10 // C Includes 11 // C++ Includes 12 #include <cstring> 13 14 // Other libraries and framework includes 15 // Project includes 16 #include "lldb/Core/Communication.h" 17 #include "lldb/Core/Connection.h" 18 #include "lldb/Core/Listener.h" 19 #include "lldb/Core/Log.h" 20 #include "lldb/Core/Timer.h" 21 #include "lldb/Core/Event.h" 22 #include "lldb/Host/Host.h" 23 #include "lldb/Host/HostThread.h" 24 #include "lldb/Host/ThreadLauncher.h" 25 26 using namespace lldb; 27 using namespace lldb_private; 28 29 ConstString & 30 Communication::GetStaticBroadcasterClass () 31 { 32 static ConstString class_name ("lldb.communication"); 33 return class_name; 34 } 35 36 Communication::Communication(const char *name) 37 : Broadcaster(nullptr, name), 38 m_connection_sp(), 39 m_read_thread_enabled(false), 40 m_read_thread_did_exit(false), 41 m_bytes(), 42 m_bytes_mutex(), 43 m_write_mutex(), 44 m_synchronize_mutex(), 45 m_callback(nullptr), 46 m_callback_baton(nullptr), 47 m_close_on_eof(true) 48 49 { 50 lldb_private::LogIfAnyCategoriesSet(LIBLLDB_LOG_OBJECT | LIBLLDB_LOG_COMMUNICATION, 51 "%p Communication::Communication (name = %s)", this, name); 52 53 SetEventName(eBroadcastBitDisconnected, "disconnected"); 54 SetEventName(eBroadcastBitReadThreadGotBytes, "got bytes"); 55 SetEventName(eBroadcastBitReadThreadDidExit, "read thread did exit"); 56 SetEventName(eBroadcastBitReadThreadShouldExit, "read thread should exit"); 57 SetEventName(eBroadcastBitPacketAvailable, "packet available"); 58 SetEventName(eBroadcastBitNoMorePendingInput, "no more pending input"); 59 60 CheckInWithManager(); 61 } 62 63 Communication::~Communication() 64 { 65 lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_OBJECT | LIBLLDB_LOG_COMMUNICATION, 66 "%p Communication::~Communication (name = %s)", 67 this, GetBroadcasterName().AsCString()); 68 Clear(); 69 } 70 71 void 72 Communication::Clear() 73 { 74 SetReadThreadBytesReceivedCallback(nullptr, nullptr); 75 Disconnect(nullptr); 76 StopReadThread(nullptr); 77 } 78 79 ConnectionStatus 80 Communication::Connect (const char *url, Error *error_ptr) 81 { 82 Clear(); 83 84 lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_COMMUNICATION, "%p Communication::Connect (url = %s)", this, url); 85 86 lldb::ConnectionSP connection_sp (m_connection_sp); 87 if (connection_sp) 88 return connection_sp->Connect (url, error_ptr); 89 if (error_ptr) 90 error_ptr->SetErrorString("Invalid connection."); 91 return eConnectionStatusNoConnection; 92 } 93 94 ConnectionStatus 95 Communication::Disconnect (Error *error_ptr) 96 { 97 lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_COMMUNICATION, "%p Communication::Disconnect ()", this); 98 99 lldb::ConnectionSP connection_sp (m_connection_sp); 100 if (connection_sp) 101 { 102 ConnectionStatus status = connection_sp->Disconnect (error_ptr); 103 // We currently don't protect connection_sp with any mutex for 104 // multi-threaded environments. So lets not nuke our connection class 105 // without putting some multi-threaded protections in. We also probably 106 // don't want to pay for the overhead it might cause if every time we 107 // access the connection we have to take a lock. 108 // 109 // This unique pointer will cleanup after itself when this object goes away, 110 // so there is no need to currently have it destroy itself immediately 111 // upon disconnnect. 112 //connection_sp.reset(); 113 return status; 114 } 115 return eConnectionStatusNoConnection; 116 } 117 118 bool 119 Communication::IsConnected () const 120 { 121 lldb::ConnectionSP connection_sp(m_connection_sp); 122 return (connection_sp ? connection_sp->IsConnected() : false); 123 } 124 125 bool 126 Communication::HasConnection () const 127 { 128 return m_connection_sp.get() != nullptr; 129 } 130 131 size_t 132 Communication::Read (void *dst, size_t dst_len, uint32_t timeout_usec, ConnectionStatus &status, Error *error_ptr) 133 { 134 lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_COMMUNICATION, 135 "%p Communication::Read (dst = %p, dst_len = %" PRIu64 ", timeout = %u usec) connection = %p", 136 this, 137 dst, 138 (uint64_t)dst_len, 139 timeout_usec, 140 m_connection_sp.get()); 141 142 if (m_read_thread_enabled) 143 { 144 // We have a dedicated read thread that is getting data for us 145 size_t cached_bytes = GetCachedBytes (dst, dst_len); 146 if (cached_bytes > 0 || timeout_usec == 0) 147 { 148 status = eConnectionStatusSuccess; 149 return cached_bytes; 150 } 151 152 if (!m_connection_sp) 153 { 154 if (error_ptr) 155 error_ptr->SetErrorString("Invalid connection."); 156 status = eConnectionStatusNoConnection; 157 return 0; 158 } 159 160 ListenerSP listener_sp(Listener::MakeListener("Communication::Read")); 161 listener_sp->StartListeningForEvents (this, eBroadcastBitReadThreadGotBytes | eBroadcastBitReadThreadDidExit); 162 EventSP event_sp; 163 std::chrono::microseconds timeout = std::chrono::microseconds(0); 164 if (timeout_usec != UINT32_MAX) 165 timeout = std::chrono::microseconds(timeout_usec); 166 while (listener_sp->WaitForEvent(timeout, event_sp)) 167 { 168 const uint32_t event_type = event_sp->GetType(); 169 if (event_type & eBroadcastBitReadThreadGotBytes) 170 { 171 return GetCachedBytes (dst, dst_len); 172 } 173 174 if (event_type & eBroadcastBitReadThreadDidExit) 175 { 176 if (GetCloseOnEOF ()) 177 Disconnect(nullptr); 178 break; 179 } 180 } 181 return 0; 182 } 183 184 // We aren't using a read thread, just read the data synchronously in this 185 // thread. 186 lldb::ConnectionSP connection_sp (m_connection_sp); 187 if (connection_sp) 188 { 189 return connection_sp->Read (dst, dst_len, timeout_usec, status, error_ptr); 190 } 191 192 if (error_ptr) 193 error_ptr->SetErrorString("Invalid connection."); 194 status = eConnectionStatusNoConnection; 195 return 0; 196 } 197 198 size_t 199 Communication::Write (const void *src, size_t src_len, ConnectionStatus &status, Error *error_ptr) 200 { 201 lldb::ConnectionSP connection_sp (m_connection_sp); 202 203 std::lock_guard<std::mutex> guard(m_write_mutex); 204 lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_COMMUNICATION, 205 "%p Communication::Write (src = %p, src_len = %" PRIu64 ") connection = %p", 206 this, 207 src, 208 (uint64_t)src_len, 209 connection_sp.get()); 210 211 if (connection_sp) 212 return connection_sp->Write (src, src_len, status, error_ptr); 213 214 if (error_ptr) 215 error_ptr->SetErrorString("Invalid connection."); 216 status = eConnectionStatusNoConnection; 217 return 0; 218 } 219 220 bool 221 Communication::StartReadThread (Error *error_ptr) 222 { 223 if (error_ptr) 224 error_ptr->Clear(); 225 226 if (m_read_thread.IsJoinable()) 227 return true; 228 229 lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_COMMUNICATION, 230 "%p Communication::StartReadThread ()", this); 231 232 char thread_name[1024]; 233 snprintf(thread_name, sizeof(thread_name), "<lldb.comm.%s>", GetBroadcasterName().AsCString()); 234 235 m_read_thread_enabled = true; 236 m_read_thread_did_exit = false; 237 m_read_thread = ThreadLauncher::LaunchThread(thread_name, Communication::ReadThread, this, error_ptr); 238 if (!m_read_thread.IsJoinable()) 239 m_read_thread_enabled = false; 240 return m_read_thread_enabled; 241 } 242 243 bool 244 Communication::StopReadThread (Error *error_ptr) 245 { 246 if (!m_read_thread.IsJoinable()) 247 return true; 248 249 lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_COMMUNICATION, 250 "%p Communication::StopReadThread ()", this); 251 252 m_read_thread_enabled = false; 253 254 BroadcastEvent(eBroadcastBitReadThreadShouldExit, nullptr); 255 256 // error = m_read_thread.Cancel(); 257 258 Error error = m_read_thread.Join(nullptr); 259 return error.Success(); 260 } 261 262 bool 263 Communication::JoinReadThread (Error *error_ptr) 264 { 265 if (!m_read_thread.IsJoinable()) 266 return true; 267 268 Error error = m_read_thread.Join(nullptr); 269 return error.Success(); 270 } 271 272 size_t 273 Communication::GetCachedBytes (void *dst, size_t dst_len) 274 { 275 std::lock_guard<std::recursive_mutex> guard(m_bytes_mutex); 276 if (!m_bytes.empty()) 277 { 278 // If DST is nullptr and we have a thread, then return the number 279 // of bytes that are available so the caller can call again 280 if (dst == nullptr) 281 return m_bytes.size(); 282 283 const size_t len = std::min<size_t>(dst_len, m_bytes.size()); 284 285 ::memcpy (dst, m_bytes.c_str(), len); 286 m_bytes.erase(m_bytes.begin(), m_bytes.begin() + len); 287 288 return len; 289 } 290 return 0; 291 } 292 293 void 294 Communication::AppendBytesToCache (const uint8_t * bytes, size_t len, bool broadcast, ConnectionStatus status) 295 { 296 lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_COMMUNICATION, 297 "%p Communication::AppendBytesToCache (src = %p, src_len = %" PRIu64 ", broadcast = %i)", 298 this, bytes, (uint64_t)len, broadcast); 299 if ((bytes == nullptr || len == 0) 300 && (status != lldb::eConnectionStatusEndOfFile)) 301 return; 302 if (m_callback) 303 { 304 // If the user registered a callback, then call it and do not broadcast 305 m_callback (m_callback_baton, bytes, len); 306 } 307 else if (bytes != nullptr && len > 0) 308 { 309 std::lock_guard<std::recursive_mutex> guard(m_bytes_mutex); 310 m_bytes.append ((const char *)bytes, len); 311 if (broadcast) 312 BroadcastEventIfUnique (eBroadcastBitReadThreadGotBytes); 313 } 314 } 315 316 size_t 317 Communication::ReadFromConnection (void *dst, 318 size_t dst_len, 319 uint32_t timeout_usec, 320 ConnectionStatus &status, 321 Error *error_ptr) 322 { 323 lldb::ConnectionSP connection_sp(m_connection_sp); 324 return (connection_sp ? connection_sp->Read(dst, dst_len, timeout_usec, status, error_ptr) : 0); 325 } 326 327 bool 328 Communication::ReadThreadIsRunning () 329 { 330 return m_read_thread_enabled; 331 } 332 333 lldb::thread_result_t 334 Communication::ReadThread (lldb::thread_arg_t p) 335 { 336 Communication *comm = (Communication *)p; 337 338 Log *log(lldb_private::GetLogIfAllCategoriesSet (LIBLLDB_LOG_COMMUNICATION)); 339 340 if (log) 341 log->Printf ("%p Communication::ReadThread () thread starting...", p); 342 343 uint8_t buf[1024]; 344 345 Error error; 346 ConnectionStatus status = eConnectionStatusSuccess; 347 bool done = false; 348 while (!done && comm->m_read_thread_enabled) 349 { 350 size_t bytes_read = comm->ReadFromConnection (buf, sizeof(buf), 5 * TimeValue::MicroSecPerSec, status, &error); 351 if (bytes_read > 0) 352 comm->AppendBytesToCache (buf, bytes_read, true, status); 353 else if ((bytes_read == 0) 354 && status == eConnectionStatusEndOfFile) 355 { 356 if (comm->GetCloseOnEOF ()) 357 comm->Disconnect (); 358 comm->AppendBytesToCache (buf, bytes_read, true, status); 359 } 360 361 switch (status) 362 { 363 case eConnectionStatusSuccess: 364 break; 365 366 case eConnectionStatusEndOfFile: 367 done = true; 368 break; 369 case eConnectionStatusError: // Check GetError() for details 370 if (error.GetType() == eErrorTypePOSIX && error.GetError() == EIO) 371 { 372 // EIO on a pipe is usually caused by remote shutdown 373 comm->Disconnect (); 374 done = true; 375 } 376 if (log) 377 error.LogIfError (log, 378 "%p Communication::ReadFromConnection () => status = %s", 379 p, 380 Communication::ConnectionStatusAsCString (status)); 381 break; 382 case eConnectionStatusInterrupted: // Synchronization signal from SynchronizeWithReadThread() 383 // The connection returns eConnectionStatusInterrupted only when there is no 384 // input pending to be read, so we can signal that. 385 comm->BroadcastEvent (eBroadcastBitNoMorePendingInput); 386 break; 387 case eConnectionStatusNoConnection: // No connection 388 case eConnectionStatusLostConnection: // Lost connection while connected to a valid connection 389 done = true; 390 LLVM_FALLTHROUGH; 391 case eConnectionStatusTimedOut: // Request timed out 392 if (log) 393 error.LogIfError (log, 394 "%p Communication::ReadFromConnection () => status = %s", 395 p, 396 Communication::ConnectionStatusAsCString (status)); 397 break; 398 } 399 } 400 log = lldb_private::GetLogIfAllCategoriesSet (LIBLLDB_LOG_COMMUNICATION); 401 if (log) 402 log->Printf ("%p Communication::ReadThread () thread exiting...", p); 403 404 comm->m_read_thread_did_exit = true; 405 // Let clients know that this thread is exiting 406 comm->BroadcastEvent (eBroadcastBitNoMorePendingInput); 407 comm->BroadcastEvent (eBroadcastBitReadThreadDidExit); 408 return NULL; 409 } 410 411 void 412 Communication::SetReadThreadBytesReceivedCallback(ReadThreadBytesReceived callback, 413 void *callback_baton) 414 { 415 m_callback = callback; 416 m_callback_baton = callback_baton; 417 } 418 419 void 420 Communication::SynchronizeWithReadThread () 421 { 422 // Only one thread can do the synchronization dance at a time. 423 std::lock_guard<std::mutex> guard(m_synchronize_mutex); 424 425 // First start listening for the synchronization event. 426 ListenerSP listener_sp(Listener::MakeListener("Communication::SyncronizeWithReadThread")); 427 listener_sp->StartListeningForEvents(this, eBroadcastBitNoMorePendingInput); 428 429 // If the thread is not running, there is no point in synchronizing. 430 if (!m_read_thread_enabled || m_read_thread_did_exit) 431 return; 432 433 // Notify the read thread. 434 m_connection_sp->InterruptRead(); 435 436 // Wait for the synchronization event. 437 EventSP event_sp; 438 listener_sp->WaitForEvent(std::chrono::microseconds(0), event_sp); 439 } 440 441 void 442 Communication::SetConnection (Connection *connection) 443 { 444 Disconnect(nullptr); 445 StopReadThread(nullptr); 446 m_connection_sp.reset(connection); 447 } 448 449 const char * 450 Communication::ConnectionStatusAsCString (lldb::ConnectionStatus status) 451 { 452 switch (status) 453 { 454 case eConnectionStatusSuccess: return "success"; 455 case eConnectionStatusError: return "error"; 456 case eConnectionStatusTimedOut: return "timed out"; 457 case eConnectionStatusNoConnection: return "no connection"; 458 case eConnectionStatusLostConnection: return "lost connection"; 459 case eConnectionStatusEndOfFile: return "end of file"; 460 case eConnectionStatusInterrupted: return "interrupted"; 461 } 462 463 static char unknown_state_string[64]; 464 snprintf(unknown_state_string, sizeof (unknown_state_string), "ConnectionStatus = %i", status); 465 return unknown_state_string; 466 } 467