/*----------------------------------------------------------------------------*/ /* Copyright (c) FIRST 2015. All Rights Reserved. */ /* Open Source Software - may be modified and shared by FRC teams. The code */ /* must be accompanied by the FIRST BSD license file in the root directory of */ /* the project. */ /*----------------------------------------------------------------------------*/ #include "RpcServer.h" #include "Log.h" using namespace nt; ATOMIC_STATIC_INIT(RpcServer) RpcServer::RpcServer() { m_active = false; m_terminating = false; } RpcServer::~RpcServer() { Logger::GetInstance().SetLogger(nullptr); Stop(); m_terminating = true; m_poll_cond.notify_all(); } void RpcServer::Start() { { std::lock_guard lock(m_mutex); if (m_active) return; m_active = true; } { std::lock_guard lock(m_shutdown_mutex); m_shutdown = false; } m_thread = std::thread(&RpcServer::ThreadMain, this); } void RpcServer::Stop() { m_active = false; if (m_thread.joinable()) { // send notification so the thread terminates m_call_cond.notify_one(); // join with timeout std::unique_lock lock(m_shutdown_mutex); auto timeout_time = std::chrono::steady_clock::now() + std::chrono::seconds(1); if (m_shutdown_cv.wait_until(lock, timeout_time, [&] { return m_shutdown; })) m_thread.join(); else m_thread.detach(); // timed out, detach it } } void RpcServer::ProcessRpc(StringRef name, std::shared_ptr msg, RpcCallback func, unsigned int conn_id, SendMsgFunc send_response) { std::unique_lock lock(m_mutex); if (func) m_call_queue.emplace(name, msg, func, conn_id, send_response); else m_poll_queue.emplace(name, msg, func, conn_id, send_response); lock.unlock(); if (func) m_call_cond.notify_one(); else m_poll_cond.notify_one(); } bool RpcServer::PollRpc(bool blocking, RpcCallInfo* call_info) { std::unique_lock lock(m_mutex); while (m_poll_queue.empty()) { if (!blocking || m_terminating) return false; m_poll_cond.wait(lock); } auto& item = m_poll_queue.front(); unsigned int call_uid = (item.conn_id << 16) | item.msg->seq_num_uid(); call_info->rpc_id = item.msg->id(); call_info->call_uid = call_uid; call_info->name = std::move(item.name); call_info->params = item.msg->str(); m_response_map.insert(std::make_pair(std::make_pair(item.msg->id(), call_uid), item.send_response)); m_poll_queue.pop(); return true; } void RpcServer::PostRpcResponse(unsigned int rpc_id, unsigned int call_uid, llvm::StringRef result) { auto i = m_response_map.find(std::make_pair(rpc_id, call_uid)); if (i == m_response_map.end()) { WARNING("posting RPC response to nonexistent call (or duplicate response)"); return; } (i->getSecond())(Message::RpcResponse(rpc_id, call_uid, result)); m_response_map.erase(i); } void RpcServer::ThreadMain() { std::unique_lock lock(m_mutex); std::string tmp; while (m_active) { while (m_call_queue.empty()) { m_call_cond.wait(lock); if (!m_active) goto done; } while (!m_call_queue.empty()) { if (!m_active) goto done; auto item = std::move(m_call_queue.front()); m_call_queue.pop(); DEBUG4("rpc calling " << item.name); if (item.name.empty() || !item.msg || !item.func || !item.send_response) continue; // Don't hold mutex during callback execution! lock.unlock(); auto result = item.func(item.name, item.msg->str()); item.send_response(Message::RpcResponse(item.msg->id(), item.msg->seq_num_uid(), result)); lock.lock(); } } done: // use condition variable to signal thread shutdown { std::lock_guard lock(m_shutdown_mutex); m_shutdown = true; m_shutdown_cv.notify_one(); } }