Provide more extensive listener features.

This enables listeners to be notified of not only value updates, but also flag
changes and deletions by using a bitmask to specify what notifications are
desired.  The old API (which only provided a new/not new) flag is still
supported.  This also subsumes the feature to listen to local changes (that's
one of the bitmask options).
This commit is contained in:
Peter Johnson
2015-09-25 11:54:17 -07:00
parent 51064f5e75
commit 90959defd9
17 changed files with 246 additions and 128 deletions

View File

@@ -93,7 +93,7 @@ void Storage::ProcessIncoming(std::shared_ptr<Message> msg,
if (entry->IsPersistent()) m_persistent_dirty = true;
// notify
m_notifier.NotifyEntry(name, entry->value, true, false);
m_notifier.NotifyEntry(name, entry->value, NT_NOTIFY_NEW);
// send the assignment to everyone (including the originator)
if (m_queue_outgoing) {
@@ -135,7 +135,7 @@ void Storage::ProcessIncoming(std::shared_ptr<Message> msg,
m_idmap[id] = new_entry.get();
// notify
m_notifier.NotifyEntry(name, new_entry->value, true, false);
m_notifier.NotifyEntry(name, new_entry->value, NT_NOTIFY_NEW);
return;
}
may_need_update = true; // we may need to send an update message
@@ -177,12 +177,16 @@ void Storage::ProcessIncoming(std::shared_ptr<Message> msg,
return;
}
unsigned int notify_flags = NT_NOTIFY_UPDATE;
// don't update flags from a <3.0 remote (not part of message)
// don't update flags if this is a server response to a client id request
if (!may_need_update && conn->proto_rev() >= 0x0300) {
// update persistent dirty flag if persistent flag changed
if ((entry->flags & NT_PERSISTENT) != (msg->flags() & NT_PERSISTENT))
m_persistent_dirty = true;
if (entry->flags != msg->flags())
notify_flags |= NT_NOTIFY_FLAGS;
entry->flags = msg->flags();
}
@@ -195,7 +199,7 @@ void Storage::ProcessIncoming(std::shared_ptr<Message> msg,
entry->seq_num = seq_num;
// notify
m_notifier.NotifyEntry(name, entry->value, false, false);
m_notifier.NotifyEntry(name, entry->value, notify_flags);
// broadcast to all other connections (note for client there won't
// be any other connections, so don't bother)
@@ -232,7 +236,7 @@ void Storage::ProcessIncoming(std::shared_ptr<Message> msg,
if (entry->IsPersistent()) m_persistent_dirty = true;
// notify
m_notifier.NotifyEntry(entry->name, entry->value, false, false);
m_notifier.NotifyEntry(entry->name, entry->value, NT_NOTIFY_UPDATE);
// broadcast to all other connections (note for client there won't
// be any other connections, so don't bother)
@@ -254,6 +258,9 @@ void Storage::ProcessIncoming(std::shared_ptr<Message> msg,
}
Entry* entry = m_idmap[id];
// ignore if flags didn't actually change
if (entry->flags == msg->flags()) return;
// update persistent dirty flag if persistent flag changed
if ((entry->flags & NT_PERSISTENT) != (msg->flags() & NT_PERSISTENT))
m_persistent_dirty = true;
@@ -261,6 +268,9 @@ void Storage::ProcessIncoming(std::shared_ptr<Message> msg,
// update local
entry->flags = msg->flags();
// notify
m_notifier.NotifyEntry(entry->name, entry->value, NT_NOTIFY_FLAGS);
// broadcast to all other connections (note for client there won't
// be any other connections, so don't bother)
if (m_server && m_queue_outgoing) {
@@ -284,9 +294,19 @@ void Storage::ProcessIncoming(std::shared_ptr<Message> msg,
// update persistent dirty flag if it's a persistent value
if (entry->IsPersistent()) m_persistent_dirty = true;
// update local
m_entries.erase(entry->name); // erase from map
m_idmap[id] = nullptr; // delete it from idmap too
// delete it from idmap
m_idmap[id] = nullptr;
// get entry (as we'll need it for notify) and erase it from the map
// it should always be in the map, but sanity check just in case
auto i = m_entries.find(entry->name);
if (i != m_entries.end()) {
auto entry2 = std::move(i->getValue()); // move the value out
m_entries.erase(i);
// notify
m_notifier.NotifyEntry(entry2->name, entry2->value, NT_NOTIFY_DELETE);
}
// broadcast to all other connections (note for client there won't
// be any other connections, so don't bother)
@@ -299,12 +319,18 @@ void Storage::ProcessIncoming(std::shared_ptr<Message> msg,
}
case Message::kClearEntries: {
// update local
m_entries.clear();
EntriesMap map;
m_entries.swap(map);
m_idmap.resize(0);
// set persistent dirty flag
m_persistent_dirty = true;
// notify
for (auto& entry : map)
m_notifier.NotifyEntry(entry.getKey(), entry.getValue()->value,
NT_NOTIFY_DELETE);
// broadcast to all other connections (note for client there won't
// be any other connections, so don't bother)
if (m_server && m_queue_outgoing) {
@@ -401,7 +427,7 @@ void Storage::ApplyInitialAssignments(
entry->flags = msg->flags();
entry->seq_num = seq_num;
// notify
m_notifier.NotifyEntry(name, entry->value, true, false);
m_notifier.NotifyEntry(name, entry->value, NT_NOTIFY_NEW);
} else {
// if reconnect and sequence number not higher than local, then we
// don't update the local value and instead send it back to the server
@@ -412,10 +438,14 @@ void Storage::ApplyInitialAssignments(
} else {
entry->value = msg->value();
entry->seq_num = seq_num;
unsigned int notify_flags = NT_NOTIFY_UPDATE;
// don't update flags from a <3.0 remote (not part of message)
if (conn.proto_rev() >= 0x0300) entry->flags = msg->flags();
if (conn.proto_rev() >= 0x0300) {
if (entry->flags != msg->flags()) notify_flags |= NT_NOTIFY_FLAGS;
entry->flags = msg->flags();
}
// notify
m_notifier.NotifyEntry(name, entry->value, false, false);
m_notifier.NotifyEntry(name, entry->value, notify_flags);
}
}
@@ -474,7 +504,7 @@ bool Storage::SetEntryValue(StringRef name, std::shared_ptr<Value> value) {
value, entry->flags);
lock.unlock();
queue_outgoing(msg, nullptr, nullptr);
m_notifier.NotifyEntry(name, value, true, true);
m_notifier.NotifyEntry(name, value, NT_NOTIFY_NEW | NT_NOTIFY_LOCAL);
} else if (*old_value != *value) {
++entry->seq_num;
// don't send an update if we don't have an assigned id yet
@@ -484,7 +514,7 @@ bool Storage::SetEntryValue(StringRef name, std::shared_ptr<Value> value) {
lock.unlock();
queue_outgoing(msg, nullptr, nullptr);
}
m_notifier.NotifyEntry(name, value, false, true);
m_notifier.NotifyEntry(name, value, NT_NOTIFY_UPDATE | NT_NOTIFY_LOCAL);
}
return true;
}
@@ -519,7 +549,7 @@ void Storage::SetEntryTypeValue(StringRef name, std::shared_ptr<Value> value) {
value, entry->flags);
lock.unlock();
queue_outgoing(msg, nullptr, nullptr);
m_notifier.NotifyEntry(name, value, true, true);
m_notifier.NotifyEntry(name, value, NT_NOTIFY_NEW | NT_NOTIFY_LOCAL);
} else {
++entry->seq_num;
// don't send an update if we don't have an assigned id yet
@@ -529,7 +559,7 @@ void Storage::SetEntryTypeValue(StringRef name, std::shared_ptr<Value> value) {
lock.unlock();
queue_outgoing(msg, nullptr, nullptr);
}
m_notifier.NotifyEntry(name, value, false, true);
m_notifier.NotifyEntry(name, value, NT_NOTIFY_UPDATE | NT_NOTIFY_LOCAL);
}
}
@@ -556,6 +586,9 @@ void Storage::SetEntryFlags(StringRef name, unsigned int flags) {
lock.unlock();
queue_outgoing(Message::FlagsUpdate(id, flags), nullptr, nullptr);
}
// notify
m_notifier.NotifyEntry(name, entry->value, NT_NOTIFY_FLAGS | NT_NOTIFY_LOCAL);
}
unsigned int Storage::GetEntryFlags(StringRef name) const {
@@ -568,9 +601,8 @@ void Storage::DeleteEntry(StringRef name) {
std::unique_lock<std::mutex> lock(m_mutex);
auto i = m_entries.find(name);
if (i == m_entries.end()) return;
Entry* entry = i->getValue().get();
auto entry = std::move(i->getValue());
unsigned int id = entry->id;
bool had_value = entry->value != nullptr;
// update persistent dirty flag if it's a persistent value
if (entry->IsPersistent()) m_persistent_dirty = true;
@@ -578,7 +610,7 @@ void Storage::DeleteEntry(StringRef name) {
m_entries.erase(i); // erase from map
if (id < m_idmap.size()) m_idmap[id] = nullptr;
if (!had_value) return;
if (!entry->value) return;
// if it had a value, generate message
// don't send an update if we don't have an assigned id yet
@@ -588,12 +620,17 @@ void Storage::DeleteEntry(StringRef name) {
lock.unlock();
queue_outgoing(Message::EntryDelete(id), nullptr, nullptr);
}
// notify
m_notifier.NotifyEntry(name, entry->value,
NT_NOTIFY_DELETE | NT_NOTIFY_LOCAL);
}
void Storage::DeleteAllEntries() {
std::unique_lock<std::mutex> lock(m_mutex);
if (m_entries.empty()) return;
m_entries.clear();
EntriesMap map;
m_entries.swap(map);
m_idmap.resize(0);
// set persistent dirty flag
@@ -604,6 +641,13 @@ void Storage::DeleteAllEntries() {
auto queue_outgoing = m_queue_outgoing;
lock.unlock();
queue_outgoing(Message::ClearEntries(), nullptr, nullptr);
// notify
if (m_notifier.local_notifiers()) {
for (auto& entry : map)
m_notifier.NotifyEntry(entry.getKey(), entry.getValue()->value,
NT_NOTIFY_DELETE | NT_NOTIFY_LOCAL);
}
}
std::vector<EntryInfo> Storage::GetEntryInfo(StringRef prefix,
@@ -631,7 +675,8 @@ void Storage::NotifyEntries(StringRef prefix,
std::lock_guard<std::mutex> lock(m_mutex);
for (auto& i : m_entries) {
if (!i.getKey().startswith(prefix)) continue;
m_notifier.NotifyEntry(i.getKey(), i.getValue()->value, false, false, only);
m_notifier.NotifyEntry(i.getKey(), i.getValue()->value, NT_NOTIFY_IMMEDIATE,
only);
}
}