Storage: generate messages rather than intermediate updates.

This commit is contained in:
Peter Johnson
2015-07-31 13:21:19 -07:00
parent a127bca0e4
commit 138ebf5b4d
4 changed files with 528 additions and 435 deletions

View File

@@ -19,9 +19,7 @@ using namespace nt;
ATOMIC_STATIC_INIT(Storage)
Storage::Storage() {
m_updates_delete_all = false;
}
Storage::Storage() {}
Storage::~Storage() {}
@@ -60,6 +58,7 @@ void Storage::ProcessIncoming(std::shared_ptr<Message> msg,
unsigned int id = msg->id();
StringRef name = msg->str();
std::shared_ptr<StorageEntry> entry;
bool may_need_update = false;
if (m_server) {
// if we're a server, id=0xffff requests are requests for an id
// to be assigned, and we need to send the new assignment back to
@@ -79,12 +78,13 @@ void Storage::ProcessIncoming(std::shared_ptr<Message> msg,
m_idmap.push_back(entry);
// send the assignment to everyone (including the originator)
lock.unlock();
if (m_queue_outgoing)
m_queue_outgoing(
Message::EntryAssign(name, id, entry->seq_num().value(),
msg->value(), msg->flags()),
nullptr, nullptr);
if (m_queue_outgoing) {
auto queue_outgoing = m_queue_outgoing;
auto outmsg = Message::EntryAssign(
name, id, entry->seq_num().value(), msg->value(), msg->flags());
lock.unlock();
queue_outgoing(outmsg, nullptr, nullptr);
}
return;
}
if (id >= m_idmap.size() || !m_idmap[id]) {
@@ -107,10 +107,15 @@ void Storage::ProcessIncoming(std::shared_ptr<Message> msg,
if (!entry) {
// create local
auto& new_entry = m_entries[name];
if (!new_entry) new_entry = std::make_shared<StorageEntry>(name);
if (!new_entry) {
// didn't exist at all (rather than just being a response to a
// id assignment request)
new_entry = std::make_shared<StorageEntry>(name);
new_entry->set_value(msg->value());
new_entry->set_flags(msg->flags());
} else
may_need_update = true; // we may need to send an update message
entry = new_entry;
entry->set_value(msg->value());
entry->set_flags(msg->flags());
entry->set_id(id);
m_idmap[id] = entry;
return;
@@ -121,7 +126,16 @@ void Storage::ProcessIncoming(std::shared_ptr<Message> msg,
// already exists; ignore if sequence number not higher than local
SequenceNumber seq_num(msg->seq_num_uid());
if (seq_num <= entry->seq_num()) return;
if (seq_num <= entry->seq_num()) {
if (may_need_update) {
auto queue_outgoing = m_queue_outgoing;
auto msg = Message::EntryUpdate(entry->id(), entry->seq_num().value(),
entry->value());
lock.unlock();
queue_outgoing(msg, nullptr, nullptr);
}
return;
}
// sanity check: name should match id
if (msg->str() != entry->name()) {
@@ -139,12 +153,14 @@ void Storage::ProcessIncoming(std::shared_ptr<Message> msg,
// broadcast to all other connections (note for client there won't
// be any other connections, so don't bother)
lock.unlock();
if (m_server && m_queue_outgoing)
m_queue_outgoing(
if (m_server && m_queue_outgoing) {
auto queue_outgoing = m_queue_outgoing;
auto outmsg =
Message::EntryAssign(entry->name(), id, msg->seq_num_uid(),
msg->value(), entry->flags()),
nullptr, conn);
msg->value(), entry->flags());
lock.unlock();
queue_outgoing(outmsg, nullptr, conn);
}
break;
}
case Message::kEntryUpdate: {
@@ -168,8 +184,11 @@ void Storage::ProcessIncoming(std::shared_ptr<Message> msg,
// broadcast to all other connections (note for client there won't
// be any other connections, so don't bother)
lock.unlock();
if (m_server && m_queue_outgoing) m_queue_outgoing(msg, nullptr, conn);
if (m_server && m_queue_outgoing) {
auto queue_outgoing = m_queue_outgoing;
lock.unlock();
queue_outgoing(msg, nullptr, conn);
}
break;
}
case Message::kFlagsUpdate: {
@@ -188,8 +207,11 @@ void Storage::ProcessIncoming(std::shared_ptr<Message> msg,
// broadcast to all other connections (note for client there won't
// be any other connections, so don't bother)
lock.unlock();
if (m_server && m_queue_outgoing) m_queue_outgoing(msg, nullptr, conn);
if (m_server && m_queue_outgoing) {
auto queue_outgoing = m_queue_outgoing;
lock.unlock();
queue_outgoing(msg, nullptr, conn);
}
break;
}
case Message::kEntryDelete: {
@@ -209,8 +231,11 @@ void Storage::ProcessIncoming(std::shared_ptr<Message> msg,
// broadcast to all other connections (note for client there won't
// be any other connections, so don't bother)
lock.unlock();
if (m_server && m_queue_outgoing) m_queue_outgoing(msg, nullptr, conn);
if (m_server && m_queue_outgoing) {
auto queue_outgoing = m_queue_outgoing;
lock.unlock();
queue_outgoing(msg, nullptr, conn);
}
break;
}
case Message::kClearEntries: {
@@ -220,8 +245,11 @@ void Storage::ProcessIncoming(std::shared_ptr<Message> msg,
// broadcast to all other connections (note for client there won't
// be any other connections, so don't bother)
lock.unlock();
if (m_server && m_queue_outgoing) m_queue_outgoing(msg, nullptr, conn);
if (m_server && m_queue_outgoing) {
auto queue_outgoing = m_queue_outgoing;
lock.unlock();
queue_outgoing(msg, nullptr, conn);
}
break;
}
case Message::kExecuteRpc:
@@ -233,149 +261,165 @@ void Storage::ProcessIncoming(std::shared_ptr<Message> msg,
}
}
void Storage::GetUpdates(UpdateMap* updates, bool* delete_all) {
std::lock_guard<std::mutex> lock(m_mutex);
m_updates.swap(*updates);
m_updates.clear();
*delete_all = m_updates_delete_all;
m_updates_delete_all = false;
}
// Must be called with mutex already held
void Storage::AddUpdate(std::shared_ptr<StorageEntry> entry,
Update::Kind kind) {
if (kind == Update::kDeleteAll) {
m_updates_delete_all = true;
m_updates.clear();
return;
void Storage::SendAssignments(
std::function<void(std::shared_ptr<Message>)> send_msg, bool reset_ids) {
std::vector<std::shared_ptr<Message>> msgs;
{
std::lock_guard<std::mutex> lock(m_mutex);
for (auto& i : m_entries) {
auto entry = i.getValue();
msgs.emplace_back(Message::EntryAssign(i.getKey(), entry->id(),
entry->seq_num().value(),
entry->value(), entry->flags()));
if (!m_server && reset_ids) entry->set_id(0xffff);
}
if (!m_server && reset_ids) m_idmap.resize(0);
}
auto& update = m_updates[entry->name()];
switch (kind) {
case Update::kAssign:
update.entry = entry;
update.value = entry->value();
update.flags = entry->flags();
update.kind = Update::kAssign;
break;
case Update::kValueUpdate:
if (update.kind == Update::kNone) {
update.entry = entry;
update.kind = Update::kValueUpdate;
} else if (update.kind == Update::kFlagsUpdate) {
// Merge value+flags updates as we can only have a single update
// entry.
update.kind = Update::kValueFlagsUpdate;
} else if (update.kind == Update::kDelete) {
// Change delete + update into assign
update.kind = Update::kAssign;
update.flags = entry->flags();
}
update.value = entry->value();
break;
case Update::kFlagsUpdate:
if (update.kind == Update::kNone) {
update.entry = entry;
update.kind = Update::kFlagsUpdate;
} else if (update.kind == Update::kValueUpdate) {
// Merge value+flags updates as we can only have a single update
// entry.
update.kind = Update::kValueFlagsUpdate;
} else if (update.kind == Update::kDelete) {
return; // can't do a raw flags update
}
update.flags = entry->flags();
break;
case Update::kDelete:
update.kind = Update::kDelete;
update.entry = entry;
update.value = nullptr; // release reference if any
break;
default:
assert(false && "unexpected update kind");
break;
}
}
std::shared_ptr<StorageEntry> Storage::FindEntry(StringRef name) const {
std::lock_guard<std::mutex> lock(m_mutex);
auto i = m_entries.find(name);
return i == m_entries.end() ? nullptr : i->getValue();
for (auto& msg : msgs) send_msg(std::move(msg));
}
std::shared_ptr<Value> Storage::GetEntryValue(StringRef name) const {
auto entry = FindEntry(name);
if (!entry) return nullptr;
return entry->value();
std::lock_guard<std::mutex> lock(m_mutex);
auto i = m_entries.find(name);
return i == m_entries.end() ? nullptr : i->getValue()->value();
}
bool Storage::SetEntryValue(StringRef name, std::shared_ptr<Value> value) {
if (name.empty()) return true;
if (!value) return true;
std::lock_guard<std::mutex> lock(m_mutex);
auto& entry = m_entries[name];
if (!entry) entry = std::make_shared<StorageEntry>(name);
std::unique_lock<std::mutex> lock(m_mutex);
auto& new_entry = m_entries[name];
if (!new_entry) new_entry = std::make_shared<StorageEntry>(name);
auto entry = new_entry;
auto old_value = entry->value();
if (old_value && old_value->type() != value->type())
return false; // error on type mismatch
entry->set_value(value);
// put on update queue
if (!old_value)
AddUpdate(entry, Update::kAssign);
else if (*old_value != *value)
AddUpdate(entry, Update::kValueUpdate);
// if we're the server, assign an id if it doesn't have one
if (m_server && entry->id() == 0xffff) {
unsigned int id = m_idmap.size();
entry->set_id(id);
m_idmap.push_back(entry);
}
// generate message
if (!m_queue_outgoing) return true;
auto queue_outgoing = m_queue_outgoing;
if (!old_value) {
auto msg = Message::EntryAssign(name, entry->id(), entry->seq_num().value(),
value, entry->flags());
lock.unlock();
queue_outgoing(msg, nullptr, nullptr);
} else if (*old_value != *value) {
entry->seq_num_inc();
// don't send an update if we don't have an assigned id yet
if (entry->id() != 0xffff) {
auto msg =
Message::EntryUpdate(entry->id(), entry->seq_num().value(), value);
lock.unlock();
queue_outgoing(msg, nullptr, nullptr);
}
}
return true;
}
void Storage::SetEntryTypeValue(StringRef name, std::shared_ptr<Value> value) {
if (name.empty()) return;
if (!value) return;
std::lock_guard<std::mutex> lock(m_mutex);
std::unique_lock<std::mutex> lock(m_mutex);
auto& entry = m_entries[name];
if (!entry) entry = std::make_shared<StorageEntry>(name);
auto old_value = entry->value();
entry->set_value(value);
if (!old_value || *old_value != *value) {
// put on update queue
if (!old_value || old_value->type() != value->type())
AddUpdate(entry, Update::kAssign);
else
AddUpdate(entry, Update::kValueUpdate);
if (old_value && *old_value == *value) return;
// if we're the server, assign an id if it doesn't have one
if (m_server && entry->id() == 0xffff) {
unsigned int id = m_idmap.size();
entry->set_id(id);
m_idmap.push_back(entry);
}
// generate message
if (!m_queue_outgoing) return;
auto queue_outgoing = m_queue_outgoing;
if (!old_value || old_value->type() != value->type()) {
entry->seq_num_inc();
auto msg = Message::EntryAssign(name, entry->id(), entry->seq_num().value(),
value, entry->flags());
lock.unlock();
queue_outgoing(msg, nullptr, nullptr);
} else {
entry->seq_num_inc();
// don't send an update if we don't have an assigned id yet
if (entry->id() != 0xffff) {
auto msg =
Message::EntryUpdate(entry->id(), entry->seq_num().value(), value);
lock.unlock();
queue_outgoing(msg, nullptr, nullptr);
}
}
}
void Storage::SetEntryFlags(StringRef name, unsigned int flags) {
if (name.empty()) return;
std::lock_guard<std::mutex> lock(m_mutex);
std::unique_lock<std::mutex> lock(m_mutex);
auto i = m_entries.find(name);
if (i == m_entries.end()) return;
auto entry = i->getValue();
if (entry->flags() != flags) {
entry->set_flags(flags);
AddUpdate(entry, Update::kFlagsUpdate); // put on update queue
if (entry->flags() == flags) return;
entry->set_flags(flags);
// generate message
if (!m_queue_outgoing) return;
auto queue_outgoing = m_queue_outgoing;
unsigned int id = entry->id();
// don't send an update if we don't have an assigned id yet
if (id != 0xffff) {
lock.unlock();
queue_outgoing(Message::FlagsUpdate(id, flags), nullptr, nullptr);
}
}
unsigned int Storage::GetEntryFlags(StringRef name) const {
auto entry = FindEntry(name);
if (!entry) return 0;
return entry->flags();
std::lock_guard<std::mutex> lock(m_mutex);
auto i = m_entries.find(name);
return i == m_entries.end() ? 0 : i->getValue()->flags();
}
void Storage::DeleteEntry(StringRef name) {
std::lock_guard<std::mutex> lock(m_mutex);
std::unique_lock<std::mutex> lock(m_mutex);
auto i = m_entries.find(name);
if (i == m_entries.end()) return;
auto entry = i->getValue();
unsigned int id = entry->id();
m_entries.erase(i); // erase from map
// if it had a value, put on update queue
if (entry->value()) AddUpdate(entry, Update::kDelete);
if (id < m_idmap.size()) m_idmap[id].reset();
if (!entry->value()) return;
// if it had a value, generate message
// don't send an update if we don't have an assigned id yet
if (id != 0xffff) {
if (!m_queue_outgoing) return;
auto queue_outgoing = m_queue_outgoing;
lock.unlock();
queue_outgoing(Message::EntryDelete(id), nullptr, nullptr);
}
}
void Storage::DeleteAllEntries() {
std::lock_guard<std::mutex> lock(m_mutex);
std::unique_lock<std::mutex> lock(m_mutex);
if (m_entries.empty()) return;
m_entries.clear();
AddUpdate(nullptr, Update::kDeleteAll); // put on update queue
m_idmap.resize(0);
// generate message
if (!m_queue_outgoing) return;
auto queue_outgoing = m_queue_outgoing;
lock.unlock();
queue_outgoing(Message::ClearEntries(), nullptr, nullptr);
}
std::vector<EntryInfo> Storage::GetEntryInfo(StringRef prefix,
@@ -821,7 +865,8 @@ next_line:
// copy values into storage as quickly as possible so lock isn't held
{
std::lock_guard<std::mutex> lock(m_mutex);
std::vector<std::shared_ptr<Message>> msgs;
std::unique_lock<std::mutex> lock(m_mutex);
for (auto& i : entries) {
auto& entry = m_entries[i.first];
if (!entry) entry = std::make_shared<StorageEntry>(i.first);
@@ -830,16 +875,36 @@ next_line:
bool was_persist = entry->IsPersistent();
if (!was_persist) entry->set_flags(entry->flags() | NT_PERSISTENT);
// if we're the server, assign an id if it doesn't have one
if (m_server && entry->id() == 0xffff) {
unsigned int id = m_idmap.size();
entry->set_id(id);
m_idmap.push_back(entry);
}
if (!m_queue_outgoing) continue; // shortcut
entry->seq_num_inc();
// put on update queue
if (!old_value || old_value->type() != i.second->type())
AddUpdate(entry, Update::kAssign);
else {
msgs.emplace_back(Message::EntryAssign(i.first, entry->id(),
entry->seq_num().value(),
i.second, entry->flags()));
else if (entry->id() != 0xffff) {
// don't send an update if we don't have an assigned id yet
if (*old_value != *i.second)
AddUpdate(entry, Update::kValueUpdate);
msgs.emplace_back(Message::EntryUpdate(
entry->id(), entry->seq_num().value(), i.second));
if (!was_persist)
AddUpdate(entry, Update::kFlagsUpdate);
msgs.emplace_back(Message::FlagsUpdate(entry->id(), entry->flags()));
}
}
if (m_queue_outgoing) {
auto queue_outgoing = m_queue_outgoing;
lock.unlock();
for (auto& msg : msgs) queue_outgoing(std::move(msg), nullptr, nullptr);
}
}
return true;