Refactor Storage ProcessIncoming().

This commit is contained in:
Peter Johnson
2017-08-13 08:20:28 -07:00
parent 8e01b68cf6
commit 8099d6dbd7
2 changed files with 317 additions and 266 deletions

View File

@@ -48,7 +48,6 @@ NT_Type Storage::GetMessageEntryType(unsigned int id) const {
void Storage::ProcessIncoming(std::shared_ptr<Message> msg,
NetworkConnection* conn,
std::weak_ptr<NetworkConnection> conn_weak) {
std::unique_lock<std::mutex> lock(m_mutex);
switch (msg->type()) {
case Message::kKeepAlive:
break; // ignore
@@ -59,283 +58,319 @@ void Storage::ProcessIncoming(std::shared_ptr<Message> msg,
case Message::kClientHelloDone:
// shouldn't get these, but ignore if we do
break;
case Message::kEntryAssign: {
unsigned int id = msg->id();
StringRef name = msg->str();
Entry* entry;
bool may_need_update = false;
if (m_server) {
// if we're a server, id=0xffff requests are requests for an id
// to be assigned, and we need to send the new assignment back to
// the sender as well as all other connections.
if (id == 0xffff) {
entry = GetOrNew(name);
// see if it was already assigned; ignore if so.
if (entry->id != 0xffff) return;
entry->flags = msg->flags();
SetEntryValueImpl(entry, msg->value(), lock, false);
return;
}
if (id >= m_idmap.size() || !m_idmap[id]) {
// ignore arbitrary entry assignments
// this can happen due to e.g. assignment to deleted entry
lock.unlock();
DEBUG("server: received assignment to unknown entry");
return;
}
entry = m_idmap[id];
} else {
// clients simply accept new assignments
if (id == 0xffff) {
lock.unlock();
DEBUG("client: received entry assignment request?");
return;
}
if (id >= m_idmap.size()) m_idmap.resize(id + 1);
entry = m_idmap[id];
if (!entry) {
// create local
bool is_new;
entry = GetOrNew(name, &is_new);
entry->id = id;
m_idmap[id] = entry;
if (is_new) {
// didn't exist at all (rather than just being a response to a
// id assignment request)
entry->value = msg->value();
entry->flags = msg->flags();
// notify
m_notifier.NotifyEntry(entry->local_id, name, entry->value,
NT_NOTIFY_NEW);
return;
}
may_need_update = true; // we may need to send an update message
// if the received flags don't match what we sent, we most likely
// updated flags locally in the interim; send flags update message.
if (msg->flags() != entry->flags) {
auto dispatcher = m_dispatcher;
auto outmsg = Message::FlagsUpdate(id, entry->flags);
lock.unlock();
dispatcher->QueueOutgoing(outmsg, nullptr, nullptr);
lock.lock();
}
}
}
// common client and server handling
// already exists; ignore if sequence number not higher than local
SequenceNumber seq_num(msg->seq_num_uid());
if (seq_num < entry->seq_num) {
if (may_need_update) {
auto dispatcher = m_dispatcher;
auto outmsg = Message::EntryUpdate(entry->id, entry->seq_num.value(),
entry->value);
lock.unlock();
dispatcher->QueueOutgoing(outmsg, nullptr, nullptr);
}
return;
}
// sanity check: name should match id
if (msg->str() != entry->name) {
lock.unlock();
DEBUG("entry assignment for same id with different name?");
return;
}
unsigned int notify_flags = NT_NOTIFY_UPDATE;
// don't update flags from a <3.0 remote (not part of message)
// don't update flags if this is a server response to a client id request
if (!may_need_update && conn->proto_rev() >= 0x0300) {
// update persistent dirty flag if persistent flag changed
if ((entry->flags & NT_PERSISTENT) != (msg->flags() & NT_PERSISTENT))
m_persistent_dirty = true;
if (entry->flags != msg->flags()) notify_flags |= NT_NOTIFY_FLAGS;
entry->flags = msg->flags();
}
// update persistent dirty flag if the value changed and it's persistent
if (entry->IsPersistent() && *entry->value != *msg->value())
m_persistent_dirty = true;
// update local
entry->value = msg->value();
entry->seq_num = seq_num;
// notify
m_notifier.NotifyEntry(entry->local_id, name, entry->value, notify_flags);
// broadcast to all other connections (note for client there won't
// be any other connections, so don't bother)
if (m_server && m_dispatcher) {
auto dispatcher = m_dispatcher;
auto outmsg = Message::EntryAssign(entry->name, id, msg->seq_num_uid(),
msg->value(), entry->flags);
lock.unlock();
dispatcher->QueueOutgoing(outmsg, nullptr, conn);
}
case Message::kEntryAssign:
ProcessIncomingEntryAssign(std::move(msg), conn);
break;
}
case Message::kEntryUpdate: {
unsigned int id = msg->id();
if (id >= m_idmap.size() || !m_idmap[id]) {
// ignore arbitrary entry updates;
// this can happen due to deleted entries
lock.unlock();
DEBUG("received update to unknown entry");
return;
}
Entry* entry = m_idmap[id];
// ignore if sequence number not higher than local
SequenceNumber seq_num(msg->seq_num_uid());
if (seq_num <= entry->seq_num) return;
// update local
entry->value = msg->value();
entry->seq_num = seq_num;
// update persistent dirty flag if it's a persistent value
if (entry->IsPersistent()) m_persistent_dirty = true;
// notify
m_notifier.NotifyEntry(entry->local_id, entry->name, entry->value,
NT_NOTIFY_UPDATE);
// broadcast to all other connections (note for client there won't
// be any other connections, so don't bother)
if (m_server && m_dispatcher) {
auto dispatcher = m_dispatcher;
lock.unlock();
dispatcher->QueueOutgoing(msg, nullptr, conn);
}
case Message::kEntryUpdate:
ProcessIncomingEntryUpdate(std::move(msg), conn);
break;
}
case Message::kFlagsUpdate: {
unsigned int id = msg->id();
if (id >= m_idmap.size() || !m_idmap[id]) {
// ignore arbitrary entry updates;
// this can happen due to deleted entries
lock.unlock();
DEBUG("received flags update to unknown entry");
return;
}
// update local
SetEntryFlagsImpl(m_idmap[id], msg->flags(), lock, false);
// broadcast to all other connections (note for client there won't
// be any other connections, so don't bother)
if (m_server && m_dispatcher) {
auto dispatcher = m_dispatcher;
lock.unlock();
dispatcher->QueueOutgoing(msg, nullptr, conn);
}
case Message::kFlagsUpdate:
ProcessIncomingFlagsUpdate(std::move(msg), conn);
break;
}
case Message::kEntryDelete: {
unsigned int id = msg->id();
if (id >= m_idmap.size() || !m_idmap[id]) {
// ignore arbitrary entry updates;
// this can happen due to deleted entries
lock.unlock();
DEBUG("received delete to unknown entry");
return;
}
// update local
DeleteEntryImpl(m_idmap[id], m_entries.end(), lock, false);
// broadcast to all other connections (note for client there won't
// be any other connections, so don't bother)
if (m_server && m_dispatcher) {
auto dispatcher = m_dispatcher;
lock.unlock();
dispatcher->QueueOutgoing(msg, nullptr, conn);
}
case Message::kEntryDelete:
ProcessIncomingEntryDelete(std::move(msg), conn);
break;
}
case Message::kClearEntries: {
// update local
DeleteAllEntriesImpl(false);
// broadcast to all other connections (note for client there won't
// be any other connections, so don't bother)
if (m_server && m_dispatcher) {
auto dispatcher = m_dispatcher;
lock.unlock();
dispatcher->QueueOutgoing(msg, nullptr, conn);
}
case Message::kClearEntries:
ProcessIncomingClearEntries(std::move(msg), conn);
break;
}
case Message::kExecuteRpc: {
if (!m_server) return; // only process on server
unsigned int id = msg->id();
if (id >= m_idmap.size() || !m_idmap[id]) {
// ignore call to non-existent RPC
// this can happen due to deleted entries
lock.unlock();
DEBUG("received RPC call to unknown entry");
return;
}
Entry* entry = m_idmap[id];
if (!entry->value || !entry->value->IsRpc()) {
lock.unlock();
DEBUG("received RPC call to non-RPC entry");
return;
}
ConnectionInfo conn_info;
auto c = conn_weak.lock();
if (c) {
conn_info = c->info();
} else {
conn_info.remote_id = "";
conn_info.remote_ip = "";
conn_info.remote_port = 0;
conn_info.last_update = 0;
conn_info.protocol_version = 0;
}
unsigned int call_uid = msg->seq_num_uid();
m_rpc_server.ProcessRpc(
entry->local_id, call_uid, entry->name, msg->str(), conn_info,
[=](StringRef result) {
auto c = conn_weak.lock();
if (c) c->QueueOutgoing(Message::RpcResponse(id, call_uid, result));
},
entry->rpc_uid);
case Message::kExecuteRpc:
ProcessIncomingExecuteRpc(std::move(msg), conn, std::move(conn_weak));
break;
}
case Message::kRpcResponse: {
if (m_server) return; // only process on client
unsigned int id = msg->id();
if (id >= m_idmap.size() || !m_idmap[id]) {
// ignore response to non-existent RPC
// this can happen due to deleted entries
lock.unlock();
DEBUG("received rpc response to unknown entry");
return;
}
Entry* entry = m_idmap[id];
if (!entry->value || !entry->value->IsRpc()) {
lock.unlock();
DEBUG("received RPC response to non-RPC entry");
return;
}
m_rpc_results.insert(std::make_pair(
RpcIdPair{entry->local_id, msg->seq_num_uid()}, msg->str()));
m_rpc_results_cond.notify_all();
case Message::kRpcResponse:
ProcessIncomingRpcResponse(std::move(msg), conn);
break;
}
default:
break;
}
}
void Storage::ProcessIncomingEntryAssign(std::shared_ptr<Message> msg,
NetworkConnection* conn) {
std::unique_lock<std::mutex> lock(m_mutex);
unsigned int id = msg->id();
StringRef name = msg->str();
Entry* entry;
bool may_need_update = false;
if (m_server) {
// if we're a server, id=0xffff requests are requests for an id
// to be assigned, and we need to send the new assignment back to
// the sender as well as all other connections.
if (id == 0xffff) {
entry = GetOrNew(name);
// see if it was already assigned; ignore if so.
if (entry->id != 0xffff) return;
entry->flags = msg->flags();
SetEntryValueImpl(entry, msg->value(), lock, false);
return;
}
if (id >= m_idmap.size() || !m_idmap[id]) {
// ignore arbitrary entry assignments
// this can happen due to e.g. assignment to deleted entry
lock.unlock();
DEBUG("server: received assignment to unknown entry");
return;
}
entry = m_idmap[id];
} else {
// clients simply accept new assignments
if (id == 0xffff) {
lock.unlock();
DEBUG("client: received entry assignment request?");
return;
}
if (id >= m_idmap.size()) m_idmap.resize(id + 1);
entry = m_idmap[id];
if (!entry) {
// create local
bool is_new;
entry = GetOrNew(name, &is_new);
entry->id = id;
m_idmap[id] = entry;
if (is_new) {
// didn't exist at all (rather than just being a response to a
// id assignment request)
entry->value = msg->value();
entry->flags = msg->flags();
// notify
m_notifier.NotifyEntry(entry->local_id, name, entry->value,
NT_NOTIFY_NEW);
return;
}
may_need_update = true; // we may need to send an update message
// if the received flags don't match what we sent, we most likely
// updated flags locally in the interim; send flags update message.
if (msg->flags() != entry->flags) {
auto dispatcher = m_dispatcher;
auto outmsg = Message::FlagsUpdate(id, entry->flags);
lock.unlock();
dispatcher->QueueOutgoing(outmsg, nullptr, nullptr);
lock.lock();
}
}
}
// common client and server handling
// already exists; ignore if sequence number not higher than local
SequenceNumber seq_num(msg->seq_num_uid());
if (seq_num < entry->seq_num) {
if (may_need_update) {
auto dispatcher = m_dispatcher;
auto outmsg =
Message::EntryUpdate(entry->id, entry->seq_num.value(), entry->value);
lock.unlock();
dispatcher->QueueOutgoing(outmsg, nullptr, nullptr);
}
return;
}
// sanity check: name should match id
if (msg->str() != entry->name) {
lock.unlock();
DEBUG("entry assignment for same id with different name?");
return;
}
unsigned int notify_flags = NT_NOTIFY_UPDATE;
// don't update flags from a <3.0 remote (not part of message)
// don't update flags if this is a server response to a client id request
if (!may_need_update && conn->proto_rev() >= 0x0300) {
// update persistent dirty flag if persistent flag changed
if ((entry->flags & NT_PERSISTENT) != (msg->flags() & NT_PERSISTENT))
m_persistent_dirty = true;
if (entry->flags != msg->flags()) notify_flags |= NT_NOTIFY_FLAGS;
entry->flags = msg->flags();
}
// update persistent dirty flag if the value changed and it's persistent
if (entry->IsPersistent() && *entry->value != *msg->value())
m_persistent_dirty = true;
// update local
entry->value = msg->value();
entry->seq_num = seq_num;
// notify
m_notifier.NotifyEntry(entry->local_id, name, entry->value, notify_flags);
// broadcast to all other connections (note for client there won't
// be any other connections, so don't bother)
if (m_server && m_dispatcher) {
auto dispatcher = m_dispatcher;
auto outmsg = Message::EntryAssign(entry->name, id, msg->seq_num_uid(),
msg->value(), entry->flags);
lock.unlock();
dispatcher->QueueOutgoing(outmsg, nullptr, conn);
}
}
void Storage::ProcessIncomingEntryUpdate(std::shared_ptr<Message> msg,
NetworkConnection* conn) {
std::unique_lock<std::mutex> lock(m_mutex);
unsigned int id = msg->id();
if (id >= m_idmap.size() || !m_idmap[id]) {
// ignore arbitrary entry updates;
// this can happen due to deleted entries
lock.unlock();
DEBUG("received update to unknown entry");
return;
}
Entry* entry = m_idmap[id];
// ignore if sequence number not higher than local
SequenceNumber seq_num(msg->seq_num_uid());
if (seq_num <= entry->seq_num) return;
// update local
entry->value = msg->value();
entry->seq_num = seq_num;
// update persistent dirty flag if it's a persistent value
if (entry->IsPersistent()) m_persistent_dirty = true;
// notify
m_notifier.NotifyEntry(entry->local_id, entry->name, entry->value,
NT_NOTIFY_UPDATE);
// broadcast to all other connections (note for client there won't
// be any other connections, so don't bother)
if (m_server && m_dispatcher) {
auto dispatcher = m_dispatcher;
lock.unlock();
dispatcher->QueueOutgoing(msg, nullptr, conn);
}
}
void Storage::ProcessIncomingFlagsUpdate(std::shared_ptr<Message> msg,
NetworkConnection* conn) {
std::unique_lock<std::mutex> lock(m_mutex);
unsigned int id = msg->id();
if (id >= m_idmap.size() || !m_idmap[id]) {
// ignore arbitrary entry updates;
// this can happen due to deleted entries
lock.unlock();
DEBUG("received flags update to unknown entry");
return;
}
// update local
SetEntryFlagsImpl(m_idmap[id], msg->flags(), lock, false);
// broadcast to all other connections (note for client there won't
// be any other connections, so don't bother)
if (m_server && m_dispatcher) {
auto dispatcher = m_dispatcher;
lock.unlock();
dispatcher->QueueOutgoing(msg, nullptr, conn);
}
}
void Storage::ProcessIncomingEntryDelete(std::shared_ptr<Message> msg,
NetworkConnection* conn) {
std::unique_lock<std::mutex> lock(m_mutex);
unsigned int id = msg->id();
if (id >= m_idmap.size() || !m_idmap[id]) {
// ignore arbitrary entry updates;
// this can happen due to deleted entries
lock.unlock();
DEBUG("received delete to unknown entry");
return;
}
// update local
DeleteEntryImpl(m_idmap[id], m_entries.end(), lock, false);
// broadcast to all other connections (note for client there won't
// be any other connections, so don't bother)
if (m_server && m_dispatcher) {
auto dispatcher = m_dispatcher;
lock.unlock();
dispatcher->QueueOutgoing(msg, nullptr, conn);
}
}
void Storage::ProcessIncomingClearEntries(std::shared_ptr<Message> msg,
NetworkConnection* conn) {
std::unique_lock<std::mutex> lock(m_mutex);
// update local
DeleteAllEntriesImpl(false);
// broadcast to all other connections (note for client there won't
// be any other connections, so don't bother)
if (m_server && m_dispatcher) {
auto dispatcher = m_dispatcher;
lock.unlock();
dispatcher->QueueOutgoing(msg, nullptr, conn);
}
}
void Storage::ProcessIncomingExecuteRpc(
std::shared_ptr<Message> msg, NetworkConnection* conn,
std::weak_ptr<NetworkConnection> conn_weak) {
std::unique_lock<std::mutex> lock(m_mutex);
if (!m_server) return; // only process on server
unsigned int id = msg->id();
if (id >= m_idmap.size() || !m_idmap[id]) {
// ignore call to non-existent RPC
// this can happen due to deleted entries
lock.unlock();
DEBUG("received RPC call to unknown entry");
return;
}
Entry* entry = m_idmap[id];
if (!entry->value || !entry->value->IsRpc()) {
lock.unlock();
DEBUG("received RPC call to non-RPC entry");
return;
}
ConnectionInfo conn_info;
auto c = conn_weak.lock();
if (c) {
conn_info = c->info();
} else {
conn_info.remote_id = "";
conn_info.remote_ip = "";
conn_info.remote_port = 0;
conn_info.last_update = 0;
conn_info.protocol_version = 0;
}
unsigned int call_uid = msg->seq_num_uid();
m_rpc_server.ProcessRpc(
entry->local_id, call_uid, entry->name, msg->str(), conn_info,
[=](StringRef result) {
auto c = conn_weak.lock();
if (c) c->QueueOutgoing(Message::RpcResponse(id, call_uid, result));
},
entry->rpc_uid);
}
void Storage::ProcessIncomingRpcResponse(std::shared_ptr<Message> msg,
NetworkConnection* conn) {
std::unique_lock<std::mutex> lock(m_mutex);
if (m_server) return; // only process on client
unsigned int id = msg->id();
if (id >= m_idmap.size() || !m_idmap[id]) {
// ignore response to non-existent RPC
// this can happen due to deleted entries
lock.unlock();
DEBUG("received rpc response to unknown entry");
return;
}
Entry* entry = m_idmap[id];
if (!entry->value || !entry->value->IsRpc()) {
lock.unlock();
DEBUG("received RPC response to non-RPC entry");
return;
}
m_rpc_results.insert(std::make_pair(
RpcIdPair{entry->local_id, msg->seq_num_uid()}, msg->str()));
m_rpc_results_cond.notify_all();
}
void Storage::GetInitialAssignments(
NetworkConnection& conn, std::vector<std::shared_ptr<Message>>* msgs) {
std::lock_guard<std::mutex> lock(m_mutex);