1 //===------- SimpleEPCServer.cpp - EPC over simple abstract channel -------===// 2 // 3 // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. 4 // See https://llvm.org/LICENSE.txt for license information. 5 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception 6 // 7 //===----------------------------------------------------------------------===// 8 9 #include "llvm/ExecutionEngine/Orc/TargetProcess/SimpleRemoteEPCServer.h" 10 11 #include "llvm/ExecutionEngine/Orc/Shared/TargetProcessControlTypes.h" 12 #include "llvm/Support/FormatVariadic.h" 13 #include "llvm/Support/Host.h" 14 #include "llvm/Support/Process.h" 15 16 #include "OrcRTBootstrap.h" 17 18 #define DEBUG_TYPE "orc" 19 20 using namespace llvm::orc::shared; 21 22 namespace llvm { 23 namespace orc { 24 25 ExecutorBootstrapService::~ExecutorBootstrapService() {} 26 27 SimpleRemoteEPCServer::Dispatcher::~Dispatcher() {} 28 29 #if LLVM_ENABLE_THREADS 30 void SimpleRemoteEPCServer::ThreadDispatcher::dispatch( 31 unique_function<void()> Work) { 32 { 33 std::lock_guard<std::mutex> Lock(DispatchMutex); 34 if (!Running) 35 return; 36 ++Outstanding; 37 } 38 39 std::thread([this, Work = std::move(Work)]() mutable { 40 Work(); 41 std::lock_guard<std::mutex> Lock(DispatchMutex); 42 --Outstanding; 43 OutstandingCV.notify_all(); 44 }).detach(); 45 } 46 47 void SimpleRemoteEPCServer::ThreadDispatcher::shutdown() { 48 std::unique_lock<std::mutex> Lock(DispatchMutex); 49 Running = false; 50 OutstandingCV.wait(Lock, [this]() { return Outstanding == 0; }); 51 } 52 #endif 53 54 StringMap<ExecutorAddr> SimpleRemoteEPCServer::defaultBootstrapSymbols() { 55 StringMap<ExecutorAddr> DBS; 56 rt_bootstrap::addTo(DBS); 57 return DBS; 58 } 59 60 Expected<SimpleRemoteEPCTransportClient::HandleMessageAction> 61 SimpleRemoteEPCServer::handleMessage(SimpleRemoteEPCOpcode OpC, uint64_t SeqNo, 62 ExecutorAddr TagAddr, 63 SimpleRemoteEPCArgBytesVector ArgBytes) { 64 65 LLVM_DEBUG({ 66 dbgs() << "SimpleRemoteEPCServer::handleMessage: opc = "; 67 switch (OpC) { 68 case SimpleRemoteEPCOpcode::Setup: 69 dbgs() << "Setup"; 70 assert(SeqNo == 0 && "Non-zero SeqNo for Setup?"); 71 assert(TagAddr.getValue() == 0 && "Non-zero TagAddr for Setup?"); 72 break; 73 case SimpleRemoteEPCOpcode::Hangup: 74 dbgs() << "Hangup"; 75 assert(SeqNo == 0 && "Non-zero SeqNo for Hangup?"); 76 assert(TagAddr.getValue() == 0 && "Non-zero TagAddr for Hangup?"); 77 break; 78 case SimpleRemoteEPCOpcode::Result: 79 dbgs() << "Result"; 80 assert(TagAddr.getValue() == 0 && "Non-zero TagAddr for Result?"); 81 break; 82 case SimpleRemoteEPCOpcode::CallWrapper: 83 dbgs() << "CallWrapper"; 84 break; 85 } 86 dbgs() << ", seqno = " << SeqNo 87 << ", tag-addr = " << formatv("{0:x}", TagAddr.getValue()) 88 << ", arg-buffer = " << formatv("{0:x}", ArgBytes.size()) 89 << " bytes\n"; 90 }); 91 92 using UT = std::underlying_type_t<SimpleRemoteEPCOpcode>; 93 if (static_cast<UT>(OpC) > static_cast<UT>(SimpleRemoteEPCOpcode::LastOpC)) 94 return make_error<StringError>("Unexpected opcode", 95 inconvertibleErrorCode()); 96 97 // TODO: Clean detach message? 98 switch (OpC) { 99 case SimpleRemoteEPCOpcode::Setup: 100 return make_error<StringError>("Unexpected Setup opcode", 101 inconvertibleErrorCode()); 102 case SimpleRemoteEPCOpcode::Hangup: 103 return SimpleRemoteEPCTransportClient::EndSession; 104 case SimpleRemoteEPCOpcode::Result: 105 if (auto Err = handleResult(SeqNo, TagAddr, std::move(ArgBytes))) 106 return std::move(Err); 107 break; 108 case SimpleRemoteEPCOpcode::CallWrapper: 109 handleCallWrapper(SeqNo, TagAddr, std::move(ArgBytes)); 110 break; 111 } 112 return ContinueSession; 113 } 114 115 Error SimpleRemoteEPCServer::waitForDisconnect() { 116 std::unique_lock<std::mutex> Lock(ServerStateMutex); 117 ShutdownCV.wait(Lock, [this]() { return RunState == ServerShutDown; }); 118 return std::move(ShutdownErr); 119 } 120 121 void SimpleRemoteEPCServer::handleDisconnect(Error Err) { 122 PendingJITDispatchResultsMap TmpPending; 123 124 { 125 std::lock_guard<std::mutex> Lock(ServerStateMutex); 126 std::swap(TmpPending, PendingJITDispatchResults); 127 RunState = ServerShuttingDown; 128 } 129 130 // Send out-of-band errors to any waiting threads. 131 for (auto &KV : TmpPending) 132 KV.second->set_value( 133 shared::WrapperFunctionResult::createOutOfBandError("disconnecting")); 134 135 // TODO: Free attached resources. 136 // 1. Close libraries in DylibHandles. 137 138 // Wait for dispatcher to clear. 139 D->shutdown(); 140 141 // Shut down services. 142 while (!Services.empty()) { 143 ShutdownErr = 144 joinErrors(std::move(ShutdownErr), Services.back()->shutdown()); 145 Services.pop_back(); 146 } 147 148 std::lock_guard<std::mutex> Lock(ServerStateMutex); 149 ShutdownErr = joinErrors(std::move(ShutdownErr), std::move(Err)); 150 RunState = ServerShutDown; 151 ShutdownCV.notify_all(); 152 } 153 154 Error SimpleRemoteEPCServer::sendMessage(SimpleRemoteEPCOpcode OpC, 155 uint64_t SeqNo, ExecutorAddr TagAddr, 156 ArrayRef<char> ArgBytes) { 157 158 LLVM_DEBUG({ 159 dbgs() << "SimpleRemoteEPCServer::sendMessage: opc = "; 160 switch (OpC) { 161 case SimpleRemoteEPCOpcode::Setup: 162 dbgs() << "Setup"; 163 assert(SeqNo == 0 && "Non-zero SeqNo for Setup?"); 164 assert(TagAddr.getValue() == 0 && "Non-zero TagAddr for Setup?"); 165 break; 166 case SimpleRemoteEPCOpcode::Hangup: 167 dbgs() << "Hangup"; 168 assert(SeqNo == 0 && "Non-zero SeqNo for Hangup?"); 169 assert(TagAddr.getValue() == 0 && "Non-zero TagAddr for Hangup?"); 170 break; 171 case SimpleRemoteEPCOpcode::Result: 172 dbgs() << "Result"; 173 assert(TagAddr.getValue() == 0 && "Non-zero TagAddr for Result?"); 174 break; 175 case SimpleRemoteEPCOpcode::CallWrapper: 176 dbgs() << "CallWrapper"; 177 break; 178 } 179 dbgs() << ", seqno = " << SeqNo 180 << ", tag-addr = " << formatv("{0:x}", TagAddr.getValue()) 181 << ", arg-buffer = " << formatv("{0:x}", ArgBytes.size()) 182 << " bytes\n"; 183 }); 184 auto Err = T->sendMessage(OpC, SeqNo, TagAddr, ArgBytes); 185 LLVM_DEBUG({ 186 if (Err) 187 dbgs() << " \\--> SimpleRemoteEPC::sendMessage failed\n"; 188 }); 189 return Err; 190 } 191 192 Error SimpleRemoteEPCServer::sendSetupMessage( 193 StringMap<ExecutorAddr> BootstrapSymbols) { 194 195 using namespace SimpleRemoteEPCDefaultBootstrapSymbolNames; 196 197 std::vector<char> SetupPacket; 198 SimpleRemoteEPCExecutorInfo EI; 199 EI.TargetTriple = sys::getProcessTriple(); 200 if (auto PageSize = sys::Process::getPageSize()) 201 EI.PageSize = *PageSize; 202 else 203 return PageSize.takeError(); 204 EI.BootstrapSymbols = std::move(BootstrapSymbols); 205 206 assert(!EI.BootstrapSymbols.count(ExecutorSessionObjectName) && 207 "Dispatch context name should not be set"); 208 assert(!EI.BootstrapSymbols.count(DispatchFnName) && 209 "Dispatch function name should not be set"); 210 EI.BootstrapSymbols[ExecutorSessionObjectName] = ExecutorAddr::fromPtr(this); 211 EI.BootstrapSymbols[DispatchFnName] = ExecutorAddr::fromPtr(jitDispatchEntry); 212 213 using SPSSerialize = 214 shared::SPSArgList<shared::SPSSimpleRemoteEPCExecutorInfo>; 215 auto SetupPacketBytes = 216 shared::WrapperFunctionResult::allocate(SPSSerialize::size(EI)); 217 shared::SPSOutputBuffer OB(SetupPacketBytes.data(), SetupPacketBytes.size()); 218 if (!SPSSerialize::serialize(OB, EI)) 219 return make_error<StringError>("Could not send setup packet", 220 inconvertibleErrorCode()); 221 222 return sendMessage(SimpleRemoteEPCOpcode::Setup, 0, ExecutorAddr(), 223 {SetupPacketBytes.data(), SetupPacketBytes.size()}); 224 } 225 226 Error SimpleRemoteEPCServer::handleResult( 227 uint64_t SeqNo, ExecutorAddr TagAddr, 228 SimpleRemoteEPCArgBytesVector ArgBytes) { 229 std::promise<shared::WrapperFunctionResult> *P = nullptr; 230 { 231 std::lock_guard<std::mutex> Lock(ServerStateMutex); 232 auto I = PendingJITDispatchResults.find(SeqNo); 233 if (I == PendingJITDispatchResults.end()) 234 return make_error<StringError>("No call for sequence number " + 235 Twine(SeqNo), 236 inconvertibleErrorCode()); 237 P = I->second; 238 PendingJITDispatchResults.erase(I); 239 releaseSeqNo(SeqNo); 240 } 241 auto R = shared::WrapperFunctionResult::allocate(ArgBytes.size()); 242 memcpy(R.data(), ArgBytes.data(), ArgBytes.size()); 243 P->set_value(std::move(R)); 244 return Error::success(); 245 } 246 247 void SimpleRemoteEPCServer::handleCallWrapper( 248 uint64_t RemoteSeqNo, ExecutorAddr TagAddr, 249 SimpleRemoteEPCArgBytesVector ArgBytes) { 250 D->dispatch([this, RemoteSeqNo, TagAddr, ArgBytes = std::move(ArgBytes)]() { 251 using WrapperFnTy = 252 shared::detail::CWrapperFunctionResult (*)(const char *, size_t); 253 auto *Fn = TagAddr.toPtr<WrapperFnTy>(); 254 shared::WrapperFunctionResult ResultBytes( 255 Fn(ArgBytes.data(), ArgBytes.size())); 256 if (auto Err = sendMessage(SimpleRemoteEPCOpcode::Result, RemoteSeqNo, 257 ExecutorAddr(), 258 {ResultBytes.data(), ResultBytes.size()})) 259 ReportError(std::move(Err)); 260 }); 261 } 262 263 shared::WrapperFunctionResult 264 SimpleRemoteEPCServer::doJITDispatch(const void *FnTag, const char *ArgData, 265 size_t ArgSize) { 266 uint64_t SeqNo; 267 std::promise<shared::WrapperFunctionResult> ResultP; 268 auto ResultF = ResultP.get_future(); 269 { 270 std::lock_guard<std::mutex> Lock(ServerStateMutex); 271 if (RunState != ServerRunning) 272 return shared::WrapperFunctionResult::createOutOfBandError( 273 "jit_dispatch not available (EPC server shut down)"); 274 275 SeqNo = getNextSeqNo(); 276 assert(!PendingJITDispatchResults.count(SeqNo) && "SeqNo already in use"); 277 PendingJITDispatchResults[SeqNo] = &ResultP; 278 } 279 280 if (auto Err = sendMessage(SimpleRemoteEPCOpcode::CallWrapper, SeqNo, 281 ExecutorAddr::fromPtr(FnTag), {ArgData, ArgSize})) 282 ReportError(std::move(Err)); 283 284 return ResultF.get(); 285 } 286 287 shared::detail::CWrapperFunctionResult 288 SimpleRemoteEPCServer::jitDispatchEntry(void *DispatchCtx, const void *FnTag, 289 const char *ArgData, size_t ArgSize) { 290 return reinterpret_cast<SimpleRemoteEPCServer *>(DispatchCtx) 291 ->doJITDispatch(FnTag, ArgData, ArgSize) 292 .release(); 293 } 294 295 } // end namespace orc 296 } // end namespace llvm 297