Implement local notification.

The default behavior is to only notify remote changes, but for some
applications (e.g. GUI's) it's advantageous to know about local
changes as well.

This is (slightly) optimized in that local changes only result in
additional resources being consumed if (any) local listeners have been
created.
This commit is contained in:
Peter Johnson
2015-09-23 00:56:08 -07:00
parent 75358e6c45
commit 424bf51a7b
14 changed files with 173 additions and 42 deletions

View File

@@ -14,6 +14,7 @@ bool Notifier::s_destroyed = false;
Notifier::Notifier() {
m_active = false;
m_local_notifiers = false;
s_destroyed = false;
}
@@ -64,9 +65,10 @@ void Notifier::ThreadMain() {
// Use index because iterator might get invalidated.
for (std::size_t i=0; i<m_entry_listeners.size(); ++i) {
if (!m_entry_listeners[i].second) continue; // removed
if (!name.startswith(m_entry_listeners[i].first)) continue;
auto callback = m_entry_listeners[i].second;
if (!m_entry_listeners[i].callback) continue; // removed
if (item.is_local && !m_entry_listeners[i].local_notify) continue;
if (!name.startswith(m_entry_listeners[i].prefix)) continue;
auto callback = m_entry_listeners[i].callback;
// Don't hold mutex during callback execution!
lock.unlock();
callback(i+1, name, item.value, item.is_new);
@@ -101,10 +103,12 @@ void Notifier::ThreadMain() {
}
unsigned int Notifier::AddEntryListener(StringRef prefix,
EntryListenerCallback callback) {
EntryListenerCallback callback,
bool local_notify) {
std::lock_guard<std::mutex> lock(m_mutex);
unsigned int uid = m_entry_listeners.size();
m_entry_listeners.emplace_back(prefix, callback);
m_entry_listeners.emplace_back(prefix, callback, local_notify);
if (local_notify) m_local_notifiers = true;
return uid + 1;
}
@@ -112,14 +116,16 @@ void Notifier::RemoveEntryListener(unsigned int entry_listener_uid) {
--entry_listener_uid;
std::lock_guard<std::mutex> lock(m_mutex);
if (entry_listener_uid < m_entry_listeners.size())
m_entry_listeners[entry_listener_uid].second = nullptr;
m_entry_listeners[entry_listener_uid].callback = nullptr;
}
void Notifier::NotifyEntry(StringRef name, std::shared_ptr<Value> value,
bool is_new, EntryListenerCallback only) {
bool is_new, bool is_local,
EntryListenerCallback only) {
if (!m_active) return;
if (is_local && !m_local_notifiers) return; // optimization
std::unique_lock<std::mutex> lock(m_mutex);
m_entry_notifications.emplace(name, value, is_new, only);
m_entry_notifications.emplace(name, value, is_new, is_local, only);
lock.unlock();
m_cond.notify_one();
}

View File

@@ -37,11 +37,12 @@ class Notifier {
static bool destroyed() { return s_destroyed; }
unsigned int AddEntryListener(StringRef prefix,
EntryListenerCallback callback);
EntryListenerCallback callback,
bool local_notify);
void RemoveEntryListener(unsigned int entry_listener_uid);
void NotifyEntry(StringRef name, std::shared_ptr<Value> value, bool is_new,
EntryListenerCallback only = nullptr);
bool is_local, EntryListenerCallback only = nullptr);
unsigned int AddConnectionListener(ConnectionListenerCallback callback);
void RemoveConnectionListener(unsigned int conn_listener_uid);
@@ -55,20 +56,36 @@ class Notifier {
void ThreadMain();
std::atomic_bool m_active;
std::atomic_bool m_local_notifiers;
std::mutex m_mutex;
std::condition_variable m_cond;
std::vector<std::pair<std::string, EntryListenerCallback>> m_entry_listeners;
struct EntryListener {
EntryListener(StringRef prefix_, EntryListenerCallback callback_,
bool local_notify_)
: prefix(prefix_), callback(callback_), local_notify(local_notify_) {}
std::string prefix;
EntryListenerCallback callback;
bool local_notify;
};
std::vector<EntryListener> m_entry_listeners;
std::vector<ConnectionListenerCallback> m_conn_listeners;
struct EntryNotification {
EntryNotification(StringRef name_, std::shared_ptr<Value> value_,
bool is_new_, EntryListenerCallback only_)
: name(name_), value(value_), is_new(is_new_), only(only_) {}
bool is_new_, bool is_local_, EntryListenerCallback only_)
: name(name_),
value(value_),
is_new(is_new_),
is_local(is_local_),
only(only_) {}
std::string name;
std::shared_ptr<Value> value;
bool is_new;
bool is_local;
EntryListenerCallback only;
};
std::queue<EntryNotification> m_entry_notifications;

View File

@@ -93,7 +93,7 @@ void Storage::ProcessIncoming(std::shared_ptr<Message> msg,
if (entry->IsPersistent()) m_persistent_dirty = true;
// notify
m_notifier.NotifyEntry(name, entry->value, true);
m_notifier.NotifyEntry(name, entry->value, true, false);
// send the assignment to everyone (including the originator)
if (m_queue_outgoing) {
@@ -135,7 +135,7 @@ void Storage::ProcessIncoming(std::shared_ptr<Message> msg,
m_idmap[id] = new_entry.get();
// notify
m_notifier.NotifyEntry(name, new_entry->value, true);
m_notifier.NotifyEntry(name, new_entry->value, true, false);
return;
}
may_need_update = true; // we may need to send an update message
@@ -184,7 +184,7 @@ void Storage::ProcessIncoming(std::shared_ptr<Message> msg,
entry->seq_num = seq_num;
// notify
m_notifier.NotifyEntry(name, entry->value, false);
m_notifier.NotifyEntry(name, entry->value, false, false);
// broadcast to all other connections (note for client there won't
// be any other connections, so don't bother)
@@ -221,7 +221,7 @@ void Storage::ProcessIncoming(std::shared_ptr<Message> msg,
if (entry->IsPersistent()) m_persistent_dirty = true;
// notify
m_notifier.NotifyEntry(entry->name, entry->value, false);
m_notifier.NotifyEntry(entry->name, entry->value, false, false);
// broadcast to all other connections (note for client there won't
// be any other connections, so don't bother)
@@ -390,7 +390,7 @@ void Storage::ApplyInitialAssignments(
entry->flags = msg->flags();
entry->seq_num = seq_num;
// notify
m_notifier.NotifyEntry(name, entry->value, true);
m_notifier.NotifyEntry(name, entry->value, true, false);
} else {
// if reconnect and sequence number not higher than local, then we
// don't update the local value and instead send it back to the server
@@ -404,7 +404,7 @@ void Storage::ApplyInitialAssignments(
// don't update flags from a <3.0 remote (not part of message)
if (conn.proto_rev() >= 0x0300) entry->flags = msg->flags();
// notify
m_notifier.NotifyEntry(name, entry->value, false);
m_notifier.NotifyEntry(name, entry->value, false, false);
}
}
@@ -463,6 +463,7 @@ bool Storage::SetEntryValue(StringRef name, std::shared_ptr<Value> value) {
value, entry->flags);
lock.unlock();
queue_outgoing(msg, nullptr, nullptr);
m_notifier.NotifyEntry(name, value, true, true);
} else if (*old_value != *value) {
++entry->seq_num;
// don't send an update if we don't have an assigned id yet
@@ -472,6 +473,7 @@ bool Storage::SetEntryValue(StringRef name, std::shared_ptr<Value> value) {
lock.unlock();
queue_outgoing(msg, nullptr, nullptr);
}
m_notifier.NotifyEntry(name, value, false, true);
}
return true;
}
@@ -506,6 +508,7 @@ void Storage::SetEntryTypeValue(StringRef name, std::shared_ptr<Value> value) {
value, entry->flags);
lock.unlock();
queue_outgoing(msg, nullptr, nullptr);
m_notifier.NotifyEntry(name, value, true, true);
} else {
++entry->seq_num;
// don't send an update if we don't have an assigned id yet
@@ -515,6 +518,7 @@ void Storage::SetEntryTypeValue(StringRef name, std::shared_ptr<Value> value) {
lock.unlock();
queue_outgoing(msg, nullptr, nullptr);
}
m_notifier.NotifyEntry(name, value, false, true);
}
}
@@ -616,7 +620,7 @@ void Storage::NotifyEntries(StringRef prefix,
std::lock_guard<std::mutex> lock(m_mutex);
for (auto& i : m_entries) {
if (!i.getKey().startswith(prefix)) continue;
m_notifier.NotifyEntry(i.getKey(), i.getValue()->value, false, only);
m_notifier.NotifyEntry(i.getKey(), i.getValue()->value, false, false, only);
}
}

View File

@@ -97,11 +97,16 @@ NetworkTable::~NetworkTable() {
}
void NetworkTable::AddTableListener(ITableListener* listener) {
AddTableListener(listener, false);
AddTableListener(listener, false, false);
}
void NetworkTable::AddTableListener(ITableListener* listener,
bool immediateNotify) {
AddTableListener(listener, immediateNotify, false);
}
void NetworkTable::AddTableListener(ITableListener* listener,
bool immediateNotify, bool localNotify) {
std::lock_guard<std::mutex> lock(m_mutex);
llvm::SmallString<128> path(m_path);
path += PATH_SEPARATOR_CHAR;
@@ -114,13 +119,18 @@ void NetworkTable::AddTableListener(ITableListener* listener,
if (relative_key.find(PATH_SEPARATOR_CHAR) != StringRef::npos) return;
listener->ValueChanged(this, relative_key, value, is_new);
},
immediateNotify);
immediateNotify,
localNotify);
m_listeners.emplace_back(listener, id);
}
void NetworkTable::AddTableListener(StringRef key,
ITableListener* listener,
void NetworkTable::AddTableListener(StringRef key, ITableListener* listener,
bool immediateNotify) {
AddTableListener(key, listener, immediateNotify, false);
}
void NetworkTable::AddTableListener(StringRef key, ITableListener* listener,
bool immediateNotify, bool localNotify) {
std::lock_guard<std::mutex> lock(m_mutex);
llvm::SmallString<128> path(m_path);
path += PATH_SEPARATOR_CHAR;
@@ -133,11 +143,17 @@ void NetworkTable::AddTableListener(StringRef key,
if (name != path) return;
listener->ValueChanged(this, name.substr(prefix_len), value, is_new);
},
immediateNotify);
immediateNotify,
localNotify);
m_listeners.emplace_back(listener, id);
}
void NetworkTable::AddSubTableListener(ITableListener* listener) {
AddSubTableListener(listener, false);
}
void NetworkTable::AddSubTableListener(ITableListener* listener,
bool localNotify) {
std::lock_guard<std::mutex> lock(m_mutex);
llvm::SmallString<128> path(m_path);
path += PATH_SEPARATOR_CHAR;
@@ -160,7 +176,8 @@ void NetworkTable::AddSubTableListener(ITableListener* listener) {
notified_tables->insert(std::make_pair(sub_table_key, '\0'));
listener->ValueChanged(this, sub_table_key, nullptr, true);
},
true);
true,
localNotify);
m_listeners.emplace_back(listener, id);
}

View File

@@ -169,14 +169,15 @@ void NT_Flush(void) { nt::Flush(); }
unsigned int NT_AddEntryListener(const char *prefix, size_t prefix_len,
void *data,
NT_EntryListenerCallback callback,
int immediate_notify) {
int immediate_notify, int local_notify) {
return nt::AddEntryListener(
StringRef(prefix, prefix_len),
[=](unsigned int uid, StringRef name, std::shared_ptr<Value> value,
bool is_new) {
callback(uid, data, name.data(), name.size(), &value->value(), is_new);
},
immediate_notify != 0);
immediate_notify != 0,
local_notify != 0);
}
void NT_RemoveEntryListener(unsigned int entry_listener_uid) {

View File

@@ -66,9 +66,9 @@ void Flush() {
*/
unsigned int AddEntryListener(StringRef prefix, EntryListenerCallback callback,
bool immediate_notify) {
bool immediate_notify, bool local_notify) {
Notifier& notifier = Notifier::GetInstance();
unsigned int uid = notifier.AddEntryListener(prefix, callback);
unsigned int uid = notifier.AddEntryListener(prefix, callback, local_notify);
notifier.Start();
if (immediate_notify) Storage::GetInstance().NotifyEntries(prefix, callback);
return uid;