diff --git a/include/networktables/NetworkTable.h b/include/networktables/NetworkTable.h index fbaf906dea..d3515ad5a4 100644 --- a/include/networktables/NetworkTable.h +++ b/include/networktables/NetworkTable.h @@ -137,9 +137,14 @@ class NetworkTable : public ITable { void AddTableListener(ITableListener* listener); void AddTableListener(ITableListener* listener, bool immediateNotify); + void AddTableListener(ITableListener* listener, bool immediateNotify, + bool localNotify); void AddTableListener(llvm::StringRef key, ITableListener* listener, bool immediateNotify); + void AddTableListener(llvm::StringRef key, ITableListener* listener, + bool immediateNotify, bool localNotify); void AddSubTableListener(ITableListener* listener); + void AddSubTableListener(ITableListener* listener, bool localNotify); void RemoveTableListener(ITableListener* listener); /** diff --git a/include/ntcore_c.h b/include/ntcore_c.h index 5b35f10265..eecff87ccf 100644 --- a/include/ntcore_c.h +++ b/include/ntcore_c.h @@ -266,7 +266,7 @@ typedef void (*NT_ConnectionListenerCallback)( unsigned int NT_AddEntryListener(const char *prefix, size_t prefix_len, void *data, NT_EntryListenerCallback callback, - int immediate_notify); + int immediate_notify, int local_notify); void NT_RemoveEntryListener(unsigned int entry_listener_uid); unsigned int NT_AddConnectionListener(void *data, NT_ConnectionListenerCallback callback, diff --git a/include/ntcore_cpp.h b/include/ntcore_cpp.h index 88929124c3..302150bdac 100644 --- a/include/ntcore_cpp.h +++ b/include/ntcore_cpp.h @@ -189,7 +189,7 @@ typedef std::functionDetachCurrentThread(); }, - immediateNotify != JNI_FALSE); + immediateNotify != JNI_FALSE, + localNotify != JNI_FALSE); } /* diff --git a/java/src/edu/wpi/first/wpilibj/networktables/NetworkTable.java b/java/src/edu/wpi/first/wpilibj/networktables/NetworkTable.java index 26659afb5f..c342d614e3 100644 --- a/java/src/edu/wpi/first/wpilibj/networktables/NetworkTable.java +++ b/java/src/edu/wpi/first/wpilibj/networktables/NetworkTable.java @@ -240,7 +240,12 @@ public class NetworkTable implements ITable, IRemote { } public void addTableListener(ITableListener listener) { - addTableListener(listener, false); + addTableListener(listener, false, false); + } + + public void addTableListener(ITableListener listener, + boolean immediateNotify) { + addTableListener(listener, immediateNotify, false); } private class TableListenerAdapter extends ListenerBase implements NetworkTablesJNI.EntryListenerFunction { @@ -264,7 +269,8 @@ public class NetworkTable implements ITable, IRemote { private final Hashtable> listenerMap = new Hashtable>(); public synchronized void addTableListener(ITableListener listener, - boolean immediateNotify) { + boolean immediateNotify, + boolean localNotify) { List adapters = listenerMap.get(listener); if (adapters == null) { adapters = new ArrayList(); @@ -272,10 +278,15 @@ public class NetworkTable implements ITable, IRemote { } TableListenerAdapter adapter = new TableListenerAdapter(path.length() + 1, this, listener); - adapter.uid = NetworkTablesJNI.addEntryListener(path + PATH_SEPARATOR, adapter, immediateNotify); + adapter.uid = NetworkTablesJNI.addEntryListener(path + PATH_SEPARATOR, adapter, immediateNotify, localNotify); adapters.add(adapter); } + public void addTableListener(String key, ITableListener listener, + boolean immediateNotify) { + addTableListener(key, listener, immediateNotify, false); + } + private class KeyListenerAdapter extends ListenerBase implements NetworkTablesJNI.EntryListenerFunction { private final String relativeKey; private final String fullKey; @@ -297,7 +308,8 @@ public class NetworkTable implements ITable, IRemote { } public synchronized void addTableListener(String key, ITableListener listener, - boolean immediateNotify) { + boolean immediateNotify, + boolean localNotify) { List adapters = listenerMap.get(listener); if (adapters == null) { adapters = new ArrayList(); @@ -306,10 +318,14 @@ public class NetworkTable implements ITable, IRemote { String fullKey = path + PATH_SEPARATOR + key; KeyListenerAdapter adapter = new KeyListenerAdapter(key, fullKey, this, listener); - adapter.uid = NetworkTablesJNI.addEntryListener(fullKey, adapter, immediateNotify); + adapter.uid = NetworkTablesJNI.addEntryListener(fullKey, adapter, immediateNotify, localNotify); adapters.add(adapter); } + public void addSubTableListener(final ITableListener listener) { + addSubTableListener(listener, false); + } + private class SubListenerAdapter extends ListenerBase implements NetworkTablesJNI.EntryListenerFunction { private final int prefixLen; private final ITable targetSource; @@ -335,7 +351,8 @@ public class NetworkTable implements ITable, IRemote { } } - public synchronized void addSubTableListener(final ITableListener listener) { + public synchronized void addSubTableListener(final ITableListener listener, + boolean localNotify) { List adapters = listenerMap.get(listener); if (adapters == null) { adapters = new ArrayList(); @@ -343,7 +360,7 @@ public class NetworkTable implements ITable, IRemote { } SubListenerAdapter adapter = new SubListenerAdapter(path.length() + 1, this, listener); - adapter.uid = NetworkTablesJNI.addEntryListener(path + PATH_SEPARATOR, adapter, true); + adapter.uid = NetworkTablesJNI.addEntryListener(path + PATH_SEPARATOR, adapter, true, localNotify); adapters.add(adapter); } diff --git a/java/src/edu/wpi/first/wpilibj/networktables/NetworkTablesJNI.java b/java/src/edu/wpi/first/wpilibj/networktables/NetworkTablesJNI.java index c27c1fbad0..d7064f56f4 100644 --- a/java/src/edu/wpi/first/wpilibj/networktables/NetworkTablesJNI.java +++ b/java/src/edu/wpi/first/wpilibj/networktables/NetworkTablesJNI.java @@ -109,7 +109,7 @@ public class NetworkTablesJNI { public interface EntryListenerFunction { void apply(int uid, String key, Object value, boolean isNew); } - public static native int addEntryListener(String prefix, EntryListenerFunction listener, boolean immediateNotify); + public static native int addEntryListener(String prefix, EntryListenerFunction listener, boolean immediateNotify, boolean localNotify); public static native void removeEntryListener(int entryListenerUid); public interface ConnectionListenerFunction { diff --git a/java/src/edu/wpi/first/wpilibj/tables/ITable.java b/java/src/edu/wpi/first/wpilibj/tables/ITable.java index 62b37d30ef..2c2534f3d1 100644 --- a/java/src/edu/wpi/first/wpilibj/tables/ITable.java +++ b/java/src/edu/wpi/first/wpilibj/tables/ITable.java @@ -310,6 +310,16 @@ public interface ITable { */ public void addTableListener(ITableListener listener, boolean immediateNotify); + /** + * Add a listener for changes to the table + * @param listener the listener to add + * @param immediateNotify if true then this listener will be notified of all + * current entries (marked as new) + * @param localNotify if true then this listener will be notified of all + * local changes in addition to all remote changes + */ + public void addTableListener(ITableListener listener, boolean immediateNotify, + boolean localNotify); /** * Add a listener for changes to a specific key the table @@ -320,11 +330,30 @@ public interface ITable { */ public void addTableListener(String key, ITableListener listener, boolean immediateNotify); + /** + * Add a listener for changes to a specific key the table + * @param key the key to listen for + * @param listener the listener to add + * @param immediateNotify if true then this listener will be notified of all + * current entries (marked as new) + * @param localNotify if true then this listener will be notified of all + * local changes in addition to all remote changes + */ + public void addTableListener(String key, ITableListener listener, + boolean immediateNotify, boolean localNotify); /** * This will immediately notify the listener of all current sub tables * @param listener */ public void addSubTableListener(final ITableListener listener); + /** + * This will immediately notify the listener of all current sub tables + * @param listener + * @param localNotify if true then this listener will be notified of all + * local changes in addition to all remote changes + */ + public void addSubTableListener(final ITableListener listener, + boolean localNotify); /** * Remove a listener from receiving table events * @param listener the listener to be removed diff --git a/src/Notifier.cpp b/src/Notifier.cpp index 23f746d35f..634c8e2b52 100644 --- a/src/Notifier.cpp +++ b/src/Notifier.cpp @@ -14,6 +14,7 @@ bool Notifier::s_destroyed = false; Notifier::Notifier() { m_active = false; + m_local_notifiers = false; s_destroyed = false; } @@ -64,9 +65,10 @@ void Notifier::ThreadMain() { // Use index because iterator might get invalidated. for (std::size_t i=0; i lock(m_mutex); unsigned int uid = m_entry_listeners.size(); - m_entry_listeners.emplace_back(prefix, callback); + m_entry_listeners.emplace_back(prefix, callback, local_notify); + if (local_notify) m_local_notifiers = true; return uid + 1; } @@ -112,14 +116,16 @@ void Notifier::RemoveEntryListener(unsigned int entry_listener_uid) { --entry_listener_uid; std::lock_guard lock(m_mutex); if (entry_listener_uid < m_entry_listeners.size()) - m_entry_listeners[entry_listener_uid].second = nullptr; + m_entry_listeners[entry_listener_uid].callback = nullptr; } void Notifier::NotifyEntry(StringRef name, std::shared_ptr value, - bool is_new, EntryListenerCallback only) { + bool is_new, bool is_local, + EntryListenerCallback only) { if (!m_active) return; + if (is_local && !m_local_notifiers) return; // optimization std::unique_lock lock(m_mutex); - m_entry_notifications.emplace(name, value, is_new, only); + m_entry_notifications.emplace(name, value, is_new, is_local, only); lock.unlock(); m_cond.notify_one(); } diff --git a/src/Notifier.h b/src/Notifier.h index daa9309501..6a8f7bfca3 100644 --- a/src/Notifier.h +++ b/src/Notifier.h @@ -37,11 +37,12 @@ class Notifier { static bool destroyed() { return s_destroyed; } unsigned int AddEntryListener(StringRef prefix, - EntryListenerCallback callback); + EntryListenerCallback callback, + bool local_notify); void RemoveEntryListener(unsigned int entry_listener_uid); void NotifyEntry(StringRef name, std::shared_ptr value, bool is_new, - EntryListenerCallback only = nullptr); + bool is_local, EntryListenerCallback only = nullptr); unsigned int AddConnectionListener(ConnectionListenerCallback callback); void RemoveConnectionListener(unsigned int conn_listener_uid); @@ -55,20 +56,36 @@ class Notifier { void ThreadMain(); std::atomic_bool m_active; + std::atomic_bool m_local_notifiers; std::mutex m_mutex; std::condition_variable m_cond; - std::vector> m_entry_listeners; + + struct EntryListener { + EntryListener(StringRef prefix_, EntryListenerCallback callback_, + bool local_notify_) + : prefix(prefix_), callback(callback_), local_notify(local_notify_) {} + + std::string prefix; + EntryListenerCallback callback; + bool local_notify; + }; + std::vector m_entry_listeners; std::vector m_conn_listeners; struct EntryNotification { EntryNotification(StringRef name_, std::shared_ptr value_, - bool is_new_, EntryListenerCallback only_) - : name(name_), value(value_), is_new(is_new_), only(only_) {} + bool is_new_, bool is_local_, EntryListenerCallback only_) + : name(name_), + value(value_), + is_new(is_new_), + is_local(is_local_), + only(only_) {} std::string name; std::shared_ptr value; bool is_new; + bool is_local; EntryListenerCallback only; }; std::queue m_entry_notifications; diff --git a/src/Storage.cpp b/src/Storage.cpp index eadf0e1571..c36de1a267 100644 --- a/src/Storage.cpp +++ b/src/Storage.cpp @@ -93,7 +93,7 @@ void Storage::ProcessIncoming(std::shared_ptr msg, if (entry->IsPersistent()) m_persistent_dirty = true; // notify - m_notifier.NotifyEntry(name, entry->value, true); + m_notifier.NotifyEntry(name, entry->value, true, false); // send the assignment to everyone (including the originator) if (m_queue_outgoing) { @@ -135,7 +135,7 @@ void Storage::ProcessIncoming(std::shared_ptr msg, m_idmap[id] = new_entry.get(); // notify - m_notifier.NotifyEntry(name, new_entry->value, true); + m_notifier.NotifyEntry(name, new_entry->value, true, false); return; } may_need_update = true; // we may need to send an update message @@ -184,7 +184,7 @@ void Storage::ProcessIncoming(std::shared_ptr msg, entry->seq_num = seq_num; // notify - m_notifier.NotifyEntry(name, entry->value, false); + m_notifier.NotifyEntry(name, entry->value, false, false); // broadcast to all other connections (note for client there won't // be any other connections, so don't bother) @@ -221,7 +221,7 @@ void Storage::ProcessIncoming(std::shared_ptr msg, if (entry->IsPersistent()) m_persistent_dirty = true; // notify - m_notifier.NotifyEntry(entry->name, entry->value, false); + m_notifier.NotifyEntry(entry->name, entry->value, false, false); // broadcast to all other connections (note for client there won't // be any other connections, so don't bother) @@ -390,7 +390,7 @@ void Storage::ApplyInitialAssignments( entry->flags = msg->flags(); entry->seq_num = seq_num; // notify - m_notifier.NotifyEntry(name, entry->value, true); + m_notifier.NotifyEntry(name, entry->value, true, false); } else { // if reconnect and sequence number not higher than local, then we // don't update the local value and instead send it back to the server @@ -404,7 +404,7 @@ void Storage::ApplyInitialAssignments( // don't update flags from a <3.0 remote (not part of message) if (conn.proto_rev() >= 0x0300) entry->flags = msg->flags(); // notify - m_notifier.NotifyEntry(name, entry->value, false); + m_notifier.NotifyEntry(name, entry->value, false, false); } } @@ -463,6 +463,7 @@ bool Storage::SetEntryValue(StringRef name, std::shared_ptr value) { value, entry->flags); lock.unlock(); queue_outgoing(msg, nullptr, nullptr); + m_notifier.NotifyEntry(name, value, true, true); } else if (*old_value != *value) { ++entry->seq_num; // don't send an update if we don't have an assigned id yet @@ -472,6 +473,7 @@ bool Storage::SetEntryValue(StringRef name, std::shared_ptr value) { lock.unlock(); queue_outgoing(msg, nullptr, nullptr); } + m_notifier.NotifyEntry(name, value, false, true); } return true; } @@ -506,6 +508,7 @@ void Storage::SetEntryTypeValue(StringRef name, std::shared_ptr value) { value, entry->flags); lock.unlock(); queue_outgoing(msg, nullptr, nullptr); + m_notifier.NotifyEntry(name, value, true, true); } else { ++entry->seq_num; // don't send an update if we don't have an assigned id yet @@ -515,6 +518,7 @@ void Storage::SetEntryTypeValue(StringRef name, std::shared_ptr value) { lock.unlock(); queue_outgoing(msg, nullptr, nullptr); } + m_notifier.NotifyEntry(name, value, false, true); } } @@ -616,7 +620,7 @@ void Storage::NotifyEntries(StringRef prefix, std::lock_guard lock(m_mutex); for (auto& i : m_entries) { if (!i.getKey().startswith(prefix)) continue; - m_notifier.NotifyEntry(i.getKey(), i.getValue()->value, false, only); + m_notifier.NotifyEntry(i.getKey(), i.getValue()->value, false, false, only); } } diff --git a/src/networktables/NetworkTable.cpp b/src/networktables/NetworkTable.cpp index 33bbb32f0d..07b6e982c9 100644 --- a/src/networktables/NetworkTable.cpp +++ b/src/networktables/NetworkTable.cpp @@ -97,11 +97,16 @@ NetworkTable::~NetworkTable() { } void NetworkTable::AddTableListener(ITableListener* listener) { - AddTableListener(listener, false); + AddTableListener(listener, false, false); } void NetworkTable::AddTableListener(ITableListener* listener, bool immediateNotify) { + AddTableListener(listener, immediateNotify, false); +} + +void NetworkTable::AddTableListener(ITableListener* listener, + bool immediateNotify, bool localNotify) { std::lock_guard lock(m_mutex); llvm::SmallString<128> path(m_path); path += PATH_SEPARATOR_CHAR; @@ -114,13 +119,18 @@ void NetworkTable::AddTableListener(ITableListener* listener, if (relative_key.find(PATH_SEPARATOR_CHAR) != StringRef::npos) return; listener->ValueChanged(this, relative_key, value, is_new); }, - immediateNotify); + immediateNotify, + localNotify); m_listeners.emplace_back(listener, id); } -void NetworkTable::AddTableListener(StringRef key, - ITableListener* listener, +void NetworkTable::AddTableListener(StringRef key, ITableListener* listener, bool immediateNotify) { + AddTableListener(key, listener, immediateNotify, false); +} + +void NetworkTable::AddTableListener(StringRef key, ITableListener* listener, + bool immediateNotify, bool localNotify) { std::lock_guard lock(m_mutex); llvm::SmallString<128> path(m_path); path += PATH_SEPARATOR_CHAR; @@ -133,11 +143,17 @@ void NetworkTable::AddTableListener(StringRef key, if (name != path) return; listener->ValueChanged(this, name.substr(prefix_len), value, is_new); }, - immediateNotify); + immediateNotify, + localNotify); m_listeners.emplace_back(listener, id); } void NetworkTable::AddSubTableListener(ITableListener* listener) { + AddSubTableListener(listener, false); +} + +void NetworkTable::AddSubTableListener(ITableListener* listener, + bool localNotify) { std::lock_guard lock(m_mutex); llvm::SmallString<128> path(m_path); path += PATH_SEPARATOR_CHAR; @@ -160,7 +176,8 @@ void NetworkTable::AddSubTableListener(ITableListener* listener) { notified_tables->insert(std::make_pair(sub_table_key, '\0')); listener->ValueChanged(this, sub_table_key, nullptr, true); }, - true); + true, + localNotify); m_listeners.emplace_back(listener, id); } diff --git a/src/ntcore_c.cpp b/src/ntcore_c.cpp index 266fabfd9c..e519a2ed85 100644 --- a/src/ntcore_c.cpp +++ b/src/ntcore_c.cpp @@ -169,14 +169,15 @@ void NT_Flush(void) { nt::Flush(); } unsigned int NT_AddEntryListener(const char *prefix, size_t prefix_len, void *data, NT_EntryListenerCallback callback, - int immediate_notify) { + int immediate_notify, int local_notify) { return nt::AddEntryListener( StringRef(prefix, prefix_len), [=](unsigned int uid, StringRef name, std::shared_ptr value, bool is_new) { callback(uid, data, name.data(), name.size(), &value->value(), is_new); }, - immediate_notify != 0); + immediate_notify != 0, + local_notify != 0); } void NT_RemoveEntryListener(unsigned int entry_listener_uid) { diff --git a/src/ntcore_cpp.cpp b/src/ntcore_cpp.cpp index 960151d597..917b5aceb5 100644 --- a/src/ntcore_cpp.cpp +++ b/src/ntcore_cpp.cpp @@ -66,9 +66,9 @@ void Flush() { */ unsigned int AddEntryListener(StringRef prefix, EntryListenerCallback callback, - bool immediate_notify) { + bool immediate_notify, bool local_notify) { Notifier& notifier = Notifier::GetInstance(); - unsigned int uid = notifier.AddEntryListener(prefix, callback); + unsigned int uid = notifier.AddEntryListener(prefix, callback, local_notify); notifier.Start(); if (immediate_notify) Storage::GetInstance().NotifyEntries(prefix, callback); return uid;