Adds timeout capabilities to blocking Rpc Calls (#86)

This commit is contained in:
Thad House
2016-08-15 20:24:07 -07:00
committed by Peter Johnson
parent bc99d341fb
commit 0b80bd2b09
10 changed files with 83 additions and 3 deletions

View File

@@ -319,12 +319,16 @@ void NT_CreatePolledRpc(const char *name, size_t name_len, const char *def,
size_t def_len);
int NT_PollRpc(int blocking, struct NT_RpcCallInfo* call_info);
int NT_PollRpcTimeout(int blocking, double time_out,
struct NT_RpcCallInfo* call_info);
void NT_PostRpcResponse(unsigned int rpc_id, unsigned int call_uid,
const char *result, size_t result_len);
unsigned int NT_CallRpc(const char *name, size_t name_len, const char *params,
size_t params_len);
char *NT_GetRpcResult(int blocking, unsigned int call_uid, size_t *result_len);
char *NT_GetRpcResultTimeout(int blocking, unsigned int call_uid,
double time_out, size_t *result_len);
char *NT_PackRpcDefinition(const struct NT_RpcDefinition *def,
size_t *packed_len);

View File

@@ -214,6 +214,8 @@ bool NotifierDestroyed();
/*
* Remote Procedure Call Functions
*/
constexpr double kTimeout_Indefinite = -1;
void SetRpcServerOnStart(std::function<void()> on_start);
void SetRpcServerOnExit(std::function<void()> on_exit);
@@ -225,11 +227,14 @@ void CreateRpc(StringRef name, StringRef def, RpcCallback callback);
void CreatePolledRpc(StringRef name, StringRef def);
bool PollRpc(bool blocking, RpcCallInfo* call_info);
bool PollRpc(bool blocking, double time_out, RpcCallInfo* call_info);
void PostRpcResponse(unsigned int rpc_id, unsigned int call_uid,
StringRef result);
unsigned int CallRpc(StringRef name, StringRef params);
bool GetRpcResult(bool blocking, unsigned int call_uid, std::string* result);
bool GetRpcResult(bool blocking, unsigned int call_uid, double time_out,
std::string* result);
std::string PackRpcDefinition(const RpcDefinition& def);
bool UnpackRpcDefinition(StringRef packed, RpcDefinition *def);

View File

@@ -90,6 +90,8 @@ NT_SetDefaultEntryRaw @91
NT_SetDefaultEntryBooleanArray @92
NT_SetDefaultEntryDoubleArray @93
NT_SetDefaultEntryStringArray @94
NT_PollRpcTimeout @95
NT_GetRpcResultTimeout @96
; JNI functions
JNI_OnLoad

View File

@@ -90,3 +90,5 @@ NT_SetDefaultEntryRaw @91
NT_SetDefaultEntryBooleanArray @92
NT_SetDefaultEntryDoubleArray @93
NT_SetDefaultEntryStringArray @94
NT_PollRpcTimeout @95
NT_GetRpcResultTimeout @96

View File

@@ -61,10 +61,26 @@ void RpcServer::ProcessRpc(StringRef name, std::shared_ptr<Message> msg,
}
bool RpcServer::PollRpc(bool blocking, RpcCallInfo* call_info) {
return PollRpc(blocking, kTimeout_Indefinite, call_info);
}
bool RpcServer::PollRpc(bool blocking, double time_out, RpcCallInfo* call_info) {
std::unique_lock<std::mutex> lock(m_mutex);
while (m_poll_queue.empty()) {
if (!blocking || m_terminating) return false;
m_poll_cond.wait(lock);
if (time_out < 0) {
m_poll_cond.wait(lock);
} else {
auto timeout_time = std::chrono::steady_clock::now() +
std::chrono::duration<double>(time_out);
while (!m_terminating) {
auto timed_out = m_poll_cond.wait_until(lock, timeout_time);
if (timed_out == std::cv_status::timeout) {
return false;
}
}
}
if (m_terminating) return false;
}
auto& item = m_poll_queue.front();

View File

@@ -44,6 +44,7 @@ class RpcServer {
SendMsgFunc send_response);
bool PollRpc(bool blocking, RpcCallInfo* call_info);
bool PollRpc(bool blocking, double time_out, RpcCallInfo* call_info);
void PostRpcResponse(unsigned int rpc_id, unsigned int call_uid,
llvm::StringRef result);

View File

@@ -1407,7 +1407,12 @@ unsigned int Storage::CallRpc(StringRef name, StringRef params) {
return combined_uid;
}
bool Storage::GetRpcResult(bool blocking, unsigned int call_uid,
bool Storage::GetRpcResult(bool blocking, unsigned int call_uid,
std::string* result) {
return GetRpcResult(blocking, call_uid, -1, result);
}
bool Storage::GetRpcResult(bool blocking, unsigned int call_uid, double time_out,
std::string* result) {
std::unique_lock<std::mutex> lock(m_mutex);
for (;;) {
@@ -1415,7 +1420,18 @@ bool Storage::GetRpcResult(bool blocking, unsigned int call_uid,
m_rpc_results.find(std::make_pair(call_uid >> 16, call_uid & 0xffff));
if (i == m_rpc_results.end()) {
if (!blocking || m_terminating) return false;
m_rpc_results_cond.wait(lock);
if (time_out < 0) {
m_rpc_results_cond.wait(lock);
} else {
auto timeout_time = std::chrono::steady_clock::now() +
std::chrono::duration<double>(time_out);
while (!m_terminating) {
auto timed_out = m_rpc_results_cond.wait_until(lock, timeout_time);
if (timed_out == std::cv_status::timeout) {
return false;
}
}
}
if (m_terminating) return false;
continue;
}

View File

@@ -99,6 +99,8 @@ class Storage {
unsigned int CallRpc(StringRef name, StringRef params);
bool GetRpcResult(bool blocking, unsigned int call_uid, std::string* result);
bool GetRpcResult(bool blocking, unsigned int call_uid, double time_out,
std::string* result);
private:
Storage();

View File

@@ -257,6 +257,15 @@ int NT_PollRpc(int blocking, NT_RpcCallInfo* call_info) {
return 1;
}
int NT_PollRpcTimeout(int blocking, double time_out,
NT_RpcCallInfo* call_info) {
RpcCallInfo call_info_cpp;
if (!nt::PollRpc(blocking != 0, time_out, &call_info_cpp))
return 0;
ConvertToC(call_info_cpp, call_info);
return 1;
}
void NT_PostRpcResponse(unsigned int rpc_id, unsigned int call_uid,
const char *result, size_t result_len) {
nt::PostRpcResponse(rpc_id, call_uid, StringRef(result, result_len));
@@ -278,6 +287,19 @@ char *NT_GetRpcResult(int blocking, unsigned int call_uid, size_t *result_len) {
return result_cstr;
}
char *NT_GetRpcResultTimeout(int blocking, unsigned int call_uid,
double time_out, size_t *result_len) {
std::string result;
if (!nt::GetRpcResult(blocking != 0, call_uid, time_out, &result))
return nullptr;
// convert result
*result_len = result.size();
char *result_cstr;
ConvertToC(result, &result_cstr);
return result_cstr;
}
char *NT_PackRpcDefinition(const NT_RpcDefinition *def, size_t *packed_len) {
auto packed = nt::PackRpcDefinition(ConvertFromC(*def));

View File

@@ -128,6 +128,10 @@ bool PollRpc(bool blocking, RpcCallInfo* call_info) {
return RpcServer::GetInstance().PollRpc(blocking, call_info);
}
bool PollRpc(bool blocking, double time_out, RpcCallInfo* call_info) {
return RpcServer::GetInstance().PollRpc(blocking, time_out, call_info);
}
void PostRpcResponse(unsigned int rpc_id, unsigned int call_uid,
StringRef result) {
RpcServer::GetInstance().PostRpcResponse(rpc_id, call_uid, result);
@@ -141,6 +145,12 @@ bool GetRpcResult(bool blocking, unsigned int call_uid, std::string* result) {
return Storage::GetInstance().GetRpcResult(blocking, call_uid, result);
}
bool GetRpcResult(bool blocking, unsigned int call_uid,
double time_out, std::string* result) {
return Storage::GetInstance().GetRpcResult(blocking, call_uid, time_out,
result);
}
std::string PackRpcDefinition(const RpcDefinition& def) {
WireEncoder enc(0x0300);
enc.Write8(def.version);