diff options
Diffstat (limited to 'llvm')
-rw-r--r-- | llvm/include/llvm/ExecutionEngine/Orc/RPCUtils.h | 73 |
1 files changed, 51 insertions, 22 deletions
diff --git a/llvm/include/llvm/ExecutionEngine/Orc/RPCUtils.h b/llvm/include/llvm/ExecutionEngine/Orc/RPCUtils.h index 588deb020ac..37e2e66e5af 100644 --- a/llvm/include/llvm/ExecutionEngine/Orc/RPCUtils.h +++ b/llvm/include/llvm/ExecutionEngine/Orc/RPCUtils.h @@ -788,15 +788,21 @@ public: return FnIdOrErr.takeError(); } - // Allocate a sequence number. - auto SeqNo = SequenceNumberMgr.getSequenceNumber(); - assert(!PendingResponses.count(SeqNo) && - "Sequence number already allocated"); + SequenceNumberT SeqNo; // initialized in locked scope below. + { + // Lock the pending responses map and sequence number manager. + std::lock_guard<std::mutex> Lock(ResponsesMutex); + + // Allocate a sequence number. + SeqNo = SequenceNumberMgr.getSequenceNumber(); + assert(!PendingResponses.count(SeqNo) && + "Sequence number already allocated"); - // Install the user handler. - PendingResponses[SeqNo] = + // Install the user handler. + PendingResponses[SeqNo] = detail::createResponseHandler<ChannelT, typename Func::ReturnType>( std::move(Handler)); + } // Open the function call message. if (auto Err = C.startSendMessage(FnId, SeqNo)) { @@ -863,6 +869,24 @@ public: return detail::ReadArgs<ArgTs...>(Args...); } + /// Abandon all outstanding result handlers. + /// + /// This will call all currently registered result handlers to receive an + /// "abandoned" error as their argument. This is used internally by the RPC + /// in error situations, but can also be called directly by clients who are + /// disconnecting from the remote and don't or can't expect responses to their + /// outstanding calls. (Especially for outstanding blocking calls, calling + /// this function may be necessary to avoid dead threads). + void abandonPendingResponses() { + // Lock the pending responses map and sequence number manager. + std::lock_guard<std::mutex> Lock(ResponsesMutex); + + for (auto &KV : PendingResponses) + KV.second->abandon(); + PendingResponses.clear(); + SequenceNumberMgr.reset(); + } + protected: // The LaunchPolicy type allows a launch policy to be specified when adding // a function handler. See addHandlerImpl. @@ -888,28 +912,32 @@ protected: wrapHandler<Func>(std::move(Handler), std::move(Launch)); } - // Abandon all outstanding results. - void abandonPendingResponses() { - for (auto &KV : PendingResponses) - KV.second->abandon(); - PendingResponses.clear(); - SequenceNumberMgr.reset(); - } - Error handleResponse(SequenceNumberT SeqNo) { - auto I = PendingResponses.find(SeqNo); - if (I == PendingResponses.end()) { - abandonPendingResponses(); - return orcError(OrcErrorCode::UnexpectedRPCResponse); + using Handler = typename decltype(PendingResponses)::mapped_type; + Handler PRHandler; + + { + // Lock the pending responses map and sequence number manager. + std::unique_lock<std::mutex> Lock(ResponsesMutex); + auto I = PendingResponses.find(SeqNo); + + if (I != PendingResponses.end()) { + PRHandler = std::move(I->second); + PendingResponses.erase(I); + SequenceNumberMgr.releaseSequenceNumber(SeqNo); + } else { + // Unlock the pending results map to prevent recursive lock. + Lock.unlock(); + abandonPendingResponses(); + return orcError(OrcErrorCode::UnexpectedRPCResponse); + } } - auto PRHandler = std::move(I->second); - PendingResponses.erase(I); - SequenceNumberMgr.releaseSequenceNumber(SeqNo); + assert(PRHandler && + "If we didn't find a response handler we should have bailed out"); if (auto Err = PRHandler->handleResponse(C)) { abandonPendingResponses(); - SequenceNumberMgr.reset(); return Err; } @@ -1016,6 +1044,7 @@ protected: std::map<FunctionIdT, WrappedHandlerFn> Handlers; + std::mutex ResponsesMutex; detail::SequenceNumberManager<SequenceNumberT> SequenceNumberMgr; std::map<SequenceNumberT, std::unique_ptr<detail::ResponseHandler<ChannelT>>> PendingResponses; |