diff --git a/include/ntcore_c.h b/include/ntcore_c.h index 3dff25e7af..d2113138f8 100644 --- a/include/ntcore_c.h +++ b/include/ntcore_c.h @@ -269,7 +269,8 @@ unsigned int NT_AddEntryListener(const char *prefix, size_t prefix_len, int immediate_notify); void NT_RemoveEntryListener(unsigned int entry_listener_uid); unsigned int NT_AddConnectionListener(void *data, - NT_ConnectionListenerCallback callback); + NT_ConnectionListenerCallback callback, + int immediate_notify); void NT_RemoveConnectionListener(unsigned int conn_listener_uid); /* diff --git a/include/ntcore_cpp.h b/include/ntcore_cpp.h index 3cbe2ed3a6..5a15ae6659 100644 --- a/include/ntcore_cpp.h +++ b/include/ntcore_cpp.h @@ -191,7 +191,8 @@ typedef std::function DispatcherBase::GetConnections() const { return conns; } +void DispatcherBase::NotifyConnections( + ConnectionListenerCallback callback) const { + std::lock_guard lock(m_user_mutex); + for (auto& conn : m_connections) { + if (conn->state() != NetworkConnection::kActive) continue; + m_notifier.NotifyConnection(true, conn->info(), callback); + } +} + void DispatcherBase::DispatchThreadMain() { auto timeout_time = std::chrono::steady_clock::now(); diff --git a/src/Dispatcher.h b/src/Dispatcher.h index fba1bc9dec..372ec57c8d 100644 --- a/src/Dispatcher.h +++ b/src/Dispatcher.h @@ -42,6 +42,7 @@ class DispatcherBase { void SetIdentity(llvm::StringRef name); void Flush(); std::vector GetConnections() const; + void NotifyConnections(ConnectionListenerCallback callback) const; bool active() const { return m_active; } diff --git a/src/Notifier.cpp b/src/Notifier.cpp index 8e277746fd..752fbf9731 100644 --- a/src/Notifier.cpp +++ b/src/Notifier.cpp @@ -49,6 +49,14 @@ void Notifier::ThreadMain() { if (!item.value) continue; StringRef name(item.name); + if (item.only) { + // Don't hold mutex during callback execution! + lock.unlock(); + item.only(0, name, item.value, item.is_new); + lock.lock(); + continue; + } + // Use index because iterator might get invalidated. for (std::size_t i=0; i lock(m_mutex); unsigned int uid = m_entry_listeners.size(); m_entry_listeners.emplace_back(prefix, callback); - return uid; + return uid + 1; } void Notifier::RemoveEntryListener(unsigned int entry_listener_uid) { + --entry_listener_uid; std::lock_guard lock(m_mutex); if (entry_listener_uid < m_entry_listeners.size()) m_entry_listeners[entry_listener_uid].second = nullptr; } void Notifier::NotifyEntry(StringRef name, std::shared_ptr value, - bool is_new) { + bool is_new, EntryListenerCallback only) { if (!m_active) return; std::unique_lock lock(m_mutex); - m_entry_notifications.emplace(name, value, is_new); + m_entry_notifications.emplace(name, value, is_new, only); lock.unlock(); m_cond.notify_one(); } @@ -107,20 +124,22 @@ unsigned int Notifier::AddConnectionListener( std::lock_guard lock(m_mutex); unsigned int uid = m_entry_listeners.size(); m_conn_listeners.emplace_back(callback); - return uid; + return uid + 1; } void Notifier::RemoveConnectionListener(unsigned int conn_listener_uid) { + --conn_listener_uid; std::lock_guard lock(m_mutex); if (conn_listener_uid < m_conn_listeners.size()) m_conn_listeners[conn_listener_uid] = nullptr; } void Notifier::NotifyConnection(bool connected, - const ConnectionInfo& conn_info) { + const ConnectionInfo& conn_info, + ConnectionListenerCallback only) { if (!m_active) return; std::unique_lock lock(m_mutex); - m_conn_notifications.emplace(connected, conn_info); + m_conn_notifications.emplace(connected, conn_info, only); lock.unlock(); m_cond.notify_one(); } diff --git a/src/Notifier.h b/src/Notifier.h index 1121a3256c..763681e3f1 100644 --- a/src/Notifier.h +++ b/src/Notifier.h @@ -39,12 +39,14 @@ class Notifier { EntryListenerCallback callback); void RemoveEntryListener(unsigned int entry_listener_uid); - void NotifyEntry(StringRef name, std::shared_ptr value, bool is_new); + void NotifyEntry(StringRef name, std::shared_ptr value, bool is_new, + EntryListenerCallback only = nullptr); unsigned int AddConnectionListener(ConnectionListenerCallback callback); void RemoveConnectionListener(unsigned int conn_listener_uid); - void NotifyConnection(bool connected, const ConnectionInfo& conn_info); + void NotifyConnection(bool connected, const ConnectionInfo& conn_info, + ConnectionListenerCallback only = nullptr); private: Notifier(); @@ -60,21 +62,24 @@ class Notifier { struct EntryNotification { EntryNotification(StringRef name_, std::shared_ptr value_, - bool is_new_) - : name(name_), value(value_), is_new(is_new_) {} + bool is_new_, EntryListenerCallback only_) + : name(name_), value(value_), is_new(is_new_), only(only_) {} std::string name; std::shared_ptr value; bool is_new; + EntryListenerCallback only; }; std::queue m_entry_notifications; struct ConnectionNotification { - ConnectionNotification(bool connected_, const ConnectionInfo& conn_info_) - : connected(connected_), conn_info(conn_info_) {} + ConnectionNotification(bool connected_, const ConnectionInfo& conn_info_, + ConnectionListenerCallback only_) + : connected(connected_), conn_info(conn_info_), only(only_) {} bool connected; ConnectionInfo conn_info; + ConnectionListenerCallback only; }; std::queue m_conn_notifications; diff --git a/src/Storage.cpp b/src/Storage.cpp index df1f612be3..309e051de5 100644 --- a/src/Storage.cpp +++ b/src/Storage.cpp @@ -606,11 +606,12 @@ std::vector Storage::GetEntryInfo(StringRef prefix, return infos; } -void Storage::NotifyEntries(StringRef prefix) { +void Storage::NotifyEntries(StringRef prefix, + EntryListenerCallback only) const { std::lock_guard lock(m_mutex); for (auto& i : m_entries) { if (!i.getKey().startswith(prefix)) continue; - m_notifier.NotifyEntry(i.getKey(), i.getValue()->value, false); + m_notifier.NotifyEntry(i.getKey(), i.getValue()->value, false, only); } } diff --git a/src/Storage.h b/src/Storage.h index a5901106f5..c87a37b842 100644 --- a/src/Storage.h +++ b/src/Storage.h @@ -74,7 +74,8 @@ class Storage { void DeleteEntry(StringRef name); void DeleteAllEntries(); std::vector GetEntryInfo(StringRef prefix, unsigned int types); - void NotifyEntries(StringRef prefix); + void NotifyEntries(StringRef prefix, + EntryListenerCallback only = nullptr) const; // Filename-based save/load functions. Used both by periodic saves and // accessible directly via the user API. diff --git a/src/ntcore_c.cpp b/src/ntcore_c.cpp index c5d8453aaf..334513dc2a 100644 --- a/src/ntcore_c.cpp +++ b/src/ntcore_c.cpp @@ -180,14 +180,16 @@ void NT_RemoveEntryListener(unsigned int entry_listener_uid) { } unsigned int NT_AddConnectionListener(void *data, - NT_ConnectionListenerCallback callback) { + NT_ConnectionListenerCallback callback, + int immediate_notify) { return nt::AddConnectionListener( [=](unsigned int uid, bool connected, const ConnectionInfo &conn) { NT_ConnectionInfo conn_c; ConvertToC(conn, &conn_c); callback(uid, data, connected ? 1 : 0, &conn_c); DisposeConnectionInfo(&conn_c); - }); + }, + immediate_notify != 0); } void NT_RemoveConnectionListener(unsigned int conn_listener_uid) { diff --git a/src/ntcore_cpp.cpp b/src/ntcore_cpp.cpp index 4bd25472dc..14a7cd60c2 100644 --- a/src/ntcore_cpp.cpp +++ b/src/ntcore_cpp.cpp @@ -70,7 +70,7 @@ unsigned int AddEntryListener(StringRef prefix, EntryListenerCallback callback, Notifier& notifier = Notifier::GetInstance(); unsigned int uid = notifier.AddEntryListener(prefix, callback); notifier.Start(); - if (immediate_notify) Storage::GetInstance().NotifyEntries(prefix); + if (immediate_notify) Storage::GetInstance().NotifyEntries(prefix, callback); return uid; } @@ -78,10 +78,12 @@ void RemoveEntryListener(unsigned int entry_listener_uid) { Notifier::GetInstance().RemoveEntryListener(entry_listener_uid); } -unsigned int AddConnectionListener(ConnectionListenerCallback callback) { +unsigned int AddConnectionListener(ConnectionListenerCallback callback, + bool immediate_notify) { Notifier& notifier = Notifier::GetInstance(); unsigned int uid = notifier.AddConnectionListener(callback); Notifier::GetInstance().Start(); + if (immediate_notify) Dispatcher::GetInstance().NotifyConnections(callback); return uid; }