Adds ConnectionInfo to Rpc callback (#116)

This commit is contained in:
Thad House
2016-10-03 10:04:30 -07:00
committed by Peter Johnson
parent 9047c98e68
commit a7eca7d4bd
8 changed files with 48 additions and 15 deletions

View File

@@ -311,7 +311,8 @@ void NT_SetRpcServerOnExit(void (*on_exit)(void *data), void *data);
typedef char *(*NT_RpcCallback)(void *data, const char *name, size_t name_len,
const char *params, size_t params_len,
size_t *results_len);
size_t *results_len,
const struct NT_ConnectionInfo* conn_info);
void NT_CreateRpc(const char *name, size_t name_len, const char *def,
size_t def_len, void *data, NT_RpcCallback callback);

View File

@@ -224,7 +224,8 @@ constexpr double kTimeout_Indefinite = -1;
void SetRpcServerOnStart(std::function<void()> on_start);
void SetRpcServerOnExit(std::function<void()> on_exit);
typedef std::function<std::string(StringRef name, StringRef params)>
typedef std::function<std::string(StringRef name, StringRef params,
const ConnectionInfo& conn_info)>
RpcCallback;
void CreateRpc(StringRef name, StringRef def, RpcCallback callback);

View File

@@ -47,15 +47,17 @@ void RpcServer::Stop() { m_owner.Stop(); }
void RpcServer::ProcessRpc(StringRef name, std::shared_ptr<Message> msg,
RpcCallback func, unsigned int conn_id,
SendMsgFunc send_response) {
SendMsgFunc send_response,
const ConnectionInfo& conn_info) {
if (func) {
auto thr = m_owner.GetThread();
if (!thr) return;
thr->m_call_queue.emplace(name, msg, func, conn_id, send_response);
thr->m_call_queue.emplace(name, msg, func, conn_id, send_response,
conn_info);
thr->m_cond.notify_one();
} else {
std::lock_guard<std::mutex> lock(m_mutex);
m_poll_queue.emplace(name, msg, func, conn_id, send_response);
m_poll_queue.emplace(name, msg, func, conn_id, send_response, conn_info);
m_poll_cond.notify_one();
}
}
@@ -138,7 +140,7 @@ void RpcServer::Thread::Main() {
// Don't hold mutex during callback execution!
lock.unlock();
auto result = item.func(item.name, item.msg->str());
auto result = item.func(item.name, item.msg->str(), item.conn_info);
item.send_response(Message::RpcResponse(item.msg->id(),
item.msg->seq_num_uid(), result));
lock.lock();

View File

@@ -41,7 +41,8 @@ class RpcServer {
void ProcessRpc(StringRef name, std::shared_ptr<Message> msg,
RpcCallback func, unsigned int conn_id,
SendMsgFunc send_response);
SendMsgFunc send_response,
const ConnectionInfo& conn_info);
bool PollRpc(bool blocking, RpcCallInfo* call_info);
bool PollRpc(bool blocking, double time_out, RpcCallInfo* call_info);
@@ -56,18 +57,21 @@ class RpcServer {
struct RpcCall {
RpcCall(StringRef name_, std::shared_ptr<Message> msg_, RpcCallback func_,
unsigned int conn_id_, SendMsgFunc send_response_)
unsigned int conn_id_, SendMsgFunc send_response_,
const ConnectionInfo conn_info_)
: name(name_),
msg(msg_),
func(func_),
conn_id(conn_id_),
send_response(send_response_) {}
send_response(send_response_),
conn_info(conn_info_) {}
std::string name;
std::shared_ptr<Message> msg;
RpcCallback func;
unsigned int conn_id;
SendMsgFunc send_response;
ConnectionInfo conn_info;
};
std::mutex m_mutex;

View File

@@ -13,6 +13,7 @@
#include "llvm/StringExtras.h"
#include "support/Base64.h"
#include "support/timestamp.h"
#include "Log.h"
#include "NetworkConnection.h"
@@ -346,11 +347,22 @@ void Storage::ProcessIncoming(std::shared_ptr<Message> msg,
DEBUG("received RPC call to non-RPC entry");
return;
}
ConnectionInfo conn_info;
auto c = conn_weak.lock();
if (c) {
conn_info = c->info();
} else {
conn_info.remote_id = "";
conn_info.remote_ip = "";
conn_info.remote_port = 0;
conn_info.last_update = 0;
conn_info.protocol_version = 0;
}
m_rpc_server.ProcessRpc(entry->name, msg, entry->rpc_callback,
conn->uid(), [=](std::shared_ptr<Message> msg) {
auto c = conn_weak.lock();
if (c) c->QueueOutgoing(msg);
});
}, conn_info);
break;
}
case Message::kRpcResponse: {
@@ -1392,13 +1404,19 @@ unsigned int Storage::CallRpc(StringRef name, StringRef params) {
// gracefully anyway.
auto rpc_callback = entry->rpc_callback;
lock.unlock();
ConnectionInfo conn_info;
conn_info.remote_id = "Server";
conn_info.remote_ip = "localhost";
conn_info.remote_port = 0;
conn_info.last_update = wpi::Now();
conn_info.protocol_version = 0x0300;
m_rpc_server.ProcessRpc(
name, msg, rpc_callback, 0xffffU, [this](std::shared_ptr<Message> msg) {
std::lock_guard<std::mutex> lock(m_mutex);
m_rpc_results.insert(std::make_pair(
std::make_pair(msg->id(), msg->seq_num_uid()), msg->str()));
m_rpc_results_cond.notify_all();
});
}, conn_info);
} else {
auto queue_outgoing = m_queue_outgoing;
lock.unlock();

View File

@@ -234,12 +234,17 @@ void NT_CreateRpc(const char *name, size_t name_len, const char *def,
size_t def_len, void *data, NT_RpcCallback callback) {
nt::CreateRpc(
StringRef(name, name_len), StringRef(def, def_len),
[=](StringRef name, StringRef params) -> std::string {
[=](StringRef name, StringRef params,
const ConnectionInfo& conn_info) -> std::string {
size_t results_len;
NT_ConnectionInfo conn_c;
ConvertToC(conn_info, &conn_c);
char* results_c = callback(data, name.data(), name.size(),
params.data(), params.size(), &results_len);
params.data(), params.size(), &results_len,
&conn_c);
std::string results(results_c, results_len);
std::free(results_c);
DisposeConnectionInfo(&conn_c);
return results;
});
}

View File

@@ -4,7 +4,8 @@
#include "ntcore.h"
std::string callback1(nt::StringRef name, nt::StringRef params_str) {
std::string callback1(nt::StringRef name, nt::StringRef params_str,
const ConnectionInfo& conn_info) {
auto params = nt::UnpackRpcValues(params_str, NT_DOUBLE);
if (params.empty()) {
std::fputs("empty params?\n", stderr);

View File

@@ -5,7 +5,8 @@
#include "ntcore.h"
std::string callback1(nt::StringRef name, nt::StringRef params_str) {
std::string callback1(nt::StringRef name, nt::StringRef params_str,
const ConnectionInfo& conn_info) {
auto params = nt::UnpackRpcValues(params_str, NT_DOUBLE);
if (params.empty()) {
std::fputs("empty params?\n", stderr);