From 8099d6dbd7f7ddf5a5a61e3a58998f2501e6c693 Mon Sep 17 00:00:00 2001 From: Peter Johnson Date: Sun, 13 Aug 2017 08:20:28 -0700 Subject: [PATCH] Refactor Storage ProcessIncoming(). --- src/main/native/cpp/Storage.cpp | 567 +++++++++++++++++--------------- src/main/native/cpp/Storage.h | 16 + 2 files changed, 317 insertions(+), 266 deletions(-) diff --git a/src/main/native/cpp/Storage.cpp b/src/main/native/cpp/Storage.cpp index 8a8f27df77..454ec33bad 100644 --- a/src/main/native/cpp/Storage.cpp +++ b/src/main/native/cpp/Storage.cpp @@ -48,7 +48,6 @@ NT_Type Storage::GetMessageEntryType(unsigned int id) const { void Storage::ProcessIncoming(std::shared_ptr msg, NetworkConnection* conn, std::weak_ptr conn_weak) { - std::unique_lock lock(m_mutex); switch (msg->type()) { case Message::kKeepAlive: break; // ignore @@ -59,283 +58,319 @@ void Storage::ProcessIncoming(std::shared_ptr msg, case Message::kClientHelloDone: // shouldn't get these, but ignore if we do break; - case Message::kEntryAssign: { - unsigned int id = msg->id(); - StringRef name = msg->str(); - Entry* entry; - bool may_need_update = false; - if (m_server) { - // if we're a server, id=0xffff requests are requests for an id - // to be assigned, and we need to send the new assignment back to - // the sender as well as all other connections. - if (id == 0xffff) { - entry = GetOrNew(name); - // see if it was already assigned; ignore if so. - if (entry->id != 0xffff) return; - - entry->flags = msg->flags(); - SetEntryValueImpl(entry, msg->value(), lock, false); - return; - } - if (id >= m_idmap.size() || !m_idmap[id]) { - // ignore arbitrary entry assignments - // this can happen due to e.g. assignment to deleted entry - lock.unlock(); - DEBUG("server: received assignment to unknown entry"); - return; - } - entry = m_idmap[id]; - } else { - // clients simply accept new assignments - if (id == 0xffff) { - lock.unlock(); - DEBUG("client: received entry assignment request?"); - return; - } - if (id >= m_idmap.size()) m_idmap.resize(id + 1); - entry = m_idmap[id]; - if (!entry) { - // create local - bool is_new; - entry = GetOrNew(name, &is_new); - entry->id = id; - m_idmap[id] = entry; - if (is_new) { - // didn't exist at all (rather than just being a response to a - // id assignment request) - entry->value = msg->value(); - entry->flags = msg->flags(); - - // notify - m_notifier.NotifyEntry(entry->local_id, name, entry->value, - NT_NOTIFY_NEW); - return; - } - may_need_update = true; // we may need to send an update message - - // if the received flags don't match what we sent, we most likely - // updated flags locally in the interim; send flags update message. - if (msg->flags() != entry->flags) { - auto dispatcher = m_dispatcher; - auto outmsg = Message::FlagsUpdate(id, entry->flags); - lock.unlock(); - dispatcher->QueueOutgoing(outmsg, nullptr, nullptr); - lock.lock(); - } - } - } - - // common client and server handling - - // already exists; ignore if sequence number not higher than local - SequenceNumber seq_num(msg->seq_num_uid()); - if (seq_num < entry->seq_num) { - if (may_need_update) { - auto dispatcher = m_dispatcher; - auto outmsg = Message::EntryUpdate(entry->id, entry->seq_num.value(), - entry->value); - lock.unlock(); - dispatcher->QueueOutgoing(outmsg, nullptr, nullptr); - } - return; - } - - // sanity check: name should match id - if (msg->str() != entry->name) { - lock.unlock(); - DEBUG("entry assignment for same id with different name?"); - return; - } - - unsigned int notify_flags = NT_NOTIFY_UPDATE; - - // don't update flags from a <3.0 remote (not part of message) - // don't update flags if this is a server response to a client id request - if (!may_need_update && conn->proto_rev() >= 0x0300) { - // update persistent dirty flag if persistent flag changed - if ((entry->flags & NT_PERSISTENT) != (msg->flags() & NT_PERSISTENT)) - m_persistent_dirty = true; - if (entry->flags != msg->flags()) notify_flags |= NT_NOTIFY_FLAGS; - entry->flags = msg->flags(); - } - - // update persistent dirty flag if the value changed and it's persistent - if (entry->IsPersistent() && *entry->value != *msg->value()) - m_persistent_dirty = true; - - // update local - entry->value = msg->value(); - entry->seq_num = seq_num; - - // notify - m_notifier.NotifyEntry(entry->local_id, name, entry->value, notify_flags); - - // broadcast to all other connections (note for client there won't - // be any other connections, so don't bother) - if (m_server && m_dispatcher) { - auto dispatcher = m_dispatcher; - auto outmsg = Message::EntryAssign(entry->name, id, msg->seq_num_uid(), - msg->value(), entry->flags); - lock.unlock(); - dispatcher->QueueOutgoing(outmsg, nullptr, conn); - } + case Message::kEntryAssign: + ProcessIncomingEntryAssign(std::move(msg), conn); break; - } - case Message::kEntryUpdate: { - unsigned int id = msg->id(); - if (id >= m_idmap.size() || !m_idmap[id]) { - // ignore arbitrary entry updates; - // this can happen due to deleted entries - lock.unlock(); - DEBUG("received update to unknown entry"); - return; - } - Entry* entry = m_idmap[id]; - - // ignore if sequence number not higher than local - SequenceNumber seq_num(msg->seq_num_uid()); - if (seq_num <= entry->seq_num) return; - - // update local - entry->value = msg->value(); - entry->seq_num = seq_num; - - // update persistent dirty flag if it's a persistent value - if (entry->IsPersistent()) m_persistent_dirty = true; - - // notify - m_notifier.NotifyEntry(entry->local_id, entry->name, entry->value, - NT_NOTIFY_UPDATE); - - // broadcast to all other connections (note for client there won't - // be any other connections, so don't bother) - if (m_server && m_dispatcher) { - auto dispatcher = m_dispatcher; - lock.unlock(); - dispatcher->QueueOutgoing(msg, nullptr, conn); - } + case Message::kEntryUpdate: + ProcessIncomingEntryUpdate(std::move(msg), conn); break; - } - case Message::kFlagsUpdate: { - unsigned int id = msg->id(); - if (id >= m_idmap.size() || !m_idmap[id]) { - // ignore arbitrary entry updates; - // this can happen due to deleted entries - lock.unlock(); - DEBUG("received flags update to unknown entry"); - return; - } - - // update local - SetEntryFlagsImpl(m_idmap[id], msg->flags(), lock, false); - - // broadcast to all other connections (note for client there won't - // be any other connections, so don't bother) - if (m_server && m_dispatcher) { - auto dispatcher = m_dispatcher; - lock.unlock(); - dispatcher->QueueOutgoing(msg, nullptr, conn); - } + case Message::kFlagsUpdate: + ProcessIncomingFlagsUpdate(std::move(msg), conn); break; - } - case Message::kEntryDelete: { - unsigned int id = msg->id(); - if (id >= m_idmap.size() || !m_idmap[id]) { - // ignore arbitrary entry updates; - // this can happen due to deleted entries - lock.unlock(); - DEBUG("received delete to unknown entry"); - return; - } - - // update local - DeleteEntryImpl(m_idmap[id], m_entries.end(), lock, false); - - // broadcast to all other connections (note for client there won't - // be any other connections, so don't bother) - if (m_server && m_dispatcher) { - auto dispatcher = m_dispatcher; - lock.unlock(); - dispatcher->QueueOutgoing(msg, nullptr, conn); - } + case Message::kEntryDelete: + ProcessIncomingEntryDelete(std::move(msg), conn); break; - } - case Message::kClearEntries: { - // update local - DeleteAllEntriesImpl(false); - - // broadcast to all other connections (note for client there won't - // be any other connections, so don't bother) - if (m_server && m_dispatcher) { - auto dispatcher = m_dispatcher; - lock.unlock(); - dispatcher->QueueOutgoing(msg, nullptr, conn); - } + case Message::kClearEntries: + ProcessIncomingClearEntries(std::move(msg), conn); break; - } - case Message::kExecuteRpc: { - if (!m_server) return; // only process on server - unsigned int id = msg->id(); - if (id >= m_idmap.size() || !m_idmap[id]) { - // ignore call to non-existent RPC - // this can happen due to deleted entries - lock.unlock(); - DEBUG("received RPC call to unknown entry"); - return; - } - Entry* entry = m_idmap[id]; - if (!entry->value || !entry->value->IsRpc()) { - lock.unlock(); - DEBUG("received RPC call to non-RPC entry"); - return; - } - ConnectionInfo conn_info; - auto c = conn_weak.lock(); - if (c) { - conn_info = c->info(); - } else { - conn_info.remote_id = ""; - conn_info.remote_ip = ""; - conn_info.remote_port = 0; - conn_info.last_update = 0; - conn_info.protocol_version = 0; - } - unsigned int call_uid = msg->seq_num_uid(); - m_rpc_server.ProcessRpc( - entry->local_id, call_uid, entry->name, msg->str(), conn_info, - [=](StringRef result) { - auto c = conn_weak.lock(); - if (c) c->QueueOutgoing(Message::RpcResponse(id, call_uid, result)); - }, - entry->rpc_uid); + case Message::kExecuteRpc: + ProcessIncomingExecuteRpc(std::move(msg), conn, std::move(conn_weak)); break; - } - case Message::kRpcResponse: { - if (m_server) return; // only process on client - unsigned int id = msg->id(); - if (id >= m_idmap.size() || !m_idmap[id]) { - // ignore response to non-existent RPC - // this can happen due to deleted entries - lock.unlock(); - DEBUG("received rpc response to unknown entry"); - return; - } - Entry* entry = m_idmap[id]; - if (!entry->value || !entry->value->IsRpc()) { - lock.unlock(); - DEBUG("received RPC response to non-RPC entry"); - return; - } - m_rpc_results.insert(std::make_pair( - RpcIdPair{entry->local_id, msg->seq_num_uid()}, msg->str())); - m_rpc_results_cond.notify_all(); + case Message::kRpcResponse: + ProcessIncomingRpcResponse(std::move(msg), conn); break; - } default: break; } } +void Storage::ProcessIncomingEntryAssign(std::shared_ptr msg, + NetworkConnection* conn) { + std::unique_lock lock(m_mutex); + unsigned int id = msg->id(); + StringRef name = msg->str(); + Entry* entry; + bool may_need_update = false; + if (m_server) { + // if we're a server, id=0xffff requests are requests for an id + // to be assigned, and we need to send the new assignment back to + // the sender as well as all other connections. + if (id == 0xffff) { + entry = GetOrNew(name); + // see if it was already assigned; ignore if so. + if (entry->id != 0xffff) return; + + entry->flags = msg->flags(); + SetEntryValueImpl(entry, msg->value(), lock, false); + return; + } + if (id >= m_idmap.size() || !m_idmap[id]) { + // ignore arbitrary entry assignments + // this can happen due to e.g. assignment to deleted entry + lock.unlock(); + DEBUG("server: received assignment to unknown entry"); + return; + } + entry = m_idmap[id]; + } else { + // clients simply accept new assignments + if (id == 0xffff) { + lock.unlock(); + DEBUG("client: received entry assignment request?"); + return; + } + if (id >= m_idmap.size()) m_idmap.resize(id + 1); + entry = m_idmap[id]; + if (!entry) { + // create local + bool is_new; + entry = GetOrNew(name, &is_new); + entry->id = id; + m_idmap[id] = entry; + if (is_new) { + // didn't exist at all (rather than just being a response to a + // id assignment request) + entry->value = msg->value(); + entry->flags = msg->flags(); + + // notify + m_notifier.NotifyEntry(entry->local_id, name, entry->value, + NT_NOTIFY_NEW); + return; + } + may_need_update = true; // we may need to send an update message + + // if the received flags don't match what we sent, we most likely + // updated flags locally in the interim; send flags update message. + if (msg->flags() != entry->flags) { + auto dispatcher = m_dispatcher; + auto outmsg = Message::FlagsUpdate(id, entry->flags); + lock.unlock(); + dispatcher->QueueOutgoing(outmsg, nullptr, nullptr); + lock.lock(); + } + } + } + + // common client and server handling + + // already exists; ignore if sequence number not higher than local + SequenceNumber seq_num(msg->seq_num_uid()); + if (seq_num < entry->seq_num) { + if (may_need_update) { + auto dispatcher = m_dispatcher; + auto outmsg = + Message::EntryUpdate(entry->id, entry->seq_num.value(), entry->value); + lock.unlock(); + dispatcher->QueueOutgoing(outmsg, nullptr, nullptr); + } + return; + } + + // sanity check: name should match id + if (msg->str() != entry->name) { + lock.unlock(); + DEBUG("entry assignment for same id with different name?"); + return; + } + + unsigned int notify_flags = NT_NOTIFY_UPDATE; + + // don't update flags from a <3.0 remote (not part of message) + // don't update flags if this is a server response to a client id request + if (!may_need_update && conn->proto_rev() >= 0x0300) { + // update persistent dirty flag if persistent flag changed + if ((entry->flags & NT_PERSISTENT) != (msg->flags() & NT_PERSISTENT)) + m_persistent_dirty = true; + if (entry->flags != msg->flags()) notify_flags |= NT_NOTIFY_FLAGS; + entry->flags = msg->flags(); + } + + // update persistent dirty flag if the value changed and it's persistent + if (entry->IsPersistent() && *entry->value != *msg->value()) + m_persistent_dirty = true; + + // update local + entry->value = msg->value(); + entry->seq_num = seq_num; + + // notify + m_notifier.NotifyEntry(entry->local_id, name, entry->value, notify_flags); + + // broadcast to all other connections (note for client there won't + // be any other connections, so don't bother) + if (m_server && m_dispatcher) { + auto dispatcher = m_dispatcher; + auto outmsg = Message::EntryAssign(entry->name, id, msg->seq_num_uid(), + msg->value(), entry->flags); + lock.unlock(); + dispatcher->QueueOutgoing(outmsg, nullptr, conn); + } +} + +void Storage::ProcessIncomingEntryUpdate(std::shared_ptr msg, + NetworkConnection* conn) { + std::unique_lock lock(m_mutex); + unsigned int id = msg->id(); + if (id >= m_idmap.size() || !m_idmap[id]) { + // ignore arbitrary entry updates; + // this can happen due to deleted entries + lock.unlock(); + DEBUG("received update to unknown entry"); + return; + } + Entry* entry = m_idmap[id]; + + // ignore if sequence number not higher than local + SequenceNumber seq_num(msg->seq_num_uid()); + if (seq_num <= entry->seq_num) return; + + // update local + entry->value = msg->value(); + entry->seq_num = seq_num; + + // update persistent dirty flag if it's a persistent value + if (entry->IsPersistent()) m_persistent_dirty = true; + + // notify + m_notifier.NotifyEntry(entry->local_id, entry->name, entry->value, + NT_NOTIFY_UPDATE); + + // broadcast to all other connections (note for client there won't + // be any other connections, so don't bother) + if (m_server && m_dispatcher) { + auto dispatcher = m_dispatcher; + lock.unlock(); + dispatcher->QueueOutgoing(msg, nullptr, conn); + } +} + +void Storage::ProcessIncomingFlagsUpdate(std::shared_ptr msg, + NetworkConnection* conn) { + std::unique_lock lock(m_mutex); + unsigned int id = msg->id(); + if (id >= m_idmap.size() || !m_idmap[id]) { + // ignore arbitrary entry updates; + // this can happen due to deleted entries + lock.unlock(); + DEBUG("received flags update to unknown entry"); + return; + } + + // update local + SetEntryFlagsImpl(m_idmap[id], msg->flags(), lock, false); + + // broadcast to all other connections (note for client there won't + // be any other connections, so don't bother) + if (m_server && m_dispatcher) { + auto dispatcher = m_dispatcher; + lock.unlock(); + dispatcher->QueueOutgoing(msg, nullptr, conn); + } +} + +void Storage::ProcessIncomingEntryDelete(std::shared_ptr msg, + NetworkConnection* conn) { + std::unique_lock lock(m_mutex); + unsigned int id = msg->id(); + if (id >= m_idmap.size() || !m_idmap[id]) { + // ignore arbitrary entry updates; + // this can happen due to deleted entries + lock.unlock(); + DEBUG("received delete to unknown entry"); + return; + } + + // update local + DeleteEntryImpl(m_idmap[id], m_entries.end(), lock, false); + + // broadcast to all other connections (note for client there won't + // be any other connections, so don't bother) + if (m_server && m_dispatcher) { + auto dispatcher = m_dispatcher; + lock.unlock(); + dispatcher->QueueOutgoing(msg, nullptr, conn); + } +} + +void Storage::ProcessIncomingClearEntries(std::shared_ptr msg, + NetworkConnection* conn) { + std::unique_lock lock(m_mutex); + // update local + DeleteAllEntriesImpl(false); + + // broadcast to all other connections (note for client there won't + // be any other connections, so don't bother) + if (m_server && m_dispatcher) { + auto dispatcher = m_dispatcher; + lock.unlock(); + dispatcher->QueueOutgoing(msg, nullptr, conn); + } +} + +void Storage::ProcessIncomingExecuteRpc( + std::shared_ptr msg, NetworkConnection* conn, + std::weak_ptr conn_weak) { + std::unique_lock lock(m_mutex); + if (!m_server) return; // only process on server + unsigned int id = msg->id(); + if (id >= m_idmap.size() || !m_idmap[id]) { + // ignore call to non-existent RPC + // this can happen due to deleted entries + lock.unlock(); + DEBUG("received RPC call to unknown entry"); + return; + } + Entry* entry = m_idmap[id]; + if (!entry->value || !entry->value->IsRpc()) { + lock.unlock(); + DEBUG("received RPC call to non-RPC entry"); + return; + } + ConnectionInfo conn_info; + auto c = conn_weak.lock(); + if (c) { + conn_info = c->info(); + } else { + conn_info.remote_id = ""; + conn_info.remote_ip = ""; + conn_info.remote_port = 0; + conn_info.last_update = 0; + conn_info.protocol_version = 0; + } + unsigned int call_uid = msg->seq_num_uid(); + m_rpc_server.ProcessRpc( + entry->local_id, call_uid, entry->name, msg->str(), conn_info, + [=](StringRef result) { + auto c = conn_weak.lock(); + if (c) c->QueueOutgoing(Message::RpcResponse(id, call_uid, result)); + }, + entry->rpc_uid); +} + +void Storage::ProcessIncomingRpcResponse(std::shared_ptr msg, + NetworkConnection* conn) { + std::unique_lock lock(m_mutex); + if (m_server) return; // only process on client + unsigned int id = msg->id(); + if (id >= m_idmap.size() || !m_idmap[id]) { + // ignore response to non-existent RPC + // this can happen due to deleted entries + lock.unlock(); + DEBUG("received rpc response to unknown entry"); + return; + } + Entry* entry = m_idmap[id]; + if (!entry->value || !entry->value->IsRpc()) { + lock.unlock(); + DEBUG("received RPC response to non-RPC entry"); + return; + } + m_rpc_results.insert(std::make_pair( + RpcIdPair{entry->local_id, msg->seq_num_uid()}, msg->str())); + m_rpc_results_cond.notify_all(); +} + void Storage::GetInitialAssignments( NetworkConnection& conn, std::vector>* msgs) { std::lock_guard lock(m_mutex); diff --git a/src/main/native/cpp/Storage.h b/src/main/native/cpp/Storage.h index af7318ad9d..1a5538f314 100644 --- a/src/main/native/cpp/Storage.h +++ b/src/main/native/cpp/Storage.h @@ -192,6 +192,22 @@ class Storage : public IStorage { IRpcServer& m_rpc_server; wpi::Logger& m_logger; + void ProcessIncomingEntryAssign(std::shared_ptr msg, + NetworkConnection* conn); + void ProcessIncomingEntryUpdate(std::shared_ptr msg, + NetworkConnection* conn); + void ProcessIncomingFlagsUpdate(std::shared_ptr msg, + NetworkConnection* conn); + void ProcessIncomingEntryDelete(std::shared_ptr msg, + NetworkConnection* conn); + void ProcessIncomingClearEntries(std::shared_ptr msg, + NetworkConnection* conn); + void ProcessIncomingExecuteRpc(std::shared_ptr msg, + NetworkConnection* conn, + std::weak_ptr conn_weak); + void ProcessIncomingRpcResponse(std::shared_ptr msg, + NetworkConnection* conn); + bool GetPersistentEntries( bool periodic, std::vector>>* entries)