1 //===-- Communication.cpp ---------------------------------------*- C++ -*-===// 2 // 3 // The LLVM Compiler Infrastructure 4 // 5 // This file is distributed under the University of Illinois Open Source 6 // License. See LICENSE.TXT for details. 7 // 8 //===----------------------------------------------------------------------===// 9 10 // C Includes 11 // C++ Includes 12 #include <cstring> 13 14 // Other libraries and framework includes 15 // Project includes 16 #include "lldb/Core/Communication.h" 17 #include "lldb/Core/Connection.h" 18 #include "lldb/Core/Event.h" 19 #include "lldb/Core/Listener.h" 20 #include "lldb/Core/Log.h" 21 #include "lldb/Core/Timer.h" 22 #include "lldb/Host/Host.h" 23 #include "lldb/Host/HostThread.h" 24 #include "lldb/Host/ThreadLauncher.h" 25 26 using namespace lldb; 27 using namespace lldb_private; 28 29 ConstString &Communication::GetStaticBroadcasterClass() { 30 static ConstString class_name("lldb.communication"); 31 return class_name; 32 } 33 34 Communication::Communication(const char *name) 35 : Broadcaster(nullptr, name), m_connection_sp(), 36 m_read_thread_enabled(false), m_read_thread_did_exit(false), m_bytes(), 37 m_bytes_mutex(), m_write_mutex(), m_synchronize_mutex(), 38 m_callback(nullptr), m_callback_baton(nullptr), m_close_on_eof(true) 39 40 { 41 lldb_private::LogIfAnyCategoriesSet( 42 LIBLLDB_LOG_OBJECT | LIBLLDB_LOG_COMMUNICATION, 43 "%p Communication::Communication (name = %s)", this, name); 44 45 SetEventName(eBroadcastBitDisconnected, "disconnected"); 46 SetEventName(eBroadcastBitReadThreadGotBytes, "got bytes"); 47 SetEventName(eBroadcastBitReadThreadDidExit, "read thread did exit"); 48 SetEventName(eBroadcastBitReadThreadShouldExit, "read thread should exit"); 49 SetEventName(eBroadcastBitPacketAvailable, "packet available"); 50 SetEventName(eBroadcastBitNoMorePendingInput, "no more pending input"); 51 52 CheckInWithManager(); 53 } 54 55 Communication::~Communication() { 56 lldb_private::LogIfAnyCategoriesSet( 57 LIBLLDB_LOG_OBJECT | LIBLLDB_LOG_COMMUNICATION, 58 "%p Communication::~Communication (name = %s)", this, 59 GetBroadcasterName().AsCString()); 60 Clear(); 61 } 62 63 void Communication::Clear() { 64 SetReadThreadBytesReceivedCallback(nullptr, nullptr); 65 Disconnect(nullptr); 66 StopReadThread(nullptr); 67 } 68 69 ConnectionStatus Communication::Connect(const char *url, Error *error_ptr) { 70 Clear(); 71 72 lldb_private::LogIfAnyCategoriesSet(LIBLLDB_LOG_COMMUNICATION, 73 "%p Communication::Connect (url = %s)", 74 this, url); 75 76 lldb::ConnectionSP connection_sp(m_connection_sp); 77 if (connection_sp) 78 return connection_sp->Connect(url, error_ptr); 79 if (error_ptr) 80 error_ptr->SetErrorString("Invalid connection."); 81 return eConnectionStatusNoConnection; 82 } 83 84 ConnectionStatus Communication::Disconnect(Error *error_ptr) { 85 lldb_private::LogIfAnyCategoriesSet(LIBLLDB_LOG_COMMUNICATION, 86 "%p Communication::Disconnect ()", this); 87 88 lldb::ConnectionSP connection_sp(m_connection_sp); 89 if (connection_sp) { 90 ConnectionStatus status = connection_sp->Disconnect(error_ptr); 91 // We currently don't protect connection_sp with any mutex for 92 // multi-threaded environments. So lets not nuke our connection class 93 // without putting some multi-threaded protections in. We also probably 94 // don't want to pay for the overhead it might cause if every time we 95 // access the connection we have to take a lock. 96 // 97 // This unique pointer will cleanup after itself when this object goes away, 98 // so there is no need to currently have it destroy itself immediately 99 // upon disconnnect. 100 // connection_sp.reset(); 101 return status; 102 } 103 return eConnectionStatusNoConnection; 104 } 105 106 bool Communication::IsConnected() const { 107 lldb::ConnectionSP connection_sp(m_connection_sp); 108 return (connection_sp ? connection_sp->IsConnected() : false); 109 } 110 111 bool Communication::HasConnection() const { 112 return m_connection_sp.get() != nullptr; 113 } 114 115 size_t Communication::Read(void *dst, size_t dst_len, 116 const Timeout<std::micro> &timeout, 117 ConnectionStatus &status, Error *error_ptr) { 118 lldb_private::LogIfAnyCategoriesSet( 119 LIBLLDB_LOG_COMMUNICATION, 120 "%p Communication::Read (dst = %p, dst_len = %" PRIu64 121 ", timeout = %u usec) connection = %p", 122 this, dst, (uint64_t)dst_len, timeout ? timeout->count() : -1, 123 m_connection_sp.get()); 124 125 if (m_read_thread_enabled) { 126 // We have a dedicated read thread that is getting data for us 127 size_t cached_bytes = GetCachedBytes(dst, dst_len); 128 if (cached_bytes > 0 || (timeout && timeout->count() == 0)) { 129 status = eConnectionStatusSuccess; 130 return cached_bytes; 131 } 132 133 if (!m_connection_sp) { 134 if (error_ptr) 135 error_ptr->SetErrorString("Invalid connection."); 136 status = eConnectionStatusNoConnection; 137 return 0; 138 } 139 140 ListenerSP listener_sp(Listener::MakeListener("Communication::Read")); 141 listener_sp->StartListeningForEvents( 142 this, eBroadcastBitReadThreadGotBytes | eBroadcastBitReadThreadDidExit); 143 EventSP event_sp; 144 while (listener_sp->GetEvent(event_sp, timeout)) { 145 const uint32_t event_type = event_sp->GetType(); 146 if (event_type & eBroadcastBitReadThreadGotBytes) { 147 return GetCachedBytes(dst, dst_len); 148 } 149 150 if (event_type & eBroadcastBitReadThreadDidExit) { 151 if (GetCloseOnEOF()) 152 Disconnect(nullptr); 153 break; 154 } 155 } 156 return 0; 157 } 158 159 // We aren't using a read thread, just read the data synchronously in this 160 // thread. 161 return ReadFromConnection(dst, dst_len, timeout, status, error_ptr); 162 } 163 164 size_t Communication::Write(const void *src, size_t src_len, 165 ConnectionStatus &status, Error *error_ptr) { 166 lldb::ConnectionSP connection_sp(m_connection_sp); 167 168 std::lock_guard<std::mutex> guard(m_write_mutex); 169 lldb_private::LogIfAnyCategoriesSet( 170 LIBLLDB_LOG_COMMUNICATION, 171 "%p Communication::Write (src = %p, src_len = %" PRIu64 172 ") connection = %p", 173 this, src, (uint64_t)src_len, connection_sp.get()); 174 175 if (connection_sp) 176 return connection_sp->Write(src, src_len, status, error_ptr); 177 178 if (error_ptr) 179 error_ptr->SetErrorString("Invalid connection."); 180 status = eConnectionStatusNoConnection; 181 return 0; 182 } 183 184 bool Communication::StartReadThread(Error *error_ptr) { 185 if (error_ptr) 186 error_ptr->Clear(); 187 188 if (m_read_thread.IsJoinable()) 189 return true; 190 191 lldb_private::LogIfAnyCategoriesSet( 192 LIBLLDB_LOG_COMMUNICATION, "%p Communication::StartReadThread ()", this); 193 194 char thread_name[1024]; 195 snprintf(thread_name, sizeof(thread_name), "<lldb.comm.%s>", 196 GetBroadcasterName().AsCString()); 197 198 m_read_thread_enabled = true; 199 m_read_thread_did_exit = false; 200 m_read_thread = ThreadLauncher::LaunchThread( 201 thread_name, Communication::ReadThread, this, error_ptr); 202 if (!m_read_thread.IsJoinable()) 203 m_read_thread_enabled = false; 204 return m_read_thread_enabled; 205 } 206 207 bool Communication::StopReadThread(Error *error_ptr) { 208 if (!m_read_thread.IsJoinable()) 209 return true; 210 211 lldb_private::LogIfAnyCategoriesSet( 212 LIBLLDB_LOG_COMMUNICATION, "%p Communication::StopReadThread ()", this); 213 214 m_read_thread_enabled = false; 215 216 BroadcastEvent(eBroadcastBitReadThreadShouldExit, nullptr); 217 218 // error = m_read_thread.Cancel(); 219 220 Error error = m_read_thread.Join(nullptr); 221 return error.Success(); 222 } 223 224 bool Communication::JoinReadThread(Error *error_ptr) { 225 if (!m_read_thread.IsJoinable()) 226 return true; 227 228 Error error = m_read_thread.Join(nullptr); 229 return error.Success(); 230 } 231 232 size_t Communication::GetCachedBytes(void *dst, size_t dst_len) { 233 std::lock_guard<std::recursive_mutex> guard(m_bytes_mutex); 234 if (!m_bytes.empty()) { 235 // If DST is nullptr and we have a thread, then return the number 236 // of bytes that are available so the caller can call again 237 if (dst == nullptr) 238 return m_bytes.size(); 239 240 const size_t len = std::min<size_t>(dst_len, m_bytes.size()); 241 242 ::memcpy(dst, m_bytes.c_str(), len); 243 m_bytes.erase(m_bytes.begin(), m_bytes.begin() + len); 244 245 return len; 246 } 247 return 0; 248 } 249 250 void Communication::AppendBytesToCache(const uint8_t *bytes, size_t len, 251 bool broadcast, 252 ConnectionStatus status) { 253 lldb_private::LogIfAnyCategoriesSet( 254 LIBLLDB_LOG_COMMUNICATION, 255 "%p Communication::AppendBytesToCache (src = %p, src_len = %" PRIu64 256 ", broadcast = %i)", 257 this, bytes, (uint64_t)len, broadcast); 258 if ((bytes == nullptr || len == 0) && 259 (status != lldb::eConnectionStatusEndOfFile)) 260 return; 261 if (m_callback) { 262 // If the user registered a callback, then call it and do not broadcast 263 m_callback(m_callback_baton, bytes, len); 264 } else if (bytes != nullptr && len > 0) { 265 std::lock_guard<std::recursive_mutex> guard(m_bytes_mutex); 266 m_bytes.append((const char *)bytes, len); 267 if (broadcast) 268 BroadcastEventIfUnique(eBroadcastBitReadThreadGotBytes); 269 } 270 } 271 272 size_t Communication::ReadFromConnection(void *dst, size_t dst_len, 273 const Timeout<std::micro> &timeout, 274 ConnectionStatus &status, 275 Error *error_ptr) { 276 lldb::ConnectionSP connection_sp(m_connection_sp); 277 if (connection_sp) 278 return connection_sp->Read(dst, dst_len, timeout, status, error_ptr); 279 280 if (error_ptr) 281 error_ptr->SetErrorString("Invalid connection."); 282 status = eConnectionStatusNoConnection; 283 return 0; 284 } 285 286 bool Communication::ReadThreadIsRunning() { return m_read_thread_enabled; } 287 288 lldb::thread_result_t Communication::ReadThread(lldb::thread_arg_t p) { 289 Communication *comm = (Communication *)p; 290 291 Log *log(lldb_private::GetLogIfAllCategoriesSet(LIBLLDB_LOG_COMMUNICATION)); 292 293 if (log) 294 log->Printf("%p Communication::ReadThread () thread starting...", p); 295 296 uint8_t buf[1024]; 297 298 Error error; 299 ConnectionStatus status = eConnectionStatusSuccess; 300 bool done = false; 301 while (!done && comm->m_read_thread_enabled) { 302 size_t bytes_read = comm->ReadFromConnection( 303 buf, sizeof(buf), std::chrono::seconds(5), status, &error); 304 if (bytes_read > 0) 305 comm->AppendBytesToCache(buf, bytes_read, true, status); 306 else if ((bytes_read == 0) && status == eConnectionStatusEndOfFile) { 307 if (comm->GetCloseOnEOF()) 308 comm->Disconnect(); 309 comm->AppendBytesToCache(buf, bytes_read, true, status); 310 } 311 312 switch (status) { 313 case eConnectionStatusSuccess: 314 break; 315 316 case eConnectionStatusEndOfFile: 317 done = true; 318 break; 319 case eConnectionStatusError: // Check GetError() for details 320 if (error.GetType() == eErrorTypePOSIX && error.GetError() == EIO) { 321 // EIO on a pipe is usually caused by remote shutdown 322 comm->Disconnect(); 323 done = true; 324 } 325 if (log) 326 error.LogIfError( 327 log, "%p Communication::ReadFromConnection () => status = %s", p, 328 Communication::ConnectionStatusAsCString(status)); 329 break; 330 case eConnectionStatusInterrupted: // Synchronization signal from 331 // SynchronizeWithReadThread() 332 // The connection returns eConnectionStatusInterrupted only when there is 333 // no 334 // input pending to be read, so we can signal that. 335 comm->BroadcastEvent(eBroadcastBitNoMorePendingInput); 336 break; 337 case eConnectionStatusNoConnection: // No connection 338 case eConnectionStatusLostConnection: // Lost connection while connected to 339 // a valid connection 340 done = true; 341 LLVM_FALLTHROUGH; 342 case eConnectionStatusTimedOut: // Request timed out 343 if (log) 344 error.LogIfError( 345 log, "%p Communication::ReadFromConnection () => status = %s", p, 346 Communication::ConnectionStatusAsCString(status)); 347 break; 348 } 349 } 350 log = lldb_private::GetLogIfAllCategoriesSet(LIBLLDB_LOG_COMMUNICATION); 351 if (log) 352 log->Printf("%p Communication::ReadThread () thread exiting...", p); 353 354 comm->m_read_thread_did_exit = true; 355 // Let clients know that this thread is exiting 356 comm->BroadcastEvent(eBroadcastBitNoMorePendingInput); 357 comm->BroadcastEvent(eBroadcastBitReadThreadDidExit); 358 return NULL; 359 } 360 361 void Communication::SetReadThreadBytesReceivedCallback( 362 ReadThreadBytesReceived callback, void *callback_baton) { 363 m_callback = callback; 364 m_callback_baton = callback_baton; 365 } 366 367 void Communication::SynchronizeWithReadThread() { 368 // Only one thread can do the synchronization dance at a time. 369 std::lock_guard<std::mutex> guard(m_synchronize_mutex); 370 371 // First start listening for the synchronization event. 372 ListenerSP listener_sp( 373 Listener::MakeListener("Communication::SyncronizeWithReadThread")); 374 listener_sp->StartListeningForEvents(this, eBroadcastBitNoMorePendingInput); 375 376 // If the thread is not running, there is no point in synchronizing. 377 if (!m_read_thread_enabled || m_read_thread_did_exit) 378 return; 379 380 // Notify the read thread. 381 m_connection_sp->InterruptRead(); 382 383 // Wait for the synchronization event. 384 EventSP event_sp; 385 listener_sp->GetEvent(event_sp, llvm::None); 386 } 387 388 void Communication::SetConnection(Connection *connection) { 389 Disconnect(nullptr); 390 StopReadThread(nullptr); 391 m_connection_sp.reset(connection); 392 } 393 394 const char * 395 Communication::ConnectionStatusAsCString(lldb::ConnectionStatus status) { 396 switch (status) { 397 case eConnectionStatusSuccess: 398 return "success"; 399 case eConnectionStatusError: 400 return "error"; 401 case eConnectionStatusTimedOut: 402 return "timed out"; 403 case eConnectionStatusNoConnection: 404 return "no connection"; 405 case eConnectionStatusLostConnection: 406 return "lost connection"; 407 case eConnectionStatusEndOfFile: 408 return "end of file"; 409 case eConnectionStatusInterrupted: 410 return "interrupted"; 411 } 412 413 static char unknown_state_string[64]; 414 snprintf(unknown_state_string, sizeof(unknown_state_string), 415 "ConnectionStatus = %i", status); 416 return unknown_state_string; 417 } 418