Passes the ConnectionInfo of the Rpc client on server callback (#112)

This commit is contained in:
Thad House
2016-09-16 20:20:55 -07:00
committed by Peter Johnson
parent c5c069743b
commit 7e9754acff
8 changed files with 40 additions and 12 deletions

View File

@@ -47,15 +47,17 @@ void RpcServer::Stop() { m_owner.Stop(); }
void RpcServer::ProcessRpc(StringRef name, std::shared_ptr<Message> msg,
RpcCallback func, unsigned int conn_id,
const ConnectionInfo& conn_info,
SendMsgFunc send_response) {
if (func) {
auto thr = m_owner.GetThread();
if (!thr) return;
thr->m_call_queue.emplace(name, msg, func, conn_id, send_response);
thr->m_call_queue.emplace(name, msg, func, conn_id, conn_info,
send_response);
thr->m_cond.notify_one();
} else {
std::lock_guard<std::mutex> lock(m_mutex);
m_poll_queue.emplace(name, msg, func, conn_id, send_response);
m_poll_queue.emplace(name, msg, func, conn_id, conn_info, send_response);
m_poll_cond.notify_one();
}
}
@@ -96,6 +98,7 @@ bool RpcServer::PollRpc(bool blocking, double time_out, RpcCallInfo* call_info)
call_uid = item.msg->seq_num_uid();
call_info->rpc_id = item.msg->id();
call_info->call_uid = call_uid;
call_info->conn_info = item.conn_info;
call_info->name = std::move(item.name);
call_info->params = item.msg->str();
m_response_map.insert(std::make_pair(std::make_pair(item.msg->id(), call_uid),
@@ -138,7 +141,7 @@ void RpcServer::Thread::Main() {
// Don't hold mutex during callback execution!
lock.unlock();
auto result = item.func(item.name, item.msg->str());
auto result = item.func(item.name, item.msg->str(), item.conn_info);
item.send_response(Message::RpcResponse(item.msg->id(),
item.msg->seq_num_uid(), result));
lock.lock();