diff --git a/include/ntcore_c.h b/include/ntcore_c.h index 6a6493ea9f..3dff25e7af 100644 --- a/include/ntcore_c.h +++ b/include/ntcore_c.h @@ -137,6 +137,14 @@ struct NT_RpcDefinition { NT_RpcResultDef *results; }; +/** NetworkTables RPC Call Data */ +struct NT_RpcCallInfo { + unsigned int rpc_id; + unsigned int call_uid; + struct NT_String name; + struct NT_String params; +}; + /* * Table Functions */ @@ -268,18 +276,31 @@ void NT_RemoveConnectionListener(unsigned int conn_listener_uid); * Remote Procedure Call Functions */ -typedef NT_Value **(*NT_RpcCallback)(unsigned int uid, void *data, - const char *name, size_t name_len, - const struct NT_Value **params, - size_t params_len, size_t *results_len); +typedef char *(*NT_RpcCallback)(void *data, const char *name, size_t name_len, + const char *params, size_t params_len, + size_t *results_len); -unsigned int NT_CreateRpc(const char *name, size_t name_len, - const struct NT_RpcDefinition *def, void *data, - NT_RpcCallback callback); -void NT_DeleteRpc(unsigned int rpc_uid); -unsigned int NT_CallRpc(const char *name, size_t name_len, - const struct NT_Value **params, size_t params_len); -struct NT_Value **NT_GetRpcResult(unsigned int result_uid, size_t *results_len); +void NT_CreateRpc(const char *name, size_t name_len, const char *def, + size_t def_len, void *data, NT_RpcCallback callback); +void NT_CreatePolledRpc(const char *name, size_t name_len, const char *def, + size_t def_len); + +int NT_PollRpc(int blocking, struct NT_RpcCallInfo* call_info); +void NT_PostRpcResponse(unsigned int rpc_id, unsigned int call_uid, + const char *result, size_t result_len); + +unsigned int NT_CallRpc(const char *name, size_t name_len, const char *params, + size_t params_len); +char *NT_GetRpcResult(int blocking, unsigned int call_uid, size_t *result_len); + +char *NT_PackRpcDefinition(const struct NT_RpcDefinition *def, + size_t *packed_len); +int NT_UnpackRpcDefinition(const char *packed, size_t packed_len, + struct NT_RpcDefinition *def); +char *NT_PackRpcValues(const struct NT_Value **values, size_t values_len, + size_t *packed_len); +struct NT_Value **NT_UnpackRpcValues(const char *packed, size_t packed_len, + const NT_Type *types, size_t types_len); /* * Client/Server Functions @@ -319,6 +340,10 @@ void NT_InitString(struct NT_String *str); void NT_DisposeConnectionInfoArray(struct NT_ConnectionInfo *arr, size_t count); +void NT_DisposeRpcDefinition(struct NT_RpcDefinition *def); + +void NT_DisposeRpcCallInfo(struct NT_RpcCallInfo *call_info); + /* timestamp */ unsigned long long NT_Now(void); diff --git a/include/ntcore_cpp.h b/include/ntcore_cpp.h index 0ca56025ef..e913f45250 100644 --- a/include/ntcore_cpp.h +++ b/include/ntcore_cpp.h @@ -50,12 +50,19 @@ struct ConnectionInfo { /** NetworkTables RPC Parameter Definition */ struct RpcParamDef { + RpcParamDef() = default; + RpcParamDef(StringRef name_, std::shared_ptr def_value_) + : name(name_), def_value(def_value_) {} + std::string name; std::shared_ptr def_value; }; /** NetworkTables RPC Result Definition */ struct RpcResultDef { + RpcResultDef() = default; + RpcResultDef(StringRef name_, NT_Type type_) : name(name_), type(type_) {} + std::string name; NT_Type type; }; @@ -68,6 +75,14 @@ struct RpcDefinition { std::vector results; }; +/** NetworkTables RPC Call Data */ +struct RpcCallInfo { + unsigned int rpc_id; + unsigned int call_uid; + std::string name; + std::string params; +}; + /* * Table Functions */ @@ -183,15 +198,24 @@ void RemoveConnectionListener(unsigned int conn_listener_uid); * Remote Procedure Call Functions */ -typedef std::function>( - unsigned int uid, StringRef name, ArrayRef> params)> +typedef std::function RpcCallback; -unsigned int CreateRpc(StringRef name, const RpcDefinition& def, - RpcCallback callback); -void DeleteRpc(unsigned int rpc_uid); -unsigned int CallRpc(StringRef name, ArrayRef> params); -std::vector> GetRpcResult(unsigned int result_uid); +void CreateRpc(StringRef name, StringRef def, RpcCallback callback); +void CreatePolledRpc(StringRef name, StringRef def); + +bool PollRpc(bool blocking, RpcCallInfo* call_info); +void PostRpcResponse(unsigned int rpc_id, unsigned int call_uid, + StringRef result); + +unsigned int CallRpc(StringRef name, StringRef params); +bool GetRpcResult(bool blocking, unsigned int call_uid, std::string* result); + +std::string PackRpcDefinition(const RpcDefinition& def); +bool UnpackRpcDefinition(StringRef packed, RpcDefinition *def); +std::string PackRpcValues(ArrayRef> values); +std::vector> UnpackRpcValues(StringRef packed, + ArrayRef types); /* * Client/Server Functions diff --git a/ntcore.def b/ntcore.def index 87c1e314a3..e880d18610 100644 --- a/ntcore.def +++ b/ntcore.def @@ -13,10 +13,6 @@ NT_AddEntryListener @10 NT_RemoveEntryListener @11 NT_AddConnectionListener @12 NT_RemoveConnectionListener @13 -NT_CreateRpc @14 -NT_DeleteRpc @15 -NT_CallRpc @16 -NT_GetRpcResult @17 NT_SetNetworkIdentity @18 NT_StartServer @19 NT_StopServer @20 @@ -33,4 +29,15 @@ NT_InitString @30 NT_DisposeConnectionInfoArray @31 NT_Now @32 NT_SetLogger @33 - +NT_CreateRpc @34 +NT_CreatePolledRpc @35 +NT_PollRpc @36 +NT_PostRpcRepsonse @37 +NT_CallRpc @38 +NT_GetRpcResult @39 +NT_PackRpcDefinition @40 +NT_UnpackRpcDefinition @41 +NT_PackRpcValues @42 +NT_UnpackRpcValues @43 +NT_DisposeRpcDefinition @44 +NT_DisposeRpcCallInfo @45 diff --git a/src/Dispatcher.cpp b/src/Dispatcher.cpp index 151ffa3827..7cd1063665 100644 --- a/src/Dispatcher.cpp +++ b/src/Dispatcher.cpp @@ -94,7 +94,7 @@ void DispatcherBase::Stop() { if (m_dispatch_thread.joinable()) m_dispatch_thread.join(); if (m_clientserver_thread.joinable()) m_clientserver_thread.join(); - std::vector conns; + std::vector> conns; { std::lock_guard lock(m_user_mutex); conns.swap(m_connections); @@ -135,21 +135,14 @@ std::vector DispatcherBase::GetConnections() const { std::lock_guard lock(m_user_mutex); for (auto& conn : m_connections) { - if (conn.net->state() != NetworkConnection::kActive) continue; - conns.emplace_back(conn.net->info()); + if (conn->state() != NetworkConnection::kActive) continue; + conns.emplace_back(conn->info()); } return conns; } void DispatcherBase::DispatchThreadMain() { - // local copy of active m_connections - struct ConnectionRef { - NetworkConnection* net; - NetworkConnection::Outgoing outgoing; - }; - std::vector connections; - auto timeout_time = std::chrono::steady_clock::now(); int count = 0; std::unique_lock flush_lock(m_flush_mutex); @@ -171,19 +164,15 @@ void DispatcherBase::DispatchThreadMain() { count = 0; } - // make a local copy of the connections list (so we don't hold the lock) - connections.resize(0); { std::lock_guard user_lock(m_user_mutex); bool reconnect = false; for (auto& conn : m_connections) { - if (conn.net->state() == NetworkConnection::kActive) { - connections.push_back(ConnectionRef()); - connections.back().net = conn.net.get(); - connections.back().outgoing.swap(conn.outgoing); - conn.last_update.resize(0); // clear "previous" updates - } - if (!m_server && conn.net->state() == NetworkConnection::kDead) + // post outgoing messages if connection is active + if (conn->state() == NetworkConnection::kActive) conn->PostOutgoing(); + + // if client, reconnect if connection died + if (!m_server && conn->state() == NetworkConnection::kDead) reconnect = true; } // reconnect if we disconnected (and a reconnect is not in progress) @@ -192,105 +181,6 @@ void DispatcherBase::DispatchThreadMain() { m_reconnect_cv.notify_one(); } } - - // send outgoing messages - for (auto& conn : connections) { - if (!conn.outgoing.empty()) - conn.net->outgoing().emplace(std::move(conn.outgoing)); - } - } -} - -void DispatcherBase::Connection::QueueOutgoing(std::shared_ptr msg) { - // Merge with previous. One case we don't combine: delete/assign loop. - switch (msg->type()) { - case Message::kEntryAssign: - case Message::kEntryUpdate: { - // don't do this for unassigned id's - unsigned int id = msg->id(); - if (id == 0xffff) { - outgoing.push_back(msg); - break; - } - if (id < last_update.size() && last_update[id].first != 0) { - // overwrite the previous one for this id - auto& oldmsg = outgoing[last_update[id].first - 1]; - if (oldmsg && oldmsg->Is(Message::kEntryAssign) && - msg->Is(Message::kEntryUpdate)) { - // need to update assignment with new seq_num and value - oldmsg = Message::EntryAssign(oldmsg->str(), id, msg->seq_num_uid(), - msg->value(), oldmsg->flags()); - } else - oldmsg = msg; // easy update - } else { - // new, but remember it - std::size_t pos = outgoing.size(); - outgoing.push_back(msg); - if (id >= last_update.size()) last_update.resize(id + 1); - last_update[id].first = pos + 1; - } - break; - } - case Message::kEntryDelete: { - // don't do this for unassigned id's - unsigned int id = msg->id(); - if (id == 0xffff) { - outgoing.push_back(msg); - break; - } - - // clear previous updates - if (id < last_update.size()) { - if (last_update[id].first != 0) { - outgoing[last_update[id].first - 1].reset(); - last_update[id].first = 0; - } - if (last_update[id].second != 0) { - outgoing[last_update[id].second - 1].reset(); - last_update[id].second = 0; - } - } - - // add deletion - outgoing.push_back(msg); - break; - } - case Message::kFlagsUpdate: { - // don't do this for unassigned id's - unsigned int id = msg->id(); - if (id == 0xffff) { - outgoing.push_back(msg); - break; - } - if (id < last_update.size() && last_update[id].second != 0) { - // overwrite the previous one for this id - outgoing[last_update[id].second - 1] = msg; - } else { - // new, but remember it - std::size_t pos = outgoing.size(); - outgoing.push_back(msg); - if (id >= last_update.size()) last_update.resize(id + 1); - last_update[id].second = pos + 1; - } - break; - } - case Message::kClearEntries: { - // knock out all previous assigns/updates! - for (auto& i : outgoing) { - if (!i) continue; - auto t = i->type(); - if (t == Message::kEntryAssign || t == Message::kEntryUpdate || - t == Message::kFlagsUpdate || t == Message::kEntryDelete || - t == Message::kClearEntries) - i.reset(); - } - last_update.resize(0); - outgoing.push_back(msg); - break; - } - default: - outgoing.push_back(msg); - break; } } @@ -299,12 +189,12 @@ void DispatcherBase::QueueOutgoing(std::shared_ptr msg, NetworkConnection* except) { std::lock_guard user_lock(m_user_mutex); for (auto& conn : m_connections) { - if (conn.net.get() == except) continue; - if (only && conn.net.get() != only) continue; - auto state = conn.net->state(); + if (conn.get() == except) continue; + if (only && conn.get() != only) continue; + auto state = conn->state(); if (state != NetworkConnection::kSynchronized && state != NetworkConnection::kActive) continue; - conn.QueueOutgoing(msg); + conn->QueueOutgoing(msg); } } @@ -324,18 +214,18 @@ void DispatcherBase::ServerThreadMain() { // add to connections list using namespace std::placeholders; - std::unique_ptr conn_unique(new NetworkConnection( - std::move(stream), - m_notifier, + auto conn = std::make_shared( + std::move(stream), m_notifier, std::bind(&Dispatcher::ServerHandshake, this, _1, _2, _3), - std::bind(&Storage::GetEntryType, &m_storage, _1), - std::bind(&Storage::ProcessIncoming, &m_storage, _1, _2))); - auto conn = conn_unique.get(); + std::bind(&Storage::GetEntryType, &m_storage, _1)); + conn->set_process_incoming( + std::bind(&Storage::ProcessIncoming, &m_storage, _1, _2, + std::weak_ptr(conn))); { std::lock_guard lock(m_user_mutex); - m_connections.emplace_back(std::move(conn_unique)); + m_connections.emplace_back(conn); + conn->Start(); } - conn->Start(); } } @@ -353,15 +243,15 @@ void DispatcherBase::ClientThreadMain( std::unique_lock lock(m_user_mutex); using namespace std::placeholders; - std::unique_ptr conn_unique(new NetworkConnection( - std::move(stream), - m_notifier, + auto conn = std::make_shared( + std::move(stream), m_notifier, std::bind(&Dispatcher::ClientHandshake, this, _1, _2, _3), - std::bind(&Storage::GetEntryType, &m_storage, _1), - std::bind(&Storage::ProcessIncoming, &m_storage, _1, _2))); - auto conn = conn_unique.get(); + std::bind(&Storage::GetEntryType, &m_storage, _1)); + conn->set_process_incoming( + std::bind(&Storage::ProcessIncoming, &m_storage, _1, _2, + std::weak_ptr(conn))); m_connections.resize(0); // disconnect any current - m_connections.emplace_back(std::move(conn_unique)); + m_connections.emplace_back(conn); conn->set_proto_rev(m_reconnect_proto_rev); conn->Start(); @@ -514,7 +404,8 @@ bool DispatcherBase::ServerHandshake( // get the next message (blocks) msg = get_msg(); } - for (auto& msg : incoming) m_storage.ProcessIncoming(msg, &conn); + for (auto& msg : incoming) + m_storage.ProcessIncoming(msg, &conn, std::weak_ptr()); } INFO("server: client CONNECTED: " << conn.stream().getPeerIP() << " port " diff --git a/src/Dispatcher.h b/src/Dispatcher.h index e00a9c90b0..0aed45cf69 100644 --- a/src/Dispatcher.h +++ b/src/Dispatcher.h @@ -81,24 +81,7 @@ class DispatcherBase { // Mutex for user-accessible items mutable std::mutex m_user_mutex; - struct Connection { - Connection() = default; - explicit Connection(std::unique_ptr net_) - : net(std::move(net_)) {} - Connection(Connection&& rhs) { - net = std::move(rhs.net); - outgoing = std::move(rhs.outgoing); - last_update = std::move(rhs.last_update); - } - Connection(const Connection&) = delete; - Connection& operator=(const Connection&) = delete; - void QueueOutgoing(std::shared_ptr msg); - - std::unique_ptr net; - NetworkConnection::Outgoing outgoing; - std::vector> last_update; - }; - std::vector m_connections; + std::vector> m_connections; std::string m_identity; std::atomic_bool m_active; // set to false to terminate threads diff --git a/src/NetworkConnection.cpp b/src/NetworkConnection.cpp index e282cad857..67e9487889 100644 --- a/src/NetworkConnection.cpp +++ b/src/NetworkConnection.cpp @@ -17,16 +17,17 @@ using namespace nt; +std::atomic_uint NetworkConnection::s_uid; + NetworkConnection::NetworkConnection(std::unique_ptr stream, Notifier& notifier, HandshakeFunc handshake, - Message::GetEntryTypeFunc get_entry_type, - ProcessIncomingFunc process_incoming) - : m_stream(std::move(stream)), + Message::GetEntryTypeFunc get_entry_type) + : m_uid(s_uid.fetch_add(1)), + m_stream(std::move(stream)), m_notifier(notifier), m_handshake(handshake), - m_get_entry_type(get_entry_type), - m_process_incoming(process_incoming) { + m_get_entry_type(get_entry_type) { m_active = false; m_proto_rev = 0x0300; m_state = static_cast(kCreated); @@ -153,3 +154,106 @@ void NetworkConnection::WriteThreadMain() { m_active = false; if (m_stream) m_stream->close(); // also kill read thread } + +void NetworkConnection::QueueOutgoing(std::shared_ptr msg) { + std::lock_guard lock(m_pending_mutex); + + // Merge with previous. One case we don't combine: delete/assign loop. + switch (msg->type()) { + case Message::kEntryAssign: + case Message::kEntryUpdate: { + // don't do this for unassigned id's + unsigned int id = msg->id(); + if (id == 0xffff) { + m_pending_outgoing.push_back(msg); + break; + } + if (id < m_pending_update.size() && m_pending_update[id].first != 0) { + // overwrite the previous one for this id + auto& oldmsg = m_pending_outgoing[m_pending_update[id].first - 1]; + if (oldmsg && oldmsg->Is(Message::kEntryAssign) && + msg->Is(Message::kEntryUpdate)) { + // need to update assignment with new seq_num and value + oldmsg = Message::EntryAssign(oldmsg->str(), id, msg->seq_num_uid(), + msg->value(), oldmsg->flags()); + } else + oldmsg = msg; // easy update + } else { + // new, but remember it + std::size_t pos = m_pending_outgoing.size(); + m_pending_outgoing.push_back(msg); + if (id >= m_pending_update.size()) m_pending_update.resize(id + 1); + m_pending_update[id].first = pos + 1; + } + break; + } + case Message::kEntryDelete: { + // don't do this for unassigned id's + unsigned int id = msg->id(); + if (id == 0xffff) { + m_pending_outgoing.push_back(msg); + break; + } + + // clear previous updates + if (id < m_pending_update.size()) { + if (m_pending_update[id].first != 0) { + m_pending_outgoing[m_pending_update[id].first - 1].reset(); + m_pending_update[id].first = 0; + } + if (m_pending_update[id].second != 0) { + m_pending_outgoing[m_pending_update[id].second - 1].reset(); + m_pending_update[id].second = 0; + } + } + + // add deletion + m_pending_outgoing.push_back(msg); + break; + } + case Message::kFlagsUpdate: { + // don't do this for unassigned id's + unsigned int id = msg->id(); + if (id == 0xffff) { + m_pending_outgoing.push_back(msg); + break; + } + if (id < m_pending_update.size() && m_pending_update[id].second != 0) { + // overwrite the previous one for this id + m_pending_outgoing[m_pending_update[id].second - 1] = msg; + } else { + // new, but remember it + std::size_t pos = m_pending_outgoing.size(); + m_pending_outgoing.push_back(msg); + if (id >= m_pending_update.size()) m_pending_update.resize(id + 1); + m_pending_update[id].second = pos + 1; + } + break; + } + case Message::kClearEntries: { + // knock out all previous assigns/updates! + for (auto& i : m_pending_outgoing) { + if (!i) continue; + auto t = i->type(); + if (t == Message::kEntryAssign || t == Message::kEntryUpdate || + t == Message::kFlagsUpdate || t == Message::kEntryDelete || + t == Message::kClearEntries) + i.reset(); + } + m_pending_update.resize(0); + m_pending_outgoing.push_back(msg); + break; + } + default: + m_pending_outgoing.push_back(msg); + break; + } +} + +void NetworkConnection::PostOutgoing() { + std::lock_guard lock(m_pending_mutex); + if (m_pending_outgoing.empty()) return; + m_outgoing.emplace(std::move(m_pending_outgoing)); + m_pending_outgoing.resize(0); + m_pending_update.resize(0); +} diff --git a/src/NetworkConnection.h b/src/NetworkConnection.h index fa8b6ae0b7..e0b6267803 100644 --- a/src/NetworkConnection.h +++ b/src/NetworkConnection.h @@ -39,10 +39,14 @@ class NetworkConnection { NetworkConnection(std::unique_ptr stream, Notifier& notifier, HandshakeFunc handshake, - Message::GetEntryTypeFunc get_entry_type, - ProcessIncomingFunc process_incoming); + Message::GetEntryTypeFunc get_entry_type); ~NetworkConnection(); + // Set the input processor function. This must be called before Start(). + void set_process_incoming(ProcessIncomingFunc func) { + m_process_incoming = func; + } + void Start(); void Stop(); @@ -50,7 +54,11 @@ class NetworkConnection { bool active() const { return m_active; } NetworkStream& stream() { return *m_stream; } - OutgoingQueue& outgoing() { return m_outgoing; } + + void QueueOutgoing(std::shared_ptr msg); + void PostOutgoing(); + + unsigned int uid() const { return m_uid; } unsigned int proto_rev() const { return m_proto_rev; } void set_proto_rev(unsigned int proto_rev) { m_proto_rev = proto_rev; } @@ -70,6 +78,9 @@ class NetworkConnection { void ReadThreadMain(); void WriteThreadMain(); + static std::atomic_uint s_uid; + + unsigned int m_uid; std::unique_ptr m_stream; Notifier& m_notifier; OutgoingQueue m_outgoing; @@ -84,6 +95,10 @@ class NetworkConnection { mutable std::mutex m_remote_id_mutex; std::string m_remote_id; std::atomic_ullong m_last_update; + + std::mutex m_pending_mutex; + Outgoing m_pending_outgoing; + std::vector> m_pending_update; }; } // namespace nt diff --git a/src/RpcServer.cpp b/src/RpcServer.cpp new file mode 100644 index 0000000000..3b5fa510b0 --- /dev/null +++ b/src/RpcServer.cpp @@ -0,0 +1,119 @@ +/*----------------------------------------------------------------------------*/ +/* Copyright (c) FIRST 2015. All Rights Reserved. */ +/* Open Source Software - may be modified and shared by FRC teams. The code */ +/* must be accompanied by the FIRST BSD license file in the root directory of */ +/* the project. */ +/*----------------------------------------------------------------------------*/ + +#include "RpcServer.h" + +#include "Log.h" + +using namespace nt; + +ATOMIC_STATIC_INIT(RpcServer) + +RpcServer::RpcServer() { + m_active = false; + m_terminating = false; +} + +RpcServer::~RpcServer() { + Stop(); + m_terminating = true; + m_poll_cond.notify_all(); +} + +void RpcServer::Start() { + { + std::lock_guard lock(m_mutex); + if (m_active) return; + m_active = true; + } + m_thread = std::thread(&RpcServer::ThreadMain, this); +} + +void RpcServer::Stop() { + m_active = false; + if (m_thread.joinable()) { + // send notification so the thread terminates + m_call_cond.notify_one(); + m_thread.join(); + } +} + +void RpcServer::ProcessRpc(StringRef name, std::shared_ptr msg, + RpcCallback func, unsigned int conn_id, + SendMsgFunc send_response) { + std::unique_lock lock(m_mutex); + + if (func) + m_call_queue.emplace(name, msg, func, conn_id, send_response); + else + m_poll_queue.emplace(name, msg, func, conn_id, send_response); + + lock.unlock(); + + if (func) + m_call_cond.notify_one(); + else + m_poll_cond.notify_one(); +} + +bool RpcServer::PollRpc(bool blocking, RpcCallInfo* call_info) { + std::unique_lock lock(m_mutex); + while (m_poll_queue.empty()) { + if (!blocking || m_terminating) return false; + m_poll_cond.wait(lock); + } + + auto& item = m_poll_queue.front(); + unsigned int call_uid = (item.conn_id << 16) | item.msg->seq_num_uid(); + call_info->rpc_id = item.msg->id(); + call_info->call_uid = call_uid; + call_info->name = std::move(item.name); + call_info->params = item.msg->str(); + m_response_map.insert(std::make_pair(std::make_pair(item.msg->id(), call_uid), + item.send_response)); + m_poll_queue.pop(); + return true; +} + +void RpcServer::PostRpcResponse(unsigned int rpc_id, unsigned int call_uid, + llvm::StringRef result) { + auto i = m_response_map.find(std::make_pair(rpc_id, call_uid)); + if (i == m_response_map.end()) { + WARNING("posting RPC response to nonexistent call (or duplicate response)"); + return; + } + (i->getSecond())(Message::RpcResponse(rpc_id, call_uid, result)); + m_response_map.erase(i); +} + +void RpcServer::ThreadMain() { + std::unique_lock lock(m_mutex); + std::string tmp; + while (m_active) { + while (m_call_queue.empty()) { + m_call_cond.wait(lock); + if (!m_active) return; + } + + while (!m_call_queue.empty()) { + auto item = std::move(m_call_queue.front()); + m_call_queue.pop(); + + DEBUG4("rpc calling " << item.name); + + if (item.name.empty() || !item.msg || !item.func || !item.send_response) + continue; + + // Don't hold mutex during callback execution! + lock.unlock(); + auto result = item.func(item.name, item.msg->str()); + item.send_response(Message::RpcResponse(item.msg->id(), + item.msg->seq_num_uid(), result)); + lock.lock(); + } + } +} diff --git a/src/RpcServer.h b/src/RpcServer.h new file mode 100644 index 0000000000..73cc799693 --- /dev/null +++ b/src/RpcServer.h @@ -0,0 +1,88 @@ +/*----------------------------------------------------------------------------*/ +/* Copyright (c) FIRST 2015. All Rights Reserved. */ +/* Open Source Software - may be modified and shared by FRC teams. The code */ +/* must be accompanied by the FIRST BSD license file in the root directory of */ +/* the project. */ +/*----------------------------------------------------------------------------*/ + +#ifndef NT_RPCSERVER_H_ +#define NT_RPCSERVER_H_ + +#include +#include +#include +#include +#include +#include +#include + +#include "llvm/DenseMap.h" +#include "atomic_static.h" +#include "Message.h" +#include "ntcore_cpp.h" + +namespace nt { + +class RpcServer { + friend class RpcServerTest; + public: + static RpcServer& GetInstance() { + ATOMIC_STATIC(RpcServer, instance); + return instance; + } + ~RpcServer(); + + typedef std::function)> SendMsgFunc; + + void Start(); + void Stop(); + + bool active() const { return m_active; } + + void ProcessRpc(StringRef name, std::shared_ptr msg, + RpcCallback func, unsigned int conn_id, + SendMsgFunc send_response); + + bool PollRpc(bool blocking, RpcCallInfo* call_info); + void PostRpcResponse(unsigned int rpc_id, unsigned int call_uid, + llvm::StringRef result); + + private: + RpcServer(); + + void ThreadMain(); + + std::atomic_bool m_active; + std::atomic_bool m_terminating; + + std::mutex m_mutex; + std::condition_variable m_call_cond, m_poll_cond; + + struct RpcCall { + RpcCall(StringRef name_, std::shared_ptr msg_, RpcCallback func_, + unsigned int conn_id_, SendMsgFunc send_response_) + : name(name_), + msg(msg_), + func(func_), + conn_id(conn_id_), + send_response(send_response_) {} + + std::string name; + std::shared_ptr msg; + RpcCallback func; + unsigned int conn_id; + SendMsgFunc send_response; + }; + std::queue m_call_queue, m_poll_queue; + + llvm::DenseMap, SendMsgFunc> + m_response_map; + + std::thread m_thread; + + ATOMIC_STATIC_DECL(RpcServer) +}; + +} // namespace nt + +#endif // NT_RPCSERVER_H_ diff --git a/src/Storage.cpp b/src/Storage.cpp index f9966af254..fd0d4ae7ee 100644 --- a/src/Storage.cpp +++ b/src/Storage.cpp @@ -20,9 +20,15 @@ using namespace nt; ATOMIC_STATIC_INIT(Storage) -Storage::Storage(Notifier& notifier) : m_notifier(notifier) {} +Storage::Storage(Notifier& notifier, RpcServer& rpc_server) + : m_notifier(notifier), m_rpc_server(rpc_server) { + m_terminating = false; +} -Storage::~Storage() {} +Storage::~Storage() { + m_terminating = true; + m_rpc_results_cond.notify_all(); +} void Storage::SetOutgoing(QueueOutgoingFunc queue_outgoing, bool server) { std::lock_guard lock(m_mutex); @@ -43,7 +49,8 @@ NT_Type Storage::GetEntryType(unsigned int id) const { } void Storage::ProcessIncoming(std::shared_ptr msg, - NetworkConnection* conn) { + NetworkConnection* conn, + std::weak_ptr conn_weak) { std::unique_lock lock(m_mutex); switch (msg->type()) { case Message::kKeepAlive: @@ -267,10 +274,36 @@ void Storage::ProcessIncoming(std::shared_ptr msg, } break; } - case Message::kExecuteRpc: - case Message::kRpcResponse: - // TODO + case Message::kExecuteRpc: { + if (!m_server) return; // only process on server + unsigned int id = msg->id(); + if (id >= m_idmap.size() || !m_idmap[id]) { + // ignore call to non-existent RPC + // this can happen due to deleted entries + lock.unlock(); + DEBUG("received RPC call to unknown entry"); + return; + } + Entry* entry = m_idmap[id]; + if (!entry->value->IsRpc()) { + lock.unlock(); + DEBUG("received RPC call to non-RPC entry"); + return; + } + m_rpc_server.ProcessRpc(entry->name, msg, entry->rpc_callback, + conn->uid(), [=](std::shared_ptr msg) { + auto c = conn_weak.lock(); + if (c) c->QueueOutgoing(msg); + }); break; + } + case Message::kRpcResponse: { + if (m_server) return; // only process on client + m_rpc_results.insert(std::make_pair( + std::make_pair(msg->id(), msg->seq_num_uid()), msg->str())); + m_rpc_results_cond.notify_all(); + break; + } default: break; } @@ -478,10 +511,11 @@ void Storage::DeleteEntry(StringRef name) { if (i == m_entries.end()) return; Entry* entry = i->getValue().get(); unsigned int id = entry->id; + bool had_value = entry->value != nullptr; m_entries.erase(i); // erase from map if (id < m_idmap.size()) m_idmap[id] = nullptr; - if (!entry->value) return; + if (!had_value) return; // if it had a value, generate message // don't send an update if we don't have an assigned id yet @@ -1003,3 +1037,136 @@ next_line: return true; } + +void Storage::CreateRpc(StringRef name, StringRef def, RpcCallback callback) { + if (name.empty() || def.empty() || !callback) return; + std::unique_lock lock(m_mutex); + if (!m_server) return; // only server can create RPCs + + auto& new_entry = m_entries[name]; + if (!new_entry) new_entry.reset(new Entry(name)); + Entry* entry = new_entry.get(); + auto old_value = entry->value; + auto value = Value::MakeRpc(def); + entry->value = value; + + // set up the new callback + entry->rpc_callback = callback; + + // start the RPC server + if (!m_rpc_server.active()) m_rpc_server.Start(); + + if (old_value && *old_value == *value) return; + + // assign an id if it doesn't have one + if (entry->id == 0xffff) { + unsigned int id = m_idmap.size(); + entry->id = id; + m_idmap.push_back(entry); + } + + // generate message + if (!m_queue_outgoing) return; + auto queue_outgoing = m_queue_outgoing; + if (!old_value || old_value->type() != value->type()) { + ++entry->seq_num; + auto msg = Message::EntryAssign(name, entry->id, entry->seq_num.value(), + value, entry->flags); + lock.unlock(); + queue_outgoing(msg, nullptr, nullptr); + } else { + ++entry->seq_num; + auto msg = Message::EntryUpdate(entry->id, entry->seq_num.value(), value); + lock.unlock(); + queue_outgoing(msg, nullptr, nullptr); + } +} + +void Storage::CreatePolledRpc(StringRef name, StringRef def) { + if (name.empty() || def.empty()) return; + std::unique_lock lock(m_mutex); + if (!m_server) return; // only server can create RPCs + + auto& new_entry = m_entries[name]; + if (!new_entry) new_entry.reset(new Entry(name)); + Entry* entry = new_entry.get(); + auto old_value = entry->value; + auto value = Value::MakeRpc(def); + entry->value = value; + + // a nullptr callback indicates a polled RPC + entry->rpc_callback = nullptr; + + if (old_value && *old_value == *value) return; + + // assign an id if it doesn't have one + if (entry->id == 0xffff) { + unsigned int id = m_idmap.size(); + entry->id = id; + m_idmap.push_back(entry); + } + + // generate message + if (!m_queue_outgoing) return; + auto queue_outgoing = m_queue_outgoing; + if (!old_value || old_value->type() != value->type()) { + ++entry->seq_num; + auto msg = Message::EntryAssign(name, entry->id, entry->seq_num.value(), + value, entry->flags); + lock.unlock(); + queue_outgoing(msg, nullptr, nullptr); + } else { + ++entry->seq_num; + auto msg = Message::EntryUpdate(entry->id, entry->seq_num.value(), value); + lock.unlock(); + queue_outgoing(msg, nullptr, nullptr); + } +} + +unsigned int Storage::CallRpc(StringRef name, StringRef params) { + std::unique_lock lock(m_mutex); + auto i = m_entries.find(name); + if (i == m_entries.end()) return 0; + auto& entry = i->getValue(); + if (!entry->value->IsRpc()) return 0; + + ++entry->rpc_call_uid; + if (entry->rpc_call_uid > 0xffff) entry->rpc_call_uid = 0; + unsigned int combined_uid = (entry->id << 16) | entry->rpc_call_uid; + auto msg = Message::ExecuteRpc(entry->id, entry->rpc_call_uid, params); + if (m_server) { + // RPCs are unlikely to be used locally on the server, but handle it + // gracefully anyway. + auto rpc_callback = entry->rpc_callback; + lock.unlock(); + m_rpc_server.ProcessRpc( + name, msg, rpc_callback, 0xffffU, [this](std::shared_ptr msg) { + std::lock_guard lock(m_mutex); + m_rpc_results.insert(std::make_pair( + std::make_pair(msg->id(), msg->seq_num_uid()), msg->str())); + m_rpc_results_cond.notify_all(); + }); + } else { + auto queue_outgoing = m_queue_outgoing; + lock.unlock(); + queue_outgoing(msg, nullptr, nullptr); + } + return combined_uid; +} + +bool Storage::GetRpcResult(bool blocking, unsigned int call_uid, + std::string* result) { + std::unique_lock lock(m_mutex); + for (;;) { + auto i = + m_rpc_results.find(std::make_pair(call_uid >> 16, call_uid & 0xffff)); + if (i == m_rpc_results.end()) { + if (!blocking || m_terminating) return false; + m_rpc_results_cond.wait(lock); + continue; + } + result->swap(i->getSecond()); + m_rpc_results.erase(i); + return true; + } +} diff --git a/src/Storage.h b/src/Storage.h index 81a741ece3..fdf84598f4 100644 --- a/src/Storage.h +++ b/src/Storage.h @@ -16,11 +16,13 @@ #include #include +#include "llvm/DenseMap.h" #include "llvm/StringMap.h" #include "atomic_static.h" #include "Message.h" #include "Notifier.h" #include "ntcore_cpp.h" +#include "RpcServer.h" #include "SequenceNumber.h" namespace nt { @@ -46,7 +48,8 @@ class Storage { NT_Type GetEntryType(unsigned int id) const; - void ProcessIncoming(std::shared_ptr msg, NetworkConnection* conn); + void ProcessIncoming(std::shared_ptr msg, NetworkConnection* conn, + std::weak_ptr conn_weak); void GetInitialAssignments(NetworkConnection& conn, std::vector>* msgs); void ApplyInitialAssignments(NetworkConnection& conn, @@ -72,14 +75,23 @@ class Storage { std::istream& is, std::function warn); + // RPC configuration needs to come through here as RPC definitions are + // actually special Storage value types. + void CreateRpc(StringRef name, StringRef def, RpcCallback callback); + void CreatePolledRpc(StringRef name, StringRef def); + + unsigned int CallRpc(StringRef name, StringRef params); + bool GetRpcResult(bool blocking, unsigned int call_uid, std::string* result); + private: - Storage() : Storage(Notifier::GetInstance()) {} - Storage(Notifier& notifier); + Storage() : Storage(Notifier::GetInstance(), RpcServer::GetInstance()) {} + Storage(Notifier& notifier, RpcServer& rpcserver); Storage(const Storage&) = delete; Storage& operator=(const Storage&) = delete; struct Entry { - Entry(llvm::StringRef name_) : name(name_), flags(0), id(0xffff) {} + Entry(llvm::StringRef name_) + : name(name_), flags(0), id(0xffff), rpc_call_uid(0) {} bool IsPersistent() const { return (flags & NT_PERSISTENT) != 0; } std::string name; @@ -87,18 +99,26 @@ class Storage { unsigned int flags; unsigned int id; SequenceNumber seq_num; + RpcCallback rpc_callback; + unsigned int rpc_call_uid; }; typedef llvm::StringMap> EntriesMap; typedef std::vector IdMap; + typedef llvm::DenseMap, std::string> + RpcResultMap; mutable std::mutex m_mutex; EntriesMap m_entries; IdMap m_idmap; + RpcResultMap m_rpc_results; + std::atomic_bool m_terminating; + std::condition_variable m_rpc_results_cond; QueueOutgoingFunc m_queue_outgoing; - bool m_server; + bool m_server = true; Notifier& m_notifier; + RpcServer& m_rpc_server; ATOMIC_STATIC_DECL(Storage) }; diff --git a/src/ntcore_c.cpp b/src/ntcore_c.cpp index 425818a124..b53b2b48fc 100644 --- a/src/ntcore_c.cpp +++ b/src/ntcore_c.cpp @@ -37,6 +37,41 @@ static void ConvertToC(const ConnectionInfo& in, NT_ConnectionInfo* out) { out->protocol_version = in.protocol_version; } +static void ConvertToC(const RpcParamDef& in, NT_RpcParamDef* out) { + ConvertToC(in.name, &out->name); + NT_InitValue(&out->def_value); + ConvertToC(*in.def_value, &out->def_value); +} + +static void ConvertToC(const RpcResultDef& in, NT_RpcResultDef* out) { + ConvertToC(in.name, &out->name); + out->type = in.type; +} + +static void ConvertToC(const RpcDefinition& in, NT_RpcDefinition* out) { + out->version = in.version; + ConvertToC(in.name, &out->name); + + out->num_params = in.params.size(); + out->params = static_cast( + std::malloc(in.params.size() * sizeof(NT_RpcParamDef))); + for (size_t i = 0; i < in.params.size(); ++i) + ConvertToC(in.params[i], &out->params[i]); + + out->num_results = in.results.size(); + out->results = static_cast( + std::malloc(in.results.size() * sizeof(NT_RpcResultDef))); + for (size_t i = 0; i < in.results.size(); ++i) + ConvertToC(in.results[i], &out->results[i]); +} + +static void ConvertToC(const RpcCallInfo& in, NT_RpcCallInfo* out) { + out->rpc_id = in.rpc_id; + out->call_uid = in.call_uid; + ConvertToC(in.name, &out->name); + ConvertToC(in.params, &out->params); +} + static void DisposeConnectionInfo(NT_ConnectionInfo *info) { std::free(info->remote_id.str); std::free(info->remote_name); @@ -164,72 +199,108 @@ void NT_RemoveConnectionListener(unsigned int conn_listener_uid) { * Remote Procedure Call Functions */ -unsigned int NT_CreateRpc(const char *name, size_t name_len, - const NT_RpcDefinition *def, void *data, - NT_RpcCallback callback) { - return nt::CreateRpc( - StringRef(name, name_len), - ConvertFromC(*def), - [=](unsigned int uid, StringRef name, - ArrayRef> params) - -> std::vector> { - // convert params to NT_Value* array - std::vector params_c(params.size()); - for (size_t i = 0; i < params.size(); ++i) - params_c[i] = ¶ms[i]->value(); - +void NT_CreateRpc(const char *name, size_t name_len, const char *def, + size_t def_len, void *data, NT_RpcCallback callback) { + nt::CreateRpc( + StringRef(name, name_len), StringRef(def, def_len), + [=](StringRef name, StringRef params) -> std::string { size_t results_len; - NT_Value** results_c = callback(uid, data, name.data(), name.size(), - params_c.data(), params.size(), - &results_len); - - // convert results to Value array - std::vector> results; - results.reserve(results_len); - for (size_t i = 0; i < results_len; ++i) - results.push_back(ConvertFromC(*results_c[i])); - - // dispose the C version - for (size_t i = 0; i < results_len; ++i) { - NT_DisposeValue(results_c[i]); - std::free(results_c[i]); - } + char* results_c = callback(data, name.data(), name.size(), + params.data(), params.size(), &results_len); + std::string results(results_c, results_len); std::free(results_c); - return results; }); } -void NT_DeleteRpc(unsigned int rpc_uid) { - nt::DeleteRpc(rpc_uid); +void NT_CreatePolledRpc(const char *name, size_t name_len, const char *def, + size_t def_len) { + nt::CreatePolledRpc(StringRef(name, name_len), StringRef(def, def_len)); +} + +int NT_PollRpc(int blocking, NT_RpcCallInfo* call_info) { + RpcCallInfo call_info_cpp; + if (!nt::PollRpc(blocking, &call_info_cpp)) + return 0; + ConvertToC(call_info_cpp, call_info); + return 1; +} + +void NT_PostRpcResponse(unsigned int rpc_id, unsigned int call_uid, + const char *result, size_t result_len) { + nt::PostRpcResponse(rpc_id, call_uid, StringRef(result, result_len)); } unsigned int NT_CallRpc(const char *name, size_t name_len, - const NT_Value **params, size_t params_len) { - // create input vector - std::vector> params_v; - params_v.reserve(params_len); - for (size_t i = 0; i < params_len; ++i) - params_v.push_back(ConvertFromC(*params[i])); - - // make the call - return nt::CallRpc(StringRef(name, name_len), params_v); + const char *params, size_t params_len) { + return nt::CallRpc(StringRef(name, name_len), StringRef(params, params_len)); } -NT_Value **NT_GetRpcResult(unsigned int result_uid, size_t *results_len) { - auto results_v = nt::GetRpcResult(result_uid); - *results_len = results_v.size(); - if (results_v.size() == 0) return nullptr; +char *NT_GetRpcResult(int blocking, unsigned int call_uid, size_t *result_len) { + std::string result; + if (!nt::GetRpcResult(blocking, call_uid, &result)) return nullptr; + + // convert result + *result_len = result.size(); + char *result_cstr; + ConvertToC(result, &result_cstr); + return result_cstr; +} + +char *NT_PackRpcDefinition(const NT_RpcDefinition *def, size_t *packed_len) { + auto packed = nt::PackRpcDefinition(ConvertFromC(*def)); + + // convert result + *packed_len = packed.size(); + char *packed_cstr; + ConvertToC(packed, &packed_cstr); + return packed_cstr; +} + +int NT_UnpackRpcDefinition(const char *packed, size_t packed_len, + NT_RpcDefinition *def) { + nt::RpcDefinition def_v; + if (!nt::UnpackRpcDefinition(StringRef(packed, packed_len), &def_v)) + return 0; + + // convert result + ConvertToC(def_v, def); + return 1; +} + +char *NT_PackRpcValues(const NT_Value **values, size_t values_len, + size_t *packed_len) { + // create input vector + std::vector> values_v; + values_v.reserve(values_len); + for (size_t i = 0; i < values_len; ++i) + values_v.push_back(ConvertFromC(*values[i])); + + // make the call + auto packed = nt::PackRpcValues(values_v); + + // convert result + *packed_len = packed.size(); + char *packed_cstr; + ConvertToC(packed, &packed_cstr); + return packed_cstr; +} + +NT_Value **NT_UnpackRpcValues(const char *packed, size_t packed_len, + const NT_Type *types, size_t types_len) { + auto values_v = nt::UnpackRpcValues(StringRef(packed, packed_len), + ArrayRef(types, types_len)); + if (values_v.size() == 0) return nullptr; // create array and copy into it - NT_Value** results = static_cast( - std::malloc(results_v.size() * sizeof(NT_Value*))); - for (size_t i = 0; i < results_v.size(); ++i) { - results[i] = static_cast(std::malloc(sizeof(NT_Value))); - NT_InitValue(results[i]); - ConvertToC(*results_v[i], results[i]); + NT_Value** values = static_cast( + std::malloc(values_v.size() * sizeof(NT_Value*))); + for (size_t i = 0; i < values_v.size(); ++i) { + values[i] = static_cast(std::malloc(sizeof(NT_Value))); + NT_InitValue(values[i]); + ConvertToC(*values_v[i], values[i]); } - return results; + return values; } /* @@ -342,3 +413,26 @@ void NT_DisposeConnectionInfoArray(NT_ConnectionInfo *arr, size_t count) { for (size_t i = 0; i < count; i++) DisposeConnectionInfo(&arr[i]); std::free(arr); } + +void NT_DisposeRpcDefinition(NT_RpcDefinition *def) { + NT_DisposeString(&def->name); + + for (size_t i = 0; i < def->num_params; ++i) { + NT_DisposeString(&def->params[i].name); + NT_DisposeValue(&def->params[i].def_value); + } + std::free(def->params); + def->params = nullptr; + def->num_params = 0; + + for (size_t i = 0; i < def->num_results; ++i) + NT_DisposeString(&def->results[i].name); + std::free(def->results); + def->results = nullptr; + def->num_results = 0; +} + +void NT_DisposeRpcCallInfo(NT_RpcCallInfo *call_info) { + NT_DisposeString(&call_info->name); + NT_DisposeString(&call_info->params); +} diff --git a/src/ntcore_cpp.cpp b/src/ntcore_cpp.cpp index 731d74f807..a99cf992cc 100644 --- a/src/ntcore_cpp.cpp +++ b/src/ntcore_cpp.cpp @@ -15,7 +15,10 @@ #include "Dispatcher.h" #include "Log.h" #include "Notifier.h" +#include "RpcServer.h" #include "Storage.h" +#include "WireDecoder.h" +#include "WireEncoder.h" namespace nt { @@ -91,20 +94,111 @@ void RemoveConnectionListener(unsigned int conn_listener_uid) { * Remote Procedure Call Functions */ -unsigned int CreateRpc(StringRef name, const RpcDefinition& def, - RpcCallback callback) { - return 0; +void CreateRpc(StringRef name, StringRef def, RpcCallback callback) { + Storage::GetInstance().CreateRpc(name, def, callback); } -void DeleteRpc(unsigned int rpc_uid) {} - -unsigned int CallRpc(StringRef name, - ArrayRef> params) { - return 0; +void CreatePolledRpc(StringRef name, StringRef def) { + Storage::GetInstance().CreatePolledRpc(name, def); } -std::vector> GetRpcResult(unsigned int result_uid) { - return std::vector>(); +bool PollRpc(bool blocking, RpcCallInfo* call_info) { + return RpcServer::GetInstance().PollRpc(blocking, call_info); +} + +void PostRpcResponse(unsigned int rpc_id, unsigned int call_uid, + StringRef result) { + RpcServer::GetInstance().PostRpcResponse(rpc_id, call_uid, result); +} + +unsigned int CallRpc(StringRef name, StringRef params) { + return Storage::GetInstance().CallRpc(name, params); +} + +bool GetRpcResult(bool blocking, unsigned int call_uid, std::string* result) { + return Storage::GetInstance().GetRpcResult(blocking, call_uid, result); +} + +std::string PackRpcDefinition(const RpcDefinition& def) { + WireEncoder enc(0x0300); + enc.Write8(def.version); + enc.WriteString(def.name); + + // parameters + unsigned int params_size = def.params.size(); + if (params_size > 0xff) params_size = 0xff; + enc.Write8(params_size); + for (std::size_t i = 0; i < params_size; ++i) { + enc.WriteType(def.params[i].def_value->type()); + enc.WriteString(def.params[i].name); + enc.WriteValue(*def.params[i].def_value); + } + + // results + unsigned int results_size = def.results.size(); + if (results_size > 0xff) results_size = 0xff; + enc.Write8(results_size); + for (std::size_t i = 0; i < results_size; ++i) { + enc.WriteType(def.results[i].type); + enc.WriteString(def.results[i].name); + } + + return enc.ToStringRef(); +} + +bool UnpackRpcDefinition(StringRef packed, RpcDefinition* def) { + raw_mem_istream is(packed.data(), packed.size()); + WireDecoder dec(is, 0x0300); + if (!dec.Read8(&def->version)) return false; + if (!dec.ReadString(&def->name)) return false; + + // parameters + unsigned int params_size; + if (!dec.Read8(¶ms_size)) return false; + def->params.resize(0); + def->params.reserve(params_size); + for (std::size_t i = 0; i < params_size; ++i) { + RpcParamDef pdef; + NT_Type type; + if (!dec.ReadType(&type)) return false; + if (!dec.ReadString(&pdef.name)) return false; + pdef.def_value = dec.ReadValue(type); + if (!pdef.def_value) return false; + def->params.emplace_back(std::move(pdef)); + } + + // results + unsigned int results_size; + if (!dec.Read8(&results_size)) return false; + def->results.resize(0); + def->results.reserve(results_size); + for (std::size_t i = 0; i < results_size; ++i) { + RpcResultDef rdef; + if (!dec.ReadType(&rdef.type)) return false; + if (!dec.ReadString(&rdef.name)) return false; + def->results.emplace_back(std::move(rdef)); + } + + return true; +} + +std::string PackRpcValues(ArrayRef> values) { + WireEncoder enc(0x0300); + for (auto& value : values) enc.WriteValue(*value); + return enc.ToStringRef(); +} + +std::vector> UnpackRpcValues(StringRef packed, + ArrayRef types) { + raw_mem_istream is(packed.data(), packed.size()); + WireDecoder dec(is, 0x0300); + std::vector> vec; + for (auto type : types) { + auto item = dec.ReadValue(type); + if (!item) return std::vector>(); + vec.emplace_back(std::move(item)); + } + return vec; } /*