From 424bf51a7b9b8de3b5171cc6f5550e8d8d2b7ca3 Mon Sep 17 00:00:00 2001 From: Peter Johnson Date: Wed, 23 Sep 2015 00:56:08 -0700 Subject: [PATCH] Implement local notification. The default behavior is to only notify remote changes, but for some applications (e.g. GUI's) it's advantageous to know about local changes as well. This is (slightly) optimized in that local changes only result in additional resources being consumed if (any) local listeners have been created. --- include/networktables/NetworkTable.h | 5 +++ include/ntcore_c.h | 2 +- include/ntcore_cpp.h | 2 +- include/tables/ITable.h | 34 +++++++++++++++++++ java/lib/NetworkTablesJNI.cpp | 5 +-- .../wpilibj/networktables/NetworkTable.java | 31 +++++++++++++---- .../networktables/NetworkTablesJNI.java | 2 +- .../edu/wpi/first/wpilibj/tables/ITable.java | 29 ++++++++++++++++ src/Notifier.cpp | 22 +++++++----- src/Notifier.h | 27 ++++++++++++--- src/Storage.cpp | 18 ++++++---- src/networktables/NetworkTable.cpp | 29 ++++++++++++---- src/ntcore_c.cpp | 5 +-- src/ntcore_cpp.cpp | 4 +-- 14 files changed, 173 insertions(+), 42 deletions(-) diff --git a/include/networktables/NetworkTable.h b/include/networktables/NetworkTable.h index fbaf906dea..d3515ad5a4 100644 --- a/include/networktables/NetworkTable.h +++ b/include/networktables/NetworkTable.h @@ -137,9 +137,14 @@ class NetworkTable : public ITable { void AddTableListener(ITableListener* listener); void AddTableListener(ITableListener* listener, bool immediateNotify); + void AddTableListener(ITableListener* listener, bool immediateNotify, + bool localNotify); void AddTableListener(llvm::StringRef key, ITableListener* listener, bool immediateNotify); + void AddTableListener(llvm::StringRef key, ITableListener* listener, + bool immediateNotify, bool localNotify); void AddSubTableListener(ITableListener* listener); + void AddSubTableListener(ITableListener* listener, bool localNotify); void RemoveTableListener(ITableListener* listener); /** diff --git a/include/ntcore_c.h b/include/ntcore_c.h index 5b35f10265..eecff87ccf 100644 --- a/include/ntcore_c.h +++ b/include/ntcore_c.h @@ -266,7 +266,7 @@ typedef void (*NT_ConnectionListenerCallback)( unsigned int NT_AddEntryListener(const char *prefix, size_t prefix_len, void *data, NT_EntryListenerCallback callback, - int immediate_notify); + int immediate_notify, int local_notify); void NT_RemoveEntryListener(unsigned int entry_listener_uid); unsigned int NT_AddConnectionListener(void *data, NT_ConnectionListenerCallback callback, diff --git a/include/ntcore_cpp.h b/include/ntcore_cpp.h index 88929124c3..302150bdac 100644 --- a/include/ntcore_cpp.h +++ b/include/ntcore_cpp.h @@ -189,7 +189,7 @@ typedef std::functionDetachCurrentThread(); }, - immediateNotify != JNI_FALSE); + immediateNotify != JNI_FALSE, + localNotify != JNI_FALSE); } /* diff --git a/java/src/edu/wpi/first/wpilibj/networktables/NetworkTable.java b/java/src/edu/wpi/first/wpilibj/networktables/NetworkTable.java index 26659afb5f..c342d614e3 100644 --- a/java/src/edu/wpi/first/wpilibj/networktables/NetworkTable.java +++ b/java/src/edu/wpi/first/wpilibj/networktables/NetworkTable.java @@ -240,7 +240,12 @@ public class NetworkTable implements ITable, IRemote { } public void addTableListener(ITableListener listener) { - addTableListener(listener, false); + addTableListener(listener, false, false); + } + + public void addTableListener(ITableListener listener, + boolean immediateNotify) { + addTableListener(listener, immediateNotify, false); } private class TableListenerAdapter extends ListenerBase implements NetworkTablesJNI.EntryListenerFunction { @@ -264,7 +269,8 @@ public class NetworkTable implements ITable, IRemote { private final Hashtable> listenerMap = new Hashtable>(); public synchronized void addTableListener(ITableListener listener, - boolean immediateNotify) { + boolean immediateNotify, + boolean localNotify) { List adapters = listenerMap.get(listener); if (adapters == null) { adapters = new ArrayList(); @@ -272,10 +278,15 @@ public class NetworkTable implements ITable, IRemote { } TableListenerAdapter adapter = new TableListenerAdapter(path.length() + 1, this, listener); - adapter.uid = NetworkTablesJNI.addEntryListener(path + PATH_SEPARATOR, adapter, immediateNotify); + adapter.uid = NetworkTablesJNI.addEntryListener(path + PATH_SEPARATOR, adapter, immediateNotify, localNotify); adapters.add(adapter); } + public void addTableListener(String key, ITableListener listener, + boolean immediateNotify) { + addTableListener(key, listener, immediateNotify, false); + } + private class KeyListenerAdapter extends ListenerBase implements NetworkTablesJNI.EntryListenerFunction { private final String relativeKey; private final String fullKey; @@ -297,7 +308,8 @@ public class NetworkTable implements ITable, IRemote { } public synchronized void addTableListener(String key, ITableListener listener, - boolean immediateNotify) { + boolean immediateNotify, + boolean localNotify) { List adapters = listenerMap.get(listener); if (adapters == null) { adapters = new ArrayList(); @@ -306,10 +318,14 @@ public class NetworkTable implements ITable, IRemote { String fullKey = path + PATH_SEPARATOR + key; KeyListenerAdapter adapter = new KeyListenerAdapter(key, fullKey, this, listener); - adapter.uid = NetworkTablesJNI.addEntryListener(fullKey, adapter, immediateNotify); + adapter.uid = NetworkTablesJNI.addEntryListener(fullKey, adapter, immediateNotify, localNotify); adapters.add(adapter); } + public void addSubTableListener(final ITableListener listener) { + addSubTableListener(listener, false); + } + private class SubListenerAdapter extends ListenerBase implements NetworkTablesJNI.EntryListenerFunction { private final int prefixLen; private final ITable targetSource; @@ -335,7 +351,8 @@ public class NetworkTable implements ITable, IRemote { } } - public synchronized void addSubTableListener(final ITableListener listener) { + public synchronized void addSubTableListener(final ITableListener listener, + boolean localNotify) { List adapters = listenerMap.get(listener); if (adapters == null) { adapters = new ArrayList(); @@ -343,7 +360,7 @@ public class NetworkTable implements ITable, IRemote { } SubListenerAdapter adapter = new SubListenerAdapter(path.length() + 1, this, listener); - adapter.uid = NetworkTablesJNI.addEntryListener(path + PATH_SEPARATOR, adapter, true); + adapter.uid = NetworkTablesJNI.addEntryListener(path + PATH_SEPARATOR, adapter, true, localNotify); adapters.add(adapter); } diff --git a/java/src/edu/wpi/first/wpilibj/networktables/NetworkTablesJNI.java b/java/src/edu/wpi/first/wpilibj/networktables/NetworkTablesJNI.java index c27c1fbad0..d7064f56f4 100644 --- a/java/src/edu/wpi/first/wpilibj/networktables/NetworkTablesJNI.java +++ b/java/src/edu/wpi/first/wpilibj/networktables/NetworkTablesJNI.java @@ -109,7 +109,7 @@ public class NetworkTablesJNI { public interface EntryListenerFunction { void apply(int uid, String key, Object value, boolean isNew); } - public static native int addEntryListener(String prefix, EntryListenerFunction listener, boolean immediateNotify); + public static native int addEntryListener(String prefix, EntryListenerFunction listener, boolean immediateNotify, boolean localNotify); public static native void removeEntryListener(int entryListenerUid); public interface ConnectionListenerFunction { diff --git a/java/src/edu/wpi/first/wpilibj/tables/ITable.java b/java/src/edu/wpi/first/wpilibj/tables/ITable.java index 62b37d30ef..2c2534f3d1 100644 --- a/java/src/edu/wpi/first/wpilibj/tables/ITable.java +++ b/java/src/edu/wpi/first/wpilibj/tables/ITable.java @@ -310,6 +310,16 @@ public interface ITable { */ public void addTableListener(ITableListener listener, boolean immediateNotify); + /** + * Add a listener for changes to the table + * @param listener the listener to add + * @param immediateNotify if true then this listener will be notified of all + * current entries (marked as new) + * @param localNotify if true then this listener will be notified of all + * local changes in addition to all remote changes + */ + public void addTableListener(ITableListener listener, boolean immediateNotify, + boolean localNotify); /** * Add a listener for changes to a specific key the table @@ -320,11 +330,30 @@ public interface ITable { */ public void addTableListener(String key, ITableListener listener, boolean immediateNotify); + /** + * Add a listener for changes to a specific key the table + * @param key the key to listen for + * @param listener the listener to add + * @param immediateNotify if true then this listener will be notified of all + * current entries (marked as new) + * @param localNotify if true then this listener will be notified of all + * local changes in addition to all remote changes + */ + public void addTableListener(String key, ITableListener listener, + boolean immediateNotify, boolean localNotify); /** * This will immediately notify the listener of all current sub tables * @param listener */ public void addSubTableListener(final ITableListener listener); + /** + * This will immediately notify the listener of all current sub tables + * @param listener + * @param localNotify if true then this listener will be notified of all + * local changes in addition to all remote changes + */ + public void addSubTableListener(final ITableListener listener, + boolean localNotify); /** * Remove a listener from receiving table events * @param listener the listener to be removed diff --git a/src/Notifier.cpp b/src/Notifier.cpp index 23f746d35f..634c8e2b52 100644 --- a/src/Notifier.cpp +++ b/src/Notifier.cpp @@ -14,6 +14,7 @@ bool Notifier::s_destroyed = false; Notifier::Notifier() { m_active = false; + m_local_notifiers = false; s_destroyed = false; } @@ -64,9 +65,10 @@ void Notifier::ThreadMain() { // Use index because iterator might get invalidated. for (std::size_t i=0; i lock(m_mutex); unsigned int uid = m_entry_listeners.size(); - m_entry_listeners.emplace_back(prefix, callback); + m_entry_listeners.emplace_back(prefix, callback, local_notify); + if (local_notify) m_local_notifiers = true; return uid + 1; } @@ -112,14 +116,16 @@ void Notifier::RemoveEntryListener(unsigned int entry_listener_uid) { --entry_listener_uid; std::lock_guard lock(m_mutex); if (entry_listener_uid < m_entry_listeners.size()) - m_entry_listeners[entry_listener_uid].second = nullptr; + m_entry_listeners[entry_listener_uid].callback = nullptr; } void Notifier::NotifyEntry(StringRef name, std::shared_ptr value, - bool is_new, EntryListenerCallback only) { + bool is_new, bool is_local, + EntryListenerCallback only) { if (!m_active) return; + if (is_local && !m_local_notifiers) return; // optimization std::unique_lock lock(m_mutex); - m_entry_notifications.emplace(name, value, is_new, only); + m_entry_notifications.emplace(name, value, is_new, is_local, only); lock.unlock(); m_cond.notify_one(); } diff --git a/src/Notifier.h b/src/Notifier.h index daa9309501..6a8f7bfca3 100644 --- a/src/Notifier.h +++ b/src/Notifier.h @@ -37,11 +37,12 @@ class Notifier { static bool destroyed() { return s_destroyed; } unsigned int AddEntryListener(StringRef prefix, - EntryListenerCallback callback); + EntryListenerCallback callback, + bool local_notify); void RemoveEntryListener(unsigned int entry_listener_uid); void NotifyEntry(StringRef name, std::shared_ptr value, bool is_new, - EntryListenerCallback only = nullptr); + bool is_local, EntryListenerCallback only = nullptr); unsigned int AddConnectionListener(ConnectionListenerCallback callback); void RemoveConnectionListener(unsigned int conn_listener_uid); @@ -55,20 +56,36 @@ class Notifier { void ThreadMain(); std::atomic_bool m_active; + std::atomic_bool m_local_notifiers; std::mutex m_mutex; std::condition_variable m_cond; - std::vector> m_entry_listeners; + + struct EntryListener { + EntryListener(StringRef prefix_, EntryListenerCallback callback_, + bool local_notify_) + : prefix(prefix_), callback(callback_), local_notify(local_notify_) {} + + std::string prefix; + EntryListenerCallback callback; + bool local_notify; + }; + std::vector m_entry_listeners; std::vector m_conn_listeners; struct EntryNotification { EntryNotification(StringRef name_, std::shared_ptr value_, - bool is_new_, EntryListenerCallback only_) - : name(name_), value(value_), is_new(is_new_), only(only_) {} + bool is_new_, bool is_local_, EntryListenerCallback only_) + : name(name_), + value(value_), + is_new(is_new_), + is_local(is_local_), + only(only_) {} std::string name; std::shared_ptr value; bool is_new; + bool is_local; EntryListenerCallback only; }; std::queue m_entry_notifications; diff --git a/src/Storage.cpp b/src/Storage.cpp index eadf0e1571..c36de1a267 100644 --- a/src/Storage.cpp +++ b/src/Storage.cpp @@ -93,7 +93,7 @@ void Storage::ProcessIncoming(std::shared_ptr msg, if (entry->IsPersistent()) m_persistent_dirty = true; // notify - m_notifier.NotifyEntry(name, entry->value, true); + m_notifier.NotifyEntry(name, entry->value, true, false); // send the assignment to everyone (including the originator) if (m_queue_outgoing) { @@ -135,7 +135,7 @@ void Storage::ProcessIncoming(std::shared_ptr msg, m_idmap[id] = new_entry.get(); // notify - m_notifier.NotifyEntry(name, new_entry->value, true); + m_notifier.NotifyEntry(name, new_entry->value, true, false); return; } may_need_update = true; // we may need to send an update message @@ -184,7 +184,7 @@ void Storage::ProcessIncoming(std::shared_ptr msg, entry->seq_num = seq_num; // notify - m_notifier.NotifyEntry(name, entry->value, false); + m_notifier.NotifyEntry(name, entry->value, false, false); // broadcast to all other connections (note for client there won't // be any other connections, so don't bother) @@ -221,7 +221,7 @@ void Storage::ProcessIncoming(std::shared_ptr msg, if (entry->IsPersistent()) m_persistent_dirty = true; // notify - m_notifier.NotifyEntry(entry->name, entry->value, false); + m_notifier.NotifyEntry(entry->name, entry->value, false, false); // broadcast to all other connections (note for client there won't // be any other connections, so don't bother) @@ -390,7 +390,7 @@ void Storage::ApplyInitialAssignments( entry->flags = msg->flags(); entry->seq_num = seq_num; // notify - m_notifier.NotifyEntry(name, entry->value, true); + m_notifier.NotifyEntry(name, entry->value, true, false); } else { // if reconnect and sequence number not higher than local, then we // don't update the local value and instead send it back to the server @@ -404,7 +404,7 @@ void Storage::ApplyInitialAssignments( // don't update flags from a <3.0 remote (not part of message) if (conn.proto_rev() >= 0x0300) entry->flags = msg->flags(); // notify - m_notifier.NotifyEntry(name, entry->value, false); + m_notifier.NotifyEntry(name, entry->value, false, false); } } @@ -463,6 +463,7 @@ bool Storage::SetEntryValue(StringRef name, std::shared_ptr value) { value, entry->flags); lock.unlock(); queue_outgoing(msg, nullptr, nullptr); + m_notifier.NotifyEntry(name, value, true, true); } else if (*old_value != *value) { ++entry->seq_num; // don't send an update if we don't have an assigned id yet @@ -472,6 +473,7 @@ bool Storage::SetEntryValue(StringRef name, std::shared_ptr value) { lock.unlock(); queue_outgoing(msg, nullptr, nullptr); } + m_notifier.NotifyEntry(name, value, false, true); } return true; } @@ -506,6 +508,7 @@ void Storage::SetEntryTypeValue(StringRef name, std::shared_ptr value) { value, entry->flags); lock.unlock(); queue_outgoing(msg, nullptr, nullptr); + m_notifier.NotifyEntry(name, value, true, true); } else { ++entry->seq_num; // don't send an update if we don't have an assigned id yet @@ -515,6 +518,7 @@ void Storage::SetEntryTypeValue(StringRef name, std::shared_ptr value) { lock.unlock(); queue_outgoing(msg, nullptr, nullptr); } + m_notifier.NotifyEntry(name, value, false, true); } } @@ -616,7 +620,7 @@ void Storage::NotifyEntries(StringRef prefix, std::lock_guard lock(m_mutex); for (auto& i : m_entries) { if (!i.getKey().startswith(prefix)) continue; - m_notifier.NotifyEntry(i.getKey(), i.getValue()->value, false, only); + m_notifier.NotifyEntry(i.getKey(), i.getValue()->value, false, false, only); } } diff --git a/src/networktables/NetworkTable.cpp b/src/networktables/NetworkTable.cpp index 33bbb32f0d..07b6e982c9 100644 --- a/src/networktables/NetworkTable.cpp +++ b/src/networktables/NetworkTable.cpp @@ -97,11 +97,16 @@ NetworkTable::~NetworkTable() { } void NetworkTable::AddTableListener(ITableListener* listener) { - AddTableListener(listener, false); + AddTableListener(listener, false, false); } void NetworkTable::AddTableListener(ITableListener* listener, bool immediateNotify) { + AddTableListener(listener, immediateNotify, false); +} + +void NetworkTable::AddTableListener(ITableListener* listener, + bool immediateNotify, bool localNotify) { std::lock_guard lock(m_mutex); llvm::SmallString<128> path(m_path); path += PATH_SEPARATOR_CHAR; @@ -114,13 +119,18 @@ void NetworkTable::AddTableListener(ITableListener* listener, if (relative_key.find(PATH_SEPARATOR_CHAR) != StringRef::npos) return; listener->ValueChanged(this, relative_key, value, is_new); }, - immediateNotify); + immediateNotify, + localNotify); m_listeners.emplace_back(listener, id); } -void NetworkTable::AddTableListener(StringRef key, - ITableListener* listener, +void NetworkTable::AddTableListener(StringRef key, ITableListener* listener, bool immediateNotify) { + AddTableListener(key, listener, immediateNotify, false); +} + +void NetworkTable::AddTableListener(StringRef key, ITableListener* listener, + bool immediateNotify, bool localNotify) { std::lock_guard lock(m_mutex); llvm::SmallString<128> path(m_path); path += PATH_SEPARATOR_CHAR; @@ -133,11 +143,17 @@ void NetworkTable::AddTableListener(StringRef key, if (name != path) return; listener->ValueChanged(this, name.substr(prefix_len), value, is_new); }, - immediateNotify); + immediateNotify, + localNotify); m_listeners.emplace_back(listener, id); } void NetworkTable::AddSubTableListener(ITableListener* listener) { + AddSubTableListener(listener, false); +} + +void NetworkTable::AddSubTableListener(ITableListener* listener, + bool localNotify) { std::lock_guard lock(m_mutex); llvm::SmallString<128> path(m_path); path += PATH_SEPARATOR_CHAR; @@ -160,7 +176,8 @@ void NetworkTable::AddSubTableListener(ITableListener* listener) { notified_tables->insert(std::make_pair(sub_table_key, '\0')); listener->ValueChanged(this, sub_table_key, nullptr, true); }, - true); + true, + localNotify); m_listeners.emplace_back(listener, id); } diff --git a/src/ntcore_c.cpp b/src/ntcore_c.cpp index 266fabfd9c..e519a2ed85 100644 --- a/src/ntcore_c.cpp +++ b/src/ntcore_c.cpp @@ -169,14 +169,15 @@ void NT_Flush(void) { nt::Flush(); } unsigned int NT_AddEntryListener(const char *prefix, size_t prefix_len, void *data, NT_EntryListenerCallback callback, - int immediate_notify) { + int immediate_notify, int local_notify) { return nt::AddEntryListener( StringRef(prefix, prefix_len), [=](unsigned int uid, StringRef name, std::shared_ptr value, bool is_new) { callback(uid, data, name.data(), name.size(), &value->value(), is_new); }, - immediate_notify != 0); + immediate_notify != 0, + local_notify != 0); } void NT_RemoveEntryListener(unsigned int entry_listener_uid) { diff --git a/src/ntcore_cpp.cpp b/src/ntcore_cpp.cpp index 960151d597..917b5aceb5 100644 --- a/src/ntcore_cpp.cpp +++ b/src/ntcore_cpp.cpp @@ -66,9 +66,9 @@ void Flush() { */ unsigned int AddEntryListener(StringRef prefix, EntryListenerCallback callback, - bool immediate_notify) { + bool immediate_notify, bool local_notify) { Notifier& notifier = Notifier::GetInstance(); - unsigned int uid = notifier.AddEntryListener(prefix, callback); + unsigned int uid = notifier.AddEntryListener(prefix, callback, local_notify); notifier.Start(); if (immediate_notify) Storage::GetInstance().NotifyEntries(prefix, callback); return uid;