summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--llvm/include/llvm/ExecutionEngine/Orc/RPCUtils.h73
1 files changed, 51 insertions, 22 deletions
diff --git a/llvm/include/llvm/ExecutionEngine/Orc/RPCUtils.h b/llvm/include/llvm/ExecutionEngine/Orc/RPCUtils.h
index 588deb020ac..37e2e66e5af 100644
--- a/llvm/include/llvm/ExecutionEngine/Orc/RPCUtils.h
+++ b/llvm/include/llvm/ExecutionEngine/Orc/RPCUtils.h
@@ -788,15 +788,21 @@ public:
return FnIdOrErr.takeError();
}
- // Allocate a sequence number.
- auto SeqNo = SequenceNumberMgr.getSequenceNumber();
- assert(!PendingResponses.count(SeqNo) &&
- "Sequence number already allocated");
+ SequenceNumberT SeqNo; // initialized in locked scope below.
+ {
+ // Lock the pending responses map and sequence number manager.
+ std::lock_guard<std::mutex> Lock(ResponsesMutex);
+
+ // Allocate a sequence number.
+ SeqNo = SequenceNumberMgr.getSequenceNumber();
+ assert(!PendingResponses.count(SeqNo) &&
+ "Sequence number already allocated");
- // Install the user handler.
- PendingResponses[SeqNo] =
+ // Install the user handler.
+ PendingResponses[SeqNo] =
detail::createResponseHandler<ChannelT, typename Func::ReturnType>(
std::move(Handler));
+ }
// Open the function call message.
if (auto Err = C.startSendMessage(FnId, SeqNo)) {
@@ -863,6 +869,24 @@ public:
return detail::ReadArgs<ArgTs...>(Args...);
}
+ /// Abandon all outstanding result handlers.
+ ///
+ /// This will call all currently registered result handlers to receive an
+ /// "abandoned" error as their argument. This is used internally by the RPC
+ /// in error situations, but can also be called directly by clients who are
+ /// disconnecting from the remote and don't or can't expect responses to their
+ /// outstanding calls. (Especially for outstanding blocking calls, calling
+ /// this function may be necessary to avoid dead threads).
+ void abandonPendingResponses() {
+ // Lock the pending responses map and sequence number manager.
+ std::lock_guard<std::mutex> Lock(ResponsesMutex);
+
+ for (auto &KV : PendingResponses)
+ KV.second->abandon();
+ PendingResponses.clear();
+ SequenceNumberMgr.reset();
+ }
+
protected:
// The LaunchPolicy type allows a launch policy to be specified when adding
// a function handler. See addHandlerImpl.
@@ -888,28 +912,32 @@ protected:
wrapHandler<Func>(std::move(Handler), std::move(Launch));
}
- // Abandon all outstanding results.
- void abandonPendingResponses() {
- for (auto &KV : PendingResponses)
- KV.second->abandon();
- PendingResponses.clear();
- SequenceNumberMgr.reset();
- }
-
Error handleResponse(SequenceNumberT SeqNo) {
- auto I = PendingResponses.find(SeqNo);
- if (I == PendingResponses.end()) {
- abandonPendingResponses();
- return orcError(OrcErrorCode::UnexpectedRPCResponse);
+ using Handler = typename decltype(PendingResponses)::mapped_type;
+ Handler PRHandler;
+
+ {
+ // Lock the pending responses map and sequence number manager.
+ std::unique_lock<std::mutex> Lock(ResponsesMutex);
+ auto I = PendingResponses.find(SeqNo);
+
+ if (I != PendingResponses.end()) {
+ PRHandler = std::move(I->second);
+ PendingResponses.erase(I);
+ SequenceNumberMgr.releaseSequenceNumber(SeqNo);
+ } else {
+ // Unlock the pending results map to prevent recursive lock.
+ Lock.unlock();
+ abandonPendingResponses();
+ return orcError(OrcErrorCode::UnexpectedRPCResponse);
+ }
}
- auto PRHandler = std::move(I->second);
- PendingResponses.erase(I);
- SequenceNumberMgr.releaseSequenceNumber(SeqNo);
+ assert(PRHandler &&
+ "If we didn't find a response handler we should have bailed out");
if (auto Err = PRHandler->handleResponse(C)) {
abandonPendingResponses();
- SequenceNumberMgr.reset();
return Err;
}
@@ -1016,6 +1044,7 @@ protected:
std::map<FunctionIdT, WrappedHandlerFn> Handlers;
+ std::mutex ResponsesMutex;
detail::SequenceNumberManager<SequenceNumberT> SequenceNumberMgr;
std::map<SequenceNumberT, std::unique_ptr<detail::ResponseHandler<ChannelT>>>
PendingResponses;
OpenPOWER on IntegriCloud