From 0b80bd2b09d64383b4d32f77ffec51e46409fb57 Mon Sep 17 00:00:00 2001 From: Thad House Date: Mon, 15 Aug 2016 20:24:07 -0700 Subject: [PATCH] Adds timeout capabilities to blocking Rpc Calls (#86) --- include/ntcore_c.h | 4 ++++ include/ntcore_cpp.h | 5 +++++ ntcore-jni.def | 2 ++ ntcore.def | 2 ++ src/RpcServer.cpp | 18 +++++++++++++++++- src/RpcServer.h | 1 + src/Storage.cpp | 20 ++++++++++++++++++-- src/Storage.h | 2 ++ src/ntcore_c.cpp | 22 ++++++++++++++++++++++ src/ntcore_cpp.cpp | 10 ++++++++++ 10 files changed, 83 insertions(+), 3 deletions(-) diff --git a/include/ntcore_c.h b/include/ntcore_c.h index 19d009e895..d16afb7825 100644 --- a/include/ntcore_c.h +++ b/include/ntcore_c.h @@ -319,12 +319,16 @@ void NT_CreatePolledRpc(const char *name, size_t name_len, const char *def, size_t def_len); int NT_PollRpc(int blocking, struct NT_RpcCallInfo* call_info); +int NT_PollRpcTimeout(int blocking, double time_out, + struct NT_RpcCallInfo* call_info); void NT_PostRpcResponse(unsigned int rpc_id, unsigned int call_uid, const char *result, size_t result_len); unsigned int NT_CallRpc(const char *name, size_t name_len, const char *params, size_t params_len); char *NT_GetRpcResult(int blocking, unsigned int call_uid, size_t *result_len); +char *NT_GetRpcResultTimeout(int blocking, unsigned int call_uid, + double time_out, size_t *result_len); char *NT_PackRpcDefinition(const struct NT_RpcDefinition *def, size_t *packed_len); diff --git a/include/ntcore_cpp.h b/include/ntcore_cpp.h index 5badbbcf24..cdded65b87 100644 --- a/include/ntcore_cpp.h +++ b/include/ntcore_cpp.h @@ -214,6 +214,8 @@ bool NotifierDestroyed(); /* * Remote Procedure Call Functions */ + +constexpr double kTimeout_Indefinite = -1; void SetRpcServerOnStart(std::function on_start); void SetRpcServerOnExit(std::function on_exit); @@ -225,11 +227,14 @@ void CreateRpc(StringRef name, StringRef def, RpcCallback callback); void CreatePolledRpc(StringRef name, StringRef def); bool PollRpc(bool blocking, RpcCallInfo* call_info); +bool PollRpc(bool blocking, double time_out, RpcCallInfo* call_info); void PostRpcResponse(unsigned int rpc_id, unsigned int call_uid, StringRef result); unsigned int CallRpc(StringRef name, StringRef params); bool GetRpcResult(bool blocking, unsigned int call_uid, std::string* result); +bool GetRpcResult(bool blocking, unsigned int call_uid, double time_out, + std::string* result); std::string PackRpcDefinition(const RpcDefinition& def); bool UnpackRpcDefinition(StringRef packed, RpcDefinition *def); diff --git a/ntcore-jni.def b/ntcore-jni.def index 821120186a..c91b2fccca 100644 --- a/ntcore-jni.def +++ b/ntcore-jni.def @@ -90,6 +90,8 @@ NT_SetDefaultEntryRaw @91 NT_SetDefaultEntryBooleanArray @92 NT_SetDefaultEntryDoubleArray @93 NT_SetDefaultEntryStringArray @94 +NT_PollRpcTimeout @95 +NT_GetRpcResultTimeout @96 ; JNI functions JNI_OnLoad diff --git a/ntcore.def b/ntcore.def index 6ba2ae3ae2..f5d1549445 100644 --- a/ntcore.def +++ b/ntcore.def @@ -90,3 +90,5 @@ NT_SetDefaultEntryRaw @91 NT_SetDefaultEntryBooleanArray @92 NT_SetDefaultEntryDoubleArray @93 NT_SetDefaultEntryStringArray @94 +NT_PollRpcTimeout @95 +NT_GetRpcResultTimeout @96 diff --git a/src/RpcServer.cpp b/src/RpcServer.cpp index f43634364b..dc653615a9 100644 --- a/src/RpcServer.cpp +++ b/src/RpcServer.cpp @@ -61,10 +61,26 @@ void RpcServer::ProcessRpc(StringRef name, std::shared_ptr msg, } bool RpcServer::PollRpc(bool blocking, RpcCallInfo* call_info) { + return PollRpc(blocking, kTimeout_Indefinite, call_info); +} + +bool RpcServer::PollRpc(bool blocking, double time_out, RpcCallInfo* call_info) { std::unique_lock lock(m_mutex); while (m_poll_queue.empty()) { if (!blocking || m_terminating) return false; - m_poll_cond.wait(lock); + if (time_out < 0) { + m_poll_cond.wait(lock); + } else { + auto timeout_time = std::chrono::steady_clock::now() + + std::chrono::duration(time_out); + while (!m_terminating) { + auto timed_out = m_poll_cond.wait_until(lock, timeout_time); + if (timed_out == std::cv_status::timeout) { + return false; + } + } + } + if (m_terminating) return false; } auto& item = m_poll_queue.front(); diff --git a/src/RpcServer.h b/src/RpcServer.h index 9a6eedfac9..d12bb6ad8c 100644 --- a/src/RpcServer.h +++ b/src/RpcServer.h @@ -44,6 +44,7 @@ class RpcServer { SendMsgFunc send_response); bool PollRpc(bool blocking, RpcCallInfo* call_info); + bool PollRpc(bool blocking, double time_out, RpcCallInfo* call_info); void PostRpcResponse(unsigned int rpc_id, unsigned int call_uid, llvm::StringRef result); diff --git a/src/Storage.cpp b/src/Storage.cpp index 4453e03abf..c68430b39f 100644 --- a/src/Storage.cpp +++ b/src/Storage.cpp @@ -1407,7 +1407,12 @@ unsigned int Storage::CallRpc(StringRef name, StringRef params) { return combined_uid; } -bool Storage::GetRpcResult(bool blocking, unsigned int call_uid, +bool Storage::GetRpcResult(bool blocking, unsigned int call_uid, + std::string* result) { + return GetRpcResult(blocking, call_uid, -1, result); +} + +bool Storage::GetRpcResult(bool blocking, unsigned int call_uid, double time_out, std::string* result) { std::unique_lock lock(m_mutex); for (;;) { @@ -1415,7 +1420,18 @@ bool Storage::GetRpcResult(bool blocking, unsigned int call_uid, m_rpc_results.find(std::make_pair(call_uid >> 16, call_uid & 0xffff)); if (i == m_rpc_results.end()) { if (!blocking || m_terminating) return false; - m_rpc_results_cond.wait(lock); + if (time_out < 0) { + m_rpc_results_cond.wait(lock); + } else { + auto timeout_time = std::chrono::steady_clock::now() + + std::chrono::duration(time_out); + while (!m_terminating) { + auto timed_out = m_rpc_results_cond.wait_until(lock, timeout_time); + if (timed_out == std::cv_status::timeout) { + return false; + } + } + } if (m_terminating) return false; continue; } diff --git a/src/Storage.h b/src/Storage.h index e317c2c668..46362f5824 100644 --- a/src/Storage.h +++ b/src/Storage.h @@ -99,6 +99,8 @@ class Storage { unsigned int CallRpc(StringRef name, StringRef params); bool GetRpcResult(bool blocking, unsigned int call_uid, std::string* result); + bool GetRpcResult(bool blocking, unsigned int call_uid, double time_out, + std::string* result); private: Storage(); diff --git a/src/ntcore_c.cpp b/src/ntcore_c.cpp index 00e609166a..09cf3b480a 100644 --- a/src/ntcore_c.cpp +++ b/src/ntcore_c.cpp @@ -257,6 +257,15 @@ int NT_PollRpc(int blocking, NT_RpcCallInfo* call_info) { return 1; } +int NT_PollRpcTimeout(int blocking, double time_out, + NT_RpcCallInfo* call_info) { + RpcCallInfo call_info_cpp; + if (!nt::PollRpc(blocking != 0, time_out, &call_info_cpp)) + return 0; + ConvertToC(call_info_cpp, call_info); + return 1; +} + void NT_PostRpcResponse(unsigned int rpc_id, unsigned int call_uid, const char *result, size_t result_len) { nt::PostRpcResponse(rpc_id, call_uid, StringRef(result, result_len)); @@ -278,6 +287,19 @@ char *NT_GetRpcResult(int blocking, unsigned int call_uid, size_t *result_len) { return result_cstr; } +char *NT_GetRpcResultTimeout(int blocking, unsigned int call_uid, + double time_out, size_t *result_len) { + std::string result; + if (!nt::GetRpcResult(blocking != 0, call_uid, time_out, &result)) + return nullptr; + + // convert result + *result_len = result.size(); + char *result_cstr; + ConvertToC(result, &result_cstr); + return result_cstr; +} + char *NT_PackRpcDefinition(const NT_RpcDefinition *def, size_t *packed_len) { auto packed = nt::PackRpcDefinition(ConvertFromC(*def)); diff --git a/src/ntcore_cpp.cpp b/src/ntcore_cpp.cpp index af66b21cce..976f83bf02 100644 --- a/src/ntcore_cpp.cpp +++ b/src/ntcore_cpp.cpp @@ -128,6 +128,10 @@ bool PollRpc(bool blocking, RpcCallInfo* call_info) { return RpcServer::GetInstance().PollRpc(blocking, call_info); } +bool PollRpc(bool blocking, double time_out, RpcCallInfo* call_info) { + return RpcServer::GetInstance().PollRpc(blocking, time_out, call_info); +} + void PostRpcResponse(unsigned int rpc_id, unsigned int call_uid, StringRef result) { RpcServer::GetInstance().PostRpcResponse(rpc_id, call_uid, result); @@ -141,6 +145,12 @@ bool GetRpcResult(bool blocking, unsigned int call_uid, std::string* result) { return Storage::GetInstance().GetRpcResult(blocking, call_uid, result); } +bool GetRpcResult(bool blocking, unsigned int call_uid, + double time_out, std::string* result) { + return Storage::GetInstance().GetRpcResult(blocking, call_uid, time_out, + result); +} + std::string PackRpcDefinition(const RpcDefinition& def) { WireEncoder enc(0x0300); enc.Write8(def.version);