Implement notifiers.

The notifier thread is lazily started when the first notifier is added.
This avoids the extra thread/processing overhead when notifiers are not used.
This commit is contained in:
Peter Johnson
2015-08-02 21:47:01 -07:00
parent 538a19fd47
commit e9073a3cc0
9 changed files with 289 additions and 18 deletions

View File

@@ -28,8 +28,9 @@ void Dispatcher::StartClient(const char* server_name, unsigned int port) {
static_cast<int>(port), 1));
}
DispatcherBase::DispatcherBase(Storage& storage)
DispatcherBase::DispatcherBase(Storage& storage, Notifier& notifier)
: m_storage(storage),
m_notifier(notifier),
m_server(false),
m_do_flush(false),
m_reconnect_proto_rev(0x0300),
@@ -135,11 +136,7 @@ std::vector<ConnectionInfo> DispatcherBase::GetConnections() const {
std::lock_guard<std::mutex> lock(m_user_mutex);
for (auto& conn : m_connections) {
if (conn.net->state() != NetworkConnection::kActive) continue;
auto& stream = conn.net->stream();
conns.emplace_back(
ConnectionInfo{conn.net->remote_id(), stream.getPeerIP(),
static_cast<unsigned int>(stream.getPeerPort()),
conn.net->last_update(), conn.net->proto_rev()});
conns.emplace_back(conn.net->info());
}
return conns;
@@ -237,6 +234,7 @@ void DispatcherBase::ServerThreadMain() {
using namespace std::placeholders;
std::unique_ptr<NetworkConnection> conn_unique(new NetworkConnection(
std::move(stream),
m_notifier,
std::bind(&Dispatcher::ServerHandshake, this, _1, _2, _3),
std::bind(&Storage::GetEntryType, &m_storage, _1),
std::bind(&Storage::ProcessIncoming, &m_storage, _1, _2)));
@@ -265,6 +263,7 @@ void DispatcherBase::ClientThreadMain(
using namespace std::placeholders;
std::unique_ptr<NetworkConnection> conn_unique(new NetworkConnection(
std::move(stream),
m_notifier,
std::bind(&Dispatcher::ClientHandshake, this, _1, _2, _3),
std::bind(&Storage::GetEntryType, &m_storage, _1),
std::bind(&Storage::ProcessIncoming, &m_storage, _1, _2)));

View File

@@ -21,6 +21,7 @@
#include "atomic_static.h"
#include "NetworkConnection.h"
#include "Notifier.h"
#include "Storage.h"
class NetworkAcceptor;
@@ -47,7 +48,7 @@ class DispatcherBase {
DispatcherBase& operator=(const DispatcherBase&) = delete;
protected:
DispatcherBase(Storage& storage);
DispatcherBase(Storage& storage, Notifier& notifier);
private:
void DispatchThreadMain();
@@ -70,6 +71,7 @@ class DispatcherBase {
NetworkConnection* except);
Storage& m_storage;
Notifier& m_notifier;
bool m_server;
std::thread m_dispatch_thread;
std::thread m_clientserver_thread;
@@ -116,8 +118,9 @@ class Dispatcher : public DispatcherBase {
void StartClient(const char* server_name, unsigned int port);
private:
Dispatcher() : Dispatcher(Storage::GetInstance()) {}
Dispatcher(Storage& storage) : DispatcherBase(storage) {}
Dispatcher() : Dispatcher(Storage::GetInstance(), Notifier::GetInstance()) {}
Dispatcher(Storage& storage, Notifier& notifier)
: DispatcherBase(storage, notifier) {}
ATOMIC_STATIC_DECL(Dispatcher)
};

View File

@@ -10,6 +10,7 @@
#include "support/timestamp.h"
#include "tcpsockets/NetworkStream.h"
#include "Log.h"
#include "Notifier.h"
#include "raw_socket_istream.h"
#include "WireDecoder.h"
#include "WireEncoder.h"
@@ -17,10 +18,12 @@
using namespace nt;
NetworkConnection::NetworkConnection(std::unique_ptr<NetworkStream> stream,
Notifier& notifier,
HandshakeFunc handshake,
Message::GetEntryTypeFunc get_entry_type,
ProcessIncomingFunc process_incoming)
: m_stream(std::move(stream)),
m_notifier(notifier),
m_handshake(handshake),
m_get_entry_type(get_entry_type),
m_process_incoming(process_incoming) {
@@ -57,6 +60,12 @@ void NetworkConnection::Stop() {
while (!m_outgoing.empty()) m_outgoing.pop();
}
ConnectionInfo NetworkConnection::info() const {
return ConnectionInfo{remote_id(), m_stream->getPeerIP(),
static_cast<unsigned int>(m_stream->getPeerPort()),
m_last_update, m_proto_rev};
}
std::string NetworkConnection::remote_id() const {
std::lock_guard<std::mutex> lock(m_remote_id_mutex);
return m_remote_id;
@@ -89,6 +98,7 @@ void NetworkConnection::ReadThreadMain() {
}
m_state = static_cast<int>(kActive);
m_notifier.NotifyConnection(1, info());
while (m_active) {
if (!m_stream)
break;
@@ -104,6 +114,7 @@ void NetworkConnection::ReadThreadMain() {
m_process_incoming(std::move(msg), this);
}
DEBUG3("read thread died");
if (m_state != kDead) m_notifier.NotifyConnection(0, info());
m_state = static_cast<int>(kDead);
m_active = false;
m_outgoing.push(Outgoing()); // also kill write thread
@@ -129,6 +140,7 @@ void NetworkConnection::WriteThreadMain() {
DEBUG4("sent " << encoder.size() << " bytes");
}
DEBUG3("write thread died");
if (m_state != kDead) m_notifier.NotifyConnection(0, info());
m_state = static_cast<int>(kDead);
m_active = false;
if (m_stream) m_stream->close(); // also kill read thread

View File

@@ -14,11 +14,14 @@
#include "support/ConcurrentQueue.h"
#include "Message.h"
#include "ntcore_cpp.h"
class NetworkStream;
namespace nt {
class Notifier;
class NetworkConnection {
public:
enum State { kCreated, kInit, kHandshake, kSynchronized, kActive, kDead };
@@ -34,6 +37,7 @@ class NetworkConnection {
typedef ConcurrentQueue<Outgoing> OutgoingQueue;
NetworkConnection(std::unique_ptr<NetworkStream> stream,
Notifier& notifier,
HandshakeFunc handshake,
Message::GetEntryTypeFunc get_entry_type,
ProcessIncomingFunc process_incoming);
@@ -42,6 +46,8 @@ class NetworkConnection {
void Start();
void Stop();
ConnectionInfo info() const;
bool active() const { return m_active; }
NetworkStream& stream() { return *m_stream; }
OutgoingQueue& outgoing() { return m_outgoing; }
@@ -65,6 +71,7 @@ class NetworkConnection {
void WriteThreadMain();
std::unique_ptr<NetworkStream> m_stream;
Notifier& m_notifier;
OutgoingQueue m_outgoing;
HandshakeFunc m_handshake;
Message::GetEntryTypeFunc m_get_entry_type;

124
src/Notifier.cpp Normal file
View File

@@ -0,0 +1,124 @@
/*----------------------------------------------------------------------------*/
/* Copyright (c) FIRST 2015. All Rights Reserved. */
/* Open Source Software - may be modified and shared by FRC teams. The code */
/* must be accompanied by the FIRST BSD license file in the root directory of */
/* the project. */
/*----------------------------------------------------------------------------*/
#include "Notifier.h"
using namespace nt;
ATOMIC_STATIC_INIT(Notifier)
Notifier::Notifier() {
m_active = false;
}
Notifier::~Notifier() { Stop(); }
void Notifier::Start() {
{
std::lock_guard<std::mutex> lock(m_mutex);
if (m_active) return;
m_active = true;
}
m_thread = std::thread(&Notifier::ThreadMain, this);
}
void Notifier::Stop() {
m_active = false;
// send notification so the thread terminates
NotifyEntry("", nullptr, false);
if (m_thread.joinable()) m_thread.join();
}
void Notifier::ThreadMain() {
std::unique_lock<std::mutex> lock(m_mutex);
while (m_active) {
m_cond.wait(lock);
if (!m_active) continue;
// Entry notifications
while (!m_entry_notifications.empty()) {
auto item = std::move(m_entry_notifications.front());
m_entry_notifications.pop();
if (!item.value) continue;
StringRef name(item.name);
// Use index because iterator might get invalidated.
for (std::size_t i=0; i<m_entry_listeners.size(); ++i) {
if (!m_entry_listeners[i].second) continue; // removed
if (!name.startswith(m_entry_listeners[i].first)) continue;
auto callback = m_entry_listeners[i].second;
// Don't hold mutex during callback execution!
lock.unlock();
callback(i, name, item.value, item.is_new);
lock.lock();
}
}
// Connection notifications
while (!m_conn_notifications.empty()) {
auto item = std::move(m_conn_notifications.front());
m_conn_notifications.pop();
// Use index because iterator might get invalidated.
for (std::size_t i=0; i<m_conn_listeners.size(); ++i) {
if (!m_conn_listeners[i]) continue; // removed
auto callback = m_conn_listeners[i];
// Don't hold mutex during callback execution!
lock.unlock();
callback(i, item.connected, item.conn_info);
lock.lock();
}
}
}
}
unsigned int Notifier::AddEntryListener(StringRef prefix,
EntryListenerCallback callback) {
std::lock_guard<std::mutex> lock(m_mutex);
unsigned int uid = m_entry_listeners.size();
m_entry_listeners.emplace_back(prefix, callback);
return uid;
}
void Notifier::RemoveEntryListener(unsigned int entry_listener_uid) {
std::lock_guard<std::mutex> lock(m_mutex);
if (entry_listener_uid < m_entry_listeners.size())
m_entry_listeners[entry_listener_uid].second = nullptr;
}
void Notifier::NotifyEntry(StringRef name, std::shared_ptr<Value> value,
bool is_new) {
if (!m_active) return;
std::unique_lock<std::mutex> lock(m_mutex);
m_entry_notifications.emplace(name, value, is_new);
lock.unlock();
m_cond.notify_one();
}
unsigned int Notifier::AddConnectionListener(
ConnectionListenerCallback callback) {
std::lock_guard<std::mutex> lock(m_mutex);
unsigned int uid = m_entry_listeners.size();
m_conn_listeners.emplace_back(callback);
return uid;
}
void Notifier::RemoveConnectionListener(unsigned int conn_listener_uid) {
std::lock_guard<std::mutex> lock(m_mutex);
if (conn_listener_uid < m_conn_listeners.size())
m_conn_listeners[conn_listener_uid] = nullptr;
}
void Notifier::NotifyConnection(int connected,
const ConnectionInfo& conn_info) {
if (!m_active) return;
std::unique_lock<std::mutex> lock(m_mutex);
m_conn_notifications.emplace(connected, conn_info);
lock.unlock();
m_cond.notify_one();
}

88
src/Notifier.h Normal file
View File

@@ -0,0 +1,88 @@
/*----------------------------------------------------------------------------*/
/* Copyright (c) FIRST 2015. All Rights Reserved. */
/* Open Source Software - may be modified and shared by FRC teams. The code */
/* must be accompanied by the FIRST BSD license file in the root directory of */
/* the project. */
/*----------------------------------------------------------------------------*/
#ifndef NT_NOTIFIER_H_
#define NT_NOTIFIER_H_
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>
#include "atomic_static.h"
#include "ntcore_cpp.h"
namespace nt {
class Notifier {
friend class NotifierTest;
public:
static Notifier& GetInstance() {
ATOMIC_STATIC(Notifier, instance);
return instance;
}
~Notifier();
void Start();
void Stop();
bool active() const { return m_active; }
unsigned int AddEntryListener(StringRef prefix,
EntryListenerCallback callback);
void RemoveEntryListener(unsigned int entry_listener_uid);
void NotifyEntry(StringRef name, std::shared_ptr<Value> value, bool is_new);
unsigned int AddConnectionListener(ConnectionListenerCallback callback);
void RemoveConnectionListener(unsigned int conn_listener_uid);
void NotifyConnection(int connected, const ConnectionInfo& conn_info);
private:
Notifier();
void ThreadMain();
std::atomic_bool m_active;
std::mutex m_mutex;
std::condition_variable m_cond;
std::vector<std::pair<std::string, EntryListenerCallback>> m_entry_listeners;
std::vector<ConnectionListenerCallback> m_conn_listeners;
struct EntryNotification {
EntryNotification(StringRef name_, std::shared_ptr<Value> value_,
bool is_new_)
: name(name_), value(value_), is_new(is_new_) {}
std::string name;
std::shared_ptr<Value> value;
bool is_new;
};
std::queue<EntryNotification> m_entry_notifications;
struct ConnectionNotification {
ConnectionNotification(int connected_, const ConnectionInfo& conn_info_)
: connected(connected_), conn_info(conn_info_) {}
int connected;
ConnectionInfo conn_info;
};
std::queue<ConnectionNotification> m_conn_notifications;
std::thread m_thread;
ATOMIC_STATIC_DECL(Notifier)
};
} // namespace nt
#endif // NT_NOTIFIER_H_

View File

@@ -19,7 +19,7 @@ using namespace nt;
ATOMIC_STATIC_INIT(Storage)
Storage::Storage() {}
Storage::Storage(Notifier& notifier) : m_notifier(notifier) {}
Storage::~Storage() {}
@@ -77,6 +77,9 @@ void Storage::ProcessIncoming(std::shared_ptr<Message> msg,
entry->id = id;
m_idmap.push_back(entry);
// notify
m_notifier.NotifyEntry(name, entry->value, true);
// send the assignment to everyone (including the originator)
if (m_queue_outgoing) {
auto queue_outgoing = m_queue_outgoing;
@@ -113,12 +116,17 @@ void Storage::ProcessIncoming(std::shared_ptr<Message> msg,
new_entry.reset(new Entry(name));
new_entry->value = msg->value();
new_entry->flags = msg->flags();
} else
may_need_update = true; // we may need to send an update message
new_entry->id = id;
m_idmap[id] = new_entry.get();
// notify
m_notifier.NotifyEntry(name, new_entry->value, true);
return;
}
may_need_update = true; // we may need to send an update message
entry = new_entry.get();
entry->id = id;
m_idmap[id] = entry;
return;
}
}
@@ -151,6 +159,9 @@ void Storage::ProcessIncoming(std::shared_ptr<Message> msg,
// don't update flags from a <3.0 remote (not part of message)
if (conn->proto_rev() >= 0x0300) entry->flags = msg->flags();
// notify
m_notifier.NotifyEntry(name, entry->value, false);
// broadcast to all other connections (note for client there won't
// be any other connections, so don't bother)
if (m_server && m_queue_outgoing) {
@@ -182,6 +193,9 @@ void Storage::ProcessIncoming(std::shared_ptr<Message> msg,
entry->value = msg->value();
entry->seq_num = seq_num;
// notify
m_notifier.NotifyEntry(entry->name, entry->value, false);
// broadcast to all other connections (note for client there won't
// be any other connections, so don't bother)
if (m_server && m_queue_outgoing) {
@@ -511,6 +525,14 @@ std::vector<EntryInfo> Storage::GetEntryInfo(StringRef prefix,
return infos;
}
void Storage::NotifyEntries(StringRef prefix) {
std::lock_guard<std::mutex> lock(m_mutex);
for (auto& i : m_entries) {
if (!i.getKey().startswith(prefix)) continue;
m_notifier.NotifyEntry(i.getKey(), i.getValue()->value, false);
}
}
/* Escapes and writes a string, including start and end double quotes */
static void WriteString(std::ostream& os, llvm::StringRef str) {
os << '"';

View File

@@ -19,6 +19,7 @@
#include "llvm/StringMap.h"
#include "atomic_static.h"
#include "Message.h"
#include "Notifier.h"
#include "ntcore_cpp.h"
#include "SequenceNumber.h"
@@ -64,6 +65,7 @@ class Storage {
void DeleteEntry(StringRef name);
void DeleteAllEntries();
std::vector<EntryInfo> GetEntryInfo(StringRef prefix, unsigned int types);
void NotifyEntries(StringRef prefix);
void SavePersistent(std::ostream& os) const;
bool LoadPersistent(
@@ -71,7 +73,8 @@ class Storage {
std::function<void(std::size_t line, const char* msg)> warn);
private:
Storage();
Storage() : Storage(Notifier::GetInstance()) {}
Storage(Notifier& notifier);
Storage(const Storage&) = delete;
Storage& operator=(const Storage&) = delete;
@@ -95,6 +98,7 @@ class Storage {
QueueOutgoingFunc m_queue_outgoing;
bool m_server;
Notifier& m_notifier;
ATOMIC_STATIC_DECL(Storage)
};

View File

@@ -14,6 +14,7 @@
#include "Dispatcher.h"
#include "Log.h"
#include "Notifier.h"
#include "Storage.h"
namespace nt {
@@ -64,16 +65,27 @@ void Flush() {
unsigned int AddEntryListener(StringRef prefix, EntryListenerCallback callback,
bool immediate_notify) {
return 0;
Notifier& notifier = Notifier::GetInstance();
unsigned int uid = notifier.AddEntryListener(prefix, callback);
notifier.Start();
if (immediate_notify) Storage::GetInstance().NotifyEntries(prefix);
return uid;
}
void RemoveEntryListener(unsigned int entry_listener_uid) {}
void RemoveEntryListener(unsigned int entry_listener_uid) {
Notifier::GetInstance().RemoveEntryListener(entry_listener_uid);
}
unsigned int AddConnectionListener(ConnectionListenerCallback callback) {
return 0;
Notifier& notifier = Notifier::GetInstance();
unsigned int uid = notifier.AddConnectionListener(callback);
Notifier::GetInstance().Start();
return uid;
}
void RemoveConnectionListener(unsigned int conn_listener_uid) {}
void RemoveConnectionListener(unsigned int conn_listener_uid) {
Notifier::GetInstance().RemoveConnectionListener(conn_listener_uid);
}
/*
* Remote Procedure Call Functions