From e9073a3cc074f233f284bcf77e325855cddb22b3 Mon Sep 17 00:00:00 2001 From: Peter Johnson Date: Sun, 2 Aug 2015 21:47:01 -0700 Subject: [PATCH] Implement notifiers. The notifier thread is lazily started when the first notifier is added. This avoids the extra thread/processing overhead when notifiers are not used. --- src/Dispatcher.cpp | 11 ++-- src/Dispatcher.h | 9 ++- src/NetworkConnection.cpp | 12 ++++ src/NetworkConnection.h | 7 +++ src/Notifier.cpp | 124 ++++++++++++++++++++++++++++++++++++++ src/Notifier.h | 88 +++++++++++++++++++++++++++ src/Storage.cpp | 30 +++++++-- src/Storage.h | 6 +- src/ntcore_cpp.cpp | 20 ++++-- 9 files changed, 289 insertions(+), 18 deletions(-) create mode 100644 src/Notifier.cpp create mode 100644 src/Notifier.h diff --git a/src/Dispatcher.cpp b/src/Dispatcher.cpp index 85e6e1195c..104b199a90 100644 --- a/src/Dispatcher.cpp +++ b/src/Dispatcher.cpp @@ -28,8 +28,9 @@ void Dispatcher::StartClient(const char* server_name, unsigned int port) { static_cast(port), 1)); } -DispatcherBase::DispatcherBase(Storage& storage) +DispatcherBase::DispatcherBase(Storage& storage, Notifier& notifier) : m_storage(storage), + m_notifier(notifier), m_server(false), m_do_flush(false), m_reconnect_proto_rev(0x0300), @@ -135,11 +136,7 @@ std::vector DispatcherBase::GetConnections() const { std::lock_guard lock(m_user_mutex); for (auto& conn : m_connections) { if (conn.net->state() != NetworkConnection::kActive) continue; - auto& stream = conn.net->stream(); - conns.emplace_back( - ConnectionInfo{conn.net->remote_id(), stream.getPeerIP(), - static_cast(stream.getPeerPort()), - conn.net->last_update(), conn.net->proto_rev()}); + conns.emplace_back(conn.net->info()); } return conns; @@ -237,6 +234,7 @@ void DispatcherBase::ServerThreadMain() { using namespace std::placeholders; std::unique_ptr conn_unique(new NetworkConnection( std::move(stream), + m_notifier, std::bind(&Dispatcher::ServerHandshake, this, _1, _2, _3), std::bind(&Storage::GetEntryType, &m_storage, _1), std::bind(&Storage::ProcessIncoming, &m_storage, _1, _2))); @@ -265,6 +263,7 @@ void DispatcherBase::ClientThreadMain( using namespace std::placeholders; std::unique_ptr conn_unique(new NetworkConnection( std::move(stream), + m_notifier, std::bind(&Dispatcher::ClientHandshake, this, _1, _2, _3), std::bind(&Storage::GetEntryType, &m_storage, _1), std::bind(&Storage::ProcessIncoming, &m_storage, _1, _2))); diff --git a/src/Dispatcher.h b/src/Dispatcher.h index 0d4638cabe..f83cbfe25d 100644 --- a/src/Dispatcher.h +++ b/src/Dispatcher.h @@ -21,6 +21,7 @@ #include "atomic_static.h" #include "NetworkConnection.h" +#include "Notifier.h" #include "Storage.h" class NetworkAcceptor; @@ -47,7 +48,7 @@ class DispatcherBase { DispatcherBase& operator=(const DispatcherBase&) = delete; protected: - DispatcherBase(Storage& storage); + DispatcherBase(Storage& storage, Notifier& notifier); private: void DispatchThreadMain(); @@ -70,6 +71,7 @@ class DispatcherBase { NetworkConnection* except); Storage& m_storage; + Notifier& m_notifier; bool m_server; std::thread m_dispatch_thread; std::thread m_clientserver_thread; @@ -116,8 +118,9 @@ class Dispatcher : public DispatcherBase { void StartClient(const char* server_name, unsigned int port); private: - Dispatcher() : Dispatcher(Storage::GetInstance()) {} - Dispatcher(Storage& storage) : DispatcherBase(storage) {} + Dispatcher() : Dispatcher(Storage::GetInstance(), Notifier::GetInstance()) {} + Dispatcher(Storage& storage, Notifier& notifier) + : DispatcherBase(storage, notifier) {} ATOMIC_STATIC_DECL(Dispatcher) }; diff --git a/src/NetworkConnection.cpp b/src/NetworkConnection.cpp index 42f837f097..e77bc128ee 100644 --- a/src/NetworkConnection.cpp +++ b/src/NetworkConnection.cpp @@ -10,6 +10,7 @@ #include "support/timestamp.h" #include "tcpsockets/NetworkStream.h" #include "Log.h" +#include "Notifier.h" #include "raw_socket_istream.h" #include "WireDecoder.h" #include "WireEncoder.h" @@ -17,10 +18,12 @@ using namespace nt; NetworkConnection::NetworkConnection(std::unique_ptr stream, + Notifier& notifier, HandshakeFunc handshake, Message::GetEntryTypeFunc get_entry_type, ProcessIncomingFunc process_incoming) : m_stream(std::move(stream)), + m_notifier(notifier), m_handshake(handshake), m_get_entry_type(get_entry_type), m_process_incoming(process_incoming) { @@ -57,6 +60,12 @@ void NetworkConnection::Stop() { while (!m_outgoing.empty()) m_outgoing.pop(); } +ConnectionInfo NetworkConnection::info() const { + return ConnectionInfo{remote_id(), m_stream->getPeerIP(), + static_cast(m_stream->getPeerPort()), + m_last_update, m_proto_rev}; +} + std::string NetworkConnection::remote_id() const { std::lock_guard lock(m_remote_id_mutex); return m_remote_id; @@ -89,6 +98,7 @@ void NetworkConnection::ReadThreadMain() { } m_state = static_cast(kActive); + m_notifier.NotifyConnection(1, info()); while (m_active) { if (!m_stream) break; @@ -104,6 +114,7 @@ void NetworkConnection::ReadThreadMain() { m_process_incoming(std::move(msg), this); } DEBUG3("read thread died"); + if (m_state != kDead) m_notifier.NotifyConnection(0, info()); m_state = static_cast(kDead); m_active = false; m_outgoing.push(Outgoing()); // also kill write thread @@ -129,6 +140,7 @@ void NetworkConnection::WriteThreadMain() { DEBUG4("sent " << encoder.size() << " bytes"); } DEBUG3("write thread died"); + if (m_state != kDead) m_notifier.NotifyConnection(0, info()); m_state = static_cast(kDead); m_active = false; if (m_stream) m_stream->close(); // also kill read thread diff --git a/src/NetworkConnection.h b/src/NetworkConnection.h index a8ba281307..fa8b6ae0b7 100644 --- a/src/NetworkConnection.h +++ b/src/NetworkConnection.h @@ -14,11 +14,14 @@ #include "support/ConcurrentQueue.h" #include "Message.h" +#include "ntcore_cpp.h" class NetworkStream; namespace nt { +class Notifier; + class NetworkConnection { public: enum State { kCreated, kInit, kHandshake, kSynchronized, kActive, kDead }; @@ -34,6 +37,7 @@ class NetworkConnection { typedef ConcurrentQueue OutgoingQueue; NetworkConnection(std::unique_ptr stream, + Notifier& notifier, HandshakeFunc handshake, Message::GetEntryTypeFunc get_entry_type, ProcessIncomingFunc process_incoming); @@ -42,6 +46,8 @@ class NetworkConnection { void Start(); void Stop(); + ConnectionInfo info() const; + bool active() const { return m_active; } NetworkStream& stream() { return *m_stream; } OutgoingQueue& outgoing() { return m_outgoing; } @@ -65,6 +71,7 @@ class NetworkConnection { void WriteThreadMain(); std::unique_ptr m_stream; + Notifier& m_notifier; OutgoingQueue m_outgoing; HandshakeFunc m_handshake; Message::GetEntryTypeFunc m_get_entry_type; diff --git a/src/Notifier.cpp b/src/Notifier.cpp new file mode 100644 index 0000000000..cba61c6f8f --- /dev/null +++ b/src/Notifier.cpp @@ -0,0 +1,124 @@ +/*----------------------------------------------------------------------------*/ +/* Copyright (c) FIRST 2015. All Rights Reserved. */ +/* Open Source Software - may be modified and shared by FRC teams. The code */ +/* must be accompanied by the FIRST BSD license file in the root directory of */ +/* the project. */ +/*----------------------------------------------------------------------------*/ + +#include "Notifier.h" + +using namespace nt; + +ATOMIC_STATIC_INIT(Notifier) + +Notifier::Notifier() { + m_active = false; +} + +Notifier::~Notifier() { Stop(); } + +void Notifier::Start() { + { + std::lock_guard lock(m_mutex); + if (m_active) return; + m_active = true; + } + m_thread = std::thread(&Notifier::ThreadMain, this); +} + +void Notifier::Stop() { + m_active = false; + // send notification so the thread terminates + NotifyEntry("", nullptr, false); + if (m_thread.joinable()) m_thread.join(); +} + +void Notifier::ThreadMain() { + std::unique_lock lock(m_mutex); + while (m_active) { + m_cond.wait(lock); + if (!m_active) continue; + + // Entry notifications + while (!m_entry_notifications.empty()) { + auto item = std::move(m_entry_notifications.front()); + m_entry_notifications.pop(); + + if (!item.value) continue; + StringRef name(item.name); + + // Use index because iterator might get invalidated. + for (std::size_t i=0; i lock(m_mutex); + unsigned int uid = m_entry_listeners.size(); + m_entry_listeners.emplace_back(prefix, callback); + return uid; +} + +void Notifier::RemoveEntryListener(unsigned int entry_listener_uid) { + std::lock_guard lock(m_mutex); + if (entry_listener_uid < m_entry_listeners.size()) + m_entry_listeners[entry_listener_uid].second = nullptr; +} + +void Notifier::NotifyEntry(StringRef name, std::shared_ptr value, + bool is_new) { + if (!m_active) return; + std::unique_lock lock(m_mutex); + m_entry_notifications.emplace(name, value, is_new); + lock.unlock(); + m_cond.notify_one(); +} + +unsigned int Notifier::AddConnectionListener( + ConnectionListenerCallback callback) { + std::lock_guard lock(m_mutex); + unsigned int uid = m_entry_listeners.size(); + m_conn_listeners.emplace_back(callback); + return uid; +} + +void Notifier::RemoveConnectionListener(unsigned int conn_listener_uid) { + std::lock_guard lock(m_mutex); + if (conn_listener_uid < m_conn_listeners.size()) + m_conn_listeners[conn_listener_uid] = nullptr; +} + +void Notifier::NotifyConnection(int connected, + const ConnectionInfo& conn_info) { + if (!m_active) return; + std::unique_lock lock(m_mutex); + m_conn_notifications.emplace(connected, conn_info); + lock.unlock(); + m_cond.notify_one(); +} diff --git a/src/Notifier.h b/src/Notifier.h new file mode 100644 index 0000000000..dfbc0bda95 --- /dev/null +++ b/src/Notifier.h @@ -0,0 +1,88 @@ +/*----------------------------------------------------------------------------*/ +/* Copyright (c) FIRST 2015. All Rights Reserved. */ +/* Open Source Software - may be modified and shared by FRC teams. The code */ +/* must be accompanied by the FIRST BSD license file in the root directory of */ +/* the project. */ +/*----------------------------------------------------------------------------*/ + +#ifndef NT_NOTIFIER_H_ +#define NT_NOTIFIER_H_ + +#include +#include +#include +#include +#include +#include +#include + +#include "atomic_static.h" +#include "ntcore_cpp.h" + +namespace nt { + +class Notifier { + friend class NotifierTest; + public: + static Notifier& GetInstance() { + ATOMIC_STATIC(Notifier, instance); + return instance; + } + ~Notifier(); + + void Start(); + void Stop(); + + bool active() const { return m_active; } + + unsigned int AddEntryListener(StringRef prefix, + EntryListenerCallback callback); + void RemoveEntryListener(unsigned int entry_listener_uid); + + void NotifyEntry(StringRef name, std::shared_ptr value, bool is_new); + + unsigned int AddConnectionListener(ConnectionListenerCallback callback); + void RemoveConnectionListener(unsigned int conn_listener_uid); + + void NotifyConnection(int connected, const ConnectionInfo& conn_info); + + private: + Notifier(); + + void ThreadMain(); + + std::atomic_bool m_active; + + std::mutex m_mutex; + std::condition_variable m_cond; + std::vector> m_entry_listeners; + std::vector m_conn_listeners; + + struct EntryNotification { + EntryNotification(StringRef name_, std::shared_ptr value_, + bool is_new_) + : name(name_), value(value_), is_new(is_new_) {} + + std::string name; + std::shared_ptr value; + bool is_new; + }; + std::queue m_entry_notifications; + + struct ConnectionNotification { + ConnectionNotification(int connected_, const ConnectionInfo& conn_info_) + : connected(connected_), conn_info(conn_info_) {} + + int connected; + ConnectionInfo conn_info; + }; + std::queue m_conn_notifications; + + std::thread m_thread; + + ATOMIC_STATIC_DECL(Notifier) +}; + +} // namespace nt + +#endif // NT_NOTIFIER_H_ diff --git a/src/Storage.cpp b/src/Storage.cpp index 96969c366c..f9490a6cc9 100644 --- a/src/Storage.cpp +++ b/src/Storage.cpp @@ -19,7 +19,7 @@ using namespace nt; ATOMIC_STATIC_INIT(Storage) -Storage::Storage() {} +Storage::Storage(Notifier& notifier) : m_notifier(notifier) {} Storage::~Storage() {} @@ -77,6 +77,9 @@ void Storage::ProcessIncoming(std::shared_ptr msg, entry->id = id; m_idmap.push_back(entry); + // notify + m_notifier.NotifyEntry(name, entry->value, true); + // send the assignment to everyone (including the originator) if (m_queue_outgoing) { auto queue_outgoing = m_queue_outgoing; @@ -113,12 +116,17 @@ void Storage::ProcessIncoming(std::shared_ptr msg, new_entry.reset(new Entry(name)); new_entry->value = msg->value(); new_entry->flags = msg->flags(); - } else - may_need_update = true; // we may need to send an update message + new_entry->id = id; + m_idmap[id] = new_entry.get(); + + // notify + m_notifier.NotifyEntry(name, new_entry->value, true); + return; + } + may_need_update = true; // we may need to send an update message entry = new_entry.get(); entry->id = id; m_idmap[id] = entry; - return; } } @@ -151,6 +159,9 @@ void Storage::ProcessIncoming(std::shared_ptr msg, // don't update flags from a <3.0 remote (not part of message) if (conn->proto_rev() >= 0x0300) entry->flags = msg->flags(); + // notify + m_notifier.NotifyEntry(name, entry->value, false); + // broadcast to all other connections (note for client there won't // be any other connections, so don't bother) if (m_server && m_queue_outgoing) { @@ -182,6 +193,9 @@ void Storage::ProcessIncoming(std::shared_ptr msg, entry->value = msg->value(); entry->seq_num = seq_num; + // notify + m_notifier.NotifyEntry(entry->name, entry->value, false); + // broadcast to all other connections (note for client there won't // be any other connections, so don't bother) if (m_server && m_queue_outgoing) { @@ -511,6 +525,14 @@ std::vector Storage::GetEntryInfo(StringRef prefix, return infos; } +void Storage::NotifyEntries(StringRef prefix) { + std::lock_guard lock(m_mutex); + for (auto& i : m_entries) { + if (!i.getKey().startswith(prefix)) continue; + m_notifier.NotifyEntry(i.getKey(), i.getValue()->value, false); + } +} + /* Escapes and writes a string, including start and end double quotes */ static void WriteString(std::ostream& os, llvm::StringRef str) { os << '"'; diff --git a/src/Storage.h b/src/Storage.h index f91d89b3e5..81a741ece3 100644 --- a/src/Storage.h +++ b/src/Storage.h @@ -19,6 +19,7 @@ #include "llvm/StringMap.h" #include "atomic_static.h" #include "Message.h" +#include "Notifier.h" #include "ntcore_cpp.h" #include "SequenceNumber.h" @@ -64,6 +65,7 @@ class Storage { void DeleteEntry(StringRef name); void DeleteAllEntries(); std::vector GetEntryInfo(StringRef prefix, unsigned int types); + void NotifyEntries(StringRef prefix); void SavePersistent(std::ostream& os) const; bool LoadPersistent( @@ -71,7 +73,8 @@ class Storage { std::function warn); private: - Storage(); + Storage() : Storage(Notifier::GetInstance()) {} + Storage(Notifier& notifier); Storage(const Storage&) = delete; Storage& operator=(const Storage&) = delete; @@ -95,6 +98,7 @@ class Storage { QueueOutgoingFunc m_queue_outgoing; bool m_server; + Notifier& m_notifier; ATOMIC_STATIC_DECL(Storage) }; diff --git a/src/ntcore_cpp.cpp b/src/ntcore_cpp.cpp index 1d9ca55023..731d74f807 100644 --- a/src/ntcore_cpp.cpp +++ b/src/ntcore_cpp.cpp @@ -14,6 +14,7 @@ #include "Dispatcher.h" #include "Log.h" +#include "Notifier.h" #include "Storage.h" namespace nt { @@ -64,16 +65,27 @@ void Flush() { unsigned int AddEntryListener(StringRef prefix, EntryListenerCallback callback, bool immediate_notify) { - return 0; + Notifier& notifier = Notifier::GetInstance(); + unsigned int uid = notifier.AddEntryListener(prefix, callback); + notifier.Start(); + if (immediate_notify) Storage::GetInstance().NotifyEntries(prefix); + return uid; } -void RemoveEntryListener(unsigned int entry_listener_uid) {} +void RemoveEntryListener(unsigned int entry_listener_uid) { + Notifier::GetInstance().RemoveEntryListener(entry_listener_uid); +} unsigned int AddConnectionListener(ConnectionListenerCallback callback) { - return 0; + Notifier& notifier = Notifier::GetInstance(); + unsigned int uid = notifier.AddConnectionListener(callback); + Notifier::GetInstance().Start(); + return uid; } -void RemoveConnectionListener(unsigned int conn_listener_uid) {} +void RemoveConnectionListener(unsigned int conn_listener_uid) { + Notifier::GetInstance().RemoveConnectionListener(conn_listener_uid); +} /* * Remote Procedure Call Functions