/*----------------------------------------------------------------------------*/ /* Copyright (c) FIRST 2015. All Rights Reserved. */ /* Open Source Software - may be modified and shared by FRC teams. The code */ /* must be accompanied by the FIRST BSD license file in the root directory of */ /* the project. */ /*----------------------------------------------------------------------------*/ #include "Notifier.h" #include #include using namespace nt; ATOMIC_STATIC_INIT(Notifier) bool Notifier::s_destroyed = false; namespace { // Vector which provides an integrated freelist for removal and reuse of // individual elements. template class UidVector { public: typedef typename std::vector::size_type size_type; size_type size() const { return m_vector.size(); } T& operator[](size_type i) { return m_vector[i]; } const T& operator[](size_type i) const { return m_vector[i]; } // Add a new T to the vector. If there are elements on the freelist, // reuses the last one; otherwise adds to the end of the vector. // Returns the resulting element index (+1). template unsigned int emplace_back(Args&&... args) { unsigned int uid; if (m_free.empty()) { uid = m_vector.size(); m_vector.emplace_back(std::forward(args)...); } else { uid = m_free.back(); m_free.pop_back(); m_vector[uid] = T(std::forward(args)...); } return uid + 1; } // Removes the identified element by replacing it with a default-constructed // one. The element is added to the freelist for later reuse. void erase(unsigned int uid) { --uid; if (uid >= m_vector.size() || !m_vector[uid]) return; m_free.push_back(uid); m_vector[uid] = T(); } private: std::vector m_vector; std::vector m_free; }; } // anonymous namespace class Notifier::Thread : public wpi::SafeThread { public: Thread(std::function on_start, std::function on_exit) : m_on_start(on_start), m_on_exit(on_exit) {} void Main(); struct EntryListener { EntryListener() = default; EntryListener(StringRef prefix_, EntryListenerCallback callback_, unsigned int flags_) : prefix(prefix_), callback(callback_), flags(flags_) {} explicit operator bool() const { return bool(callback); } std::string prefix; EntryListenerCallback callback; unsigned int flags; }; UidVector m_entry_listeners; UidVector m_conn_listeners; struct EntryNotification { EntryNotification(llvm::StringRef name_, std::shared_ptr value_, unsigned int flags_, EntryListenerCallback only_) : name(name_), value(value_), flags(flags_), only(only_) {} std::string name; std::shared_ptr value; unsigned int flags; EntryListenerCallback only; }; std::queue m_entry_notifications; struct ConnectionNotification { ConnectionNotification(bool connected_, const ConnectionInfo& conn_info_, ConnectionListenerCallback only_) : connected(connected_), conn_info(conn_info_), only(only_) {} bool connected; ConnectionInfo conn_info; ConnectionListenerCallback only; }; std::queue m_conn_notifications; std::function m_on_start; std::function m_on_exit; }; Notifier::Notifier() { m_local_notifiers = false; s_destroyed = false; } Notifier::~Notifier() { s_destroyed = true; } void Notifier::Start() { auto thr = m_owner.GetThread(); if (!thr) m_owner.Start(new Thread(m_on_start, m_on_exit)); } void Notifier::Stop() { m_owner.Stop(); } void Notifier::Thread::Main() { if (m_on_start) m_on_start(); std::unique_lock lock(m_mutex); while (m_active) { while (m_entry_notifications.empty() && m_conn_notifications.empty()) { m_cond.wait(lock); if (!m_active) goto done; } // Entry notifications while (!m_entry_notifications.empty()) { if (!m_active) goto done; auto item = std::move(m_entry_notifications.front()); m_entry_notifications.pop(); if (!item.value) continue; StringRef name(item.name); if (item.only) { // Don't hold mutex during callback execution! lock.unlock(); item.only(0, name, item.value, item.flags); lock.lock(); continue; } // Use index because iterator might get invalidated. for (std::size_t i=0; im_entry_listeners.emplace_back(prefix, callback, flags); } void Notifier::RemoveEntryListener(unsigned int entry_listener_uid) { auto thr = m_owner.GetThread(); if (!thr) return; thr->m_entry_listeners.erase(entry_listener_uid); } void Notifier::NotifyEntry(StringRef name, std::shared_ptr value, unsigned int flags, EntryListenerCallback only) { // optimization: don't generate needless local queue entries if we have // no local listeners (as this is a common case on the server side) if ((flags & NT_NOTIFY_LOCAL) != 0 && !m_local_notifiers) return; auto thr = m_owner.GetThread(); if (!thr) return; thr->m_entry_notifications.emplace(name, value, flags, only); thr->m_cond.notify_one(); } unsigned int Notifier::AddConnectionListener( ConnectionListenerCallback callback) { Start(); auto thr = m_owner.GetThread(); return thr->m_conn_listeners.emplace_back(callback); } void Notifier::RemoveConnectionListener(unsigned int conn_listener_uid) { auto thr = m_owner.GetThread(); if (!thr) return; thr->m_conn_listeners.erase(conn_listener_uid); } void Notifier::NotifyConnection(bool connected, const ConnectionInfo& conn_info, ConnectionListenerCallback only) { auto thr = m_owner.GetThread(); if (!thr) return; thr->m_conn_notifications.emplace(connected, conn_info, only); thr->m_cond.notify_one(); }