diff --git a/include/ntcore_c.h b/include/ntcore_c.h index d232230b2f..ced4bcc69f 100644 --- a/include/ntcore_c.h +++ b/include/ntcore_c.h @@ -311,7 +311,8 @@ void NT_SetRpcServerOnExit(void (*on_exit)(void *data), void *data); typedef char *(*NT_RpcCallback)(void *data, const char *name, size_t name_len, const char *params, size_t params_len, - size_t *results_len); + size_t *results_len, + const struct NT_ConnectionInfo* conn_info); void NT_CreateRpc(const char *name, size_t name_len, const char *def, size_t def_len, void *data, NT_RpcCallback callback); diff --git a/include/ntcore_cpp.h b/include/ntcore_cpp.h index 3376c90290..6bdf4dfd84 100644 --- a/include/ntcore_cpp.h +++ b/include/ntcore_cpp.h @@ -224,7 +224,8 @@ constexpr double kTimeout_Indefinite = -1; void SetRpcServerOnStart(std::function on_start); void SetRpcServerOnExit(std::function on_exit); -typedef std::function +typedef std::function RpcCallback; void CreateRpc(StringRef name, StringRef def, RpcCallback callback); diff --git a/src/RpcServer.cpp b/src/RpcServer.cpp index b8b40df619..7857bbec86 100644 --- a/src/RpcServer.cpp +++ b/src/RpcServer.cpp @@ -47,15 +47,17 @@ void RpcServer::Stop() { m_owner.Stop(); } void RpcServer::ProcessRpc(StringRef name, std::shared_ptr msg, RpcCallback func, unsigned int conn_id, - SendMsgFunc send_response) { + SendMsgFunc send_response, + const ConnectionInfo& conn_info) { if (func) { auto thr = m_owner.GetThread(); if (!thr) return; - thr->m_call_queue.emplace(name, msg, func, conn_id, send_response); + thr->m_call_queue.emplace(name, msg, func, conn_id, send_response, + conn_info); thr->m_cond.notify_one(); } else { std::lock_guard lock(m_mutex); - m_poll_queue.emplace(name, msg, func, conn_id, send_response); + m_poll_queue.emplace(name, msg, func, conn_id, send_response, conn_info); m_poll_cond.notify_one(); } } @@ -138,7 +140,7 @@ void RpcServer::Thread::Main() { // Don't hold mutex during callback execution! lock.unlock(); - auto result = item.func(item.name, item.msg->str()); + auto result = item.func(item.name, item.msg->str(), item.conn_info); item.send_response(Message::RpcResponse(item.msg->id(), item.msg->seq_num_uid(), result)); lock.lock(); diff --git a/src/RpcServer.h b/src/RpcServer.h index d12bb6ad8c..04a0d333fe 100644 --- a/src/RpcServer.h +++ b/src/RpcServer.h @@ -41,7 +41,8 @@ class RpcServer { void ProcessRpc(StringRef name, std::shared_ptr msg, RpcCallback func, unsigned int conn_id, - SendMsgFunc send_response); + SendMsgFunc send_response, + const ConnectionInfo& conn_info); bool PollRpc(bool blocking, RpcCallInfo* call_info); bool PollRpc(bool blocking, double time_out, RpcCallInfo* call_info); @@ -56,18 +57,21 @@ class RpcServer { struct RpcCall { RpcCall(StringRef name_, std::shared_ptr msg_, RpcCallback func_, - unsigned int conn_id_, SendMsgFunc send_response_) + unsigned int conn_id_, SendMsgFunc send_response_, + const ConnectionInfo conn_info_) : name(name_), msg(msg_), func(func_), conn_id(conn_id_), - send_response(send_response_) {} + send_response(send_response_), + conn_info(conn_info_) {} std::string name; std::shared_ptr msg; RpcCallback func; unsigned int conn_id; SendMsgFunc send_response; + ConnectionInfo conn_info; }; std::mutex m_mutex; diff --git a/src/Storage.cpp b/src/Storage.cpp index f4a9ec51b1..66dc21dc03 100644 --- a/src/Storage.cpp +++ b/src/Storage.cpp @@ -13,6 +13,7 @@ #include "llvm/StringExtras.h" #include "support/Base64.h" +#include "support/timestamp.h" #include "Log.h" #include "NetworkConnection.h" @@ -346,11 +347,22 @@ void Storage::ProcessIncoming(std::shared_ptr msg, DEBUG("received RPC call to non-RPC entry"); return; } + ConnectionInfo conn_info; + auto c = conn_weak.lock(); + if (c) { + conn_info = c->info(); + } else { + conn_info.remote_id = ""; + conn_info.remote_ip = ""; + conn_info.remote_port = 0; + conn_info.last_update = 0; + conn_info.protocol_version = 0; + } m_rpc_server.ProcessRpc(entry->name, msg, entry->rpc_callback, conn->uid(), [=](std::shared_ptr msg) { auto c = conn_weak.lock(); if (c) c->QueueOutgoing(msg); - }); + }, conn_info); break; } case Message::kRpcResponse: { @@ -1392,13 +1404,19 @@ unsigned int Storage::CallRpc(StringRef name, StringRef params) { // gracefully anyway. auto rpc_callback = entry->rpc_callback; lock.unlock(); + ConnectionInfo conn_info; + conn_info.remote_id = "Server"; + conn_info.remote_ip = "localhost"; + conn_info.remote_port = 0; + conn_info.last_update = wpi::Now(); + conn_info.protocol_version = 0x0300; m_rpc_server.ProcessRpc( name, msg, rpc_callback, 0xffffU, [this](std::shared_ptr msg) { std::lock_guard lock(m_mutex); m_rpc_results.insert(std::make_pair( std::make_pair(msg->id(), msg->seq_num_uid()), msg->str())); m_rpc_results_cond.notify_all(); - }); + }, conn_info); } else { auto queue_outgoing = m_queue_outgoing; lock.unlock(); diff --git a/src/ntcore_c.cpp b/src/ntcore_c.cpp index d7d2cb2018..42a1a47287 100644 --- a/src/ntcore_c.cpp +++ b/src/ntcore_c.cpp @@ -234,12 +234,17 @@ void NT_CreateRpc(const char *name, size_t name_len, const char *def, size_t def_len, void *data, NT_RpcCallback callback) { nt::CreateRpc( StringRef(name, name_len), StringRef(def, def_len), - [=](StringRef name, StringRef params) -> std::string { + [=](StringRef name, StringRef params, + const ConnectionInfo& conn_info) -> std::string { size_t results_len; + NT_ConnectionInfo conn_c; + ConvertToC(conn_info, &conn_c); char* results_c = callback(data, name.data(), name.size(), - params.data(), params.size(), &results_len); + params.data(), params.size(), &results_len, + &conn_c); std::string results(results_c, results_len); std::free(results_c); + DisposeConnectionInfo(&conn_c); return results; }); } diff --git a/test/rpc_local.cpp b/test/rpc_local.cpp index 20e776ede4..cda6e07051 100644 --- a/test/rpc_local.cpp +++ b/test/rpc_local.cpp @@ -4,7 +4,8 @@ #include "ntcore.h" -std::string callback1(nt::StringRef name, nt::StringRef params_str) { +std::string callback1(nt::StringRef name, nt::StringRef params_str, + const ConnectionInfo& conn_info) { auto params = nt::UnpackRpcValues(params_str, NT_DOUBLE); if (params.empty()) { std::fputs("empty params?\n", stderr); diff --git a/test/rpc_speed.cpp b/test/rpc_speed.cpp index 923df3d462..b8534972b2 100644 --- a/test/rpc_speed.cpp +++ b/test/rpc_speed.cpp @@ -5,7 +5,8 @@ #include "ntcore.h" -std::string callback1(nt::StringRef name, nt::StringRef params_str) { +std::string callback1(nt::StringRef name, nt::StringRef params_str, + const ConnectionInfo& conn_info) { auto params = nt::UnpackRpcValues(params_str, NT_DOUBLE); if (params.empty()) { std::fputs("empty params?\n", stderr);