Support immediate notify of connection listener.

Also only perform immediate notification to the callback actually
requesting the notification, not all existing callbacks.

Offset returned uids by 1 so uid=0 can be used to indicate immediate
notification.
This commit is contained in:
Peter Johnson
2015-08-28 00:13:56 -07:00
parent 302cc064c6
commit b488cdd6ff
10 changed files with 65 additions and 23 deletions

View File

@@ -269,7 +269,8 @@ unsigned int NT_AddEntryListener(const char *prefix, size_t prefix_len,
int immediate_notify);
void NT_RemoveEntryListener(unsigned int entry_listener_uid);
unsigned int NT_AddConnectionListener(void *data,
NT_ConnectionListenerCallback callback);
NT_ConnectionListenerCallback callback,
int immediate_notify);
void NT_RemoveConnectionListener(unsigned int conn_listener_uid);
/*

View File

@@ -191,7 +191,8 @@ typedef std::function<void(unsigned int uid, bool connected,
unsigned int AddEntryListener(StringRef prefix, EntryListenerCallback callback,
bool immediate_notify);
void RemoveEntryListener(unsigned int entry_listener_uid);
unsigned int AddConnectionListener(ConnectionListenerCallback callback);
unsigned int AddConnectionListener(ConnectionListenerCallback callback,
bool immediate_notify);
void RemoveConnectionListener(unsigned int conn_listener_uid);
/*

View File

@@ -158,6 +158,15 @@ std::vector<ConnectionInfo> DispatcherBase::GetConnections() const {
return conns;
}
void DispatcherBase::NotifyConnections(
ConnectionListenerCallback callback) const {
std::lock_guard<std::mutex> lock(m_user_mutex);
for (auto& conn : m_connections) {
if (conn->state() != NetworkConnection::kActive) continue;
m_notifier.NotifyConnection(true, conn->info(), callback);
}
}
void DispatcherBase::DispatchThreadMain() {
auto timeout_time = std::chrono::steady_clock::now();

View File

@@ -42,6 +42,7 @@ class DispatcherBase {
void SetIdentity(llvm::StringRef name);
void Flush();
std::vector<ConnectionInfo> GetConnections() const;
void NotifyConnections(ConnectionListenerCallback callback) const;
bool active() const { return m_active; }

View File

@@ -49,6 +49,14 @@ void Notifier::ThreadMain() {
if (!item.value) continue;
StringRef name(item.name);
if (item.only) {
// Don't hold mutex during callback execution!
lock.unlock();
item.only(0, name, item.value, item.is_new);
lock.lock();
continue;
}
// Use index because iterator might get invalidated.
for (std::size_t i=0; i<m_entry_listeners.size(); ++i) {
if (!m_entry_listeners[i].second) continue; // removed
@@ -56,7 +64,7 @@ void Notifier::ThreadMain() {
auto callback = m_entry_listeners[i].second;
// Don't hold mutex during callback execution!
lock.unlock();
callback(i, name, item.value, item.is_new);
callback(i+1, name, item.value, item.is_new);
lock.lock();
}
}
@@ -66,13 +74,21 @@ void Notifier::ThreadMain() {
auto item = std::move(m_conn_notifications.front());
m_conn_notifications.pop();
if (item.only) {
// Don't hold mutex during callback execution!
lock.unlock();
item.only(0, item.connected, item.conn_info);
lock.lock();
continue;
}
// Use index because iterator might get invalidated.
for (std::size_t i=0; i<m_conn_listeners.size(); ++i) {
if (!m_conn_listeners[i]) continue; // removed
auto callback = m_conn_listeners[i];
// Don't hold mutex during callback execution!
lock.unlock();
callback(i, item.connected, item.conn_info);
callback(i+1, item.connected, item.conn_info);
lock.lock();
}
}
@@ -84,20 +100,21 @@ unsigned int Notifier::AddEntryListener(StringRef prefix,
std::lock_guard<std::mutex> lock(m_mutex);
unsigned int uid = m_entry_listeners.size();
m_entry_listeners.emplace_back(prefix, callback);
return uid;
return uid + 1;
}
void Notifier::RemoveEntryListener(unsigned int entry_listener_uid) {
--entry_listener_uid;
std::lock_guard<std::mutex> lock(m_mutex);
if (entry_listener_uid < m_entry_listeners.size())
m_entry_listeners[entry_listener_uid].second = nullptr;
}
void Notifier::NotifyEntry(StringRef name, std::shared_ptr<Value> value,
bool is_new) {
bool is_new, EntryListenerCallback only) {
if (!m_active) return;
std::unique_lock<std::mutex> lock(m_mutex);
m_entry_notifications.emplace(name, value, is_new);
m_entry_notifications.emplace(name, value, is_new, only);
lock.unlock();
m_cond.notify_one();
}
@@ -107,20 +124,22 @@ unsigned int Notifier::AddConnectionListener(
std::lock_guard<std::mutex> lock(m_mutex);
unsigned int uid = m_entry_listeners.size();
m_conn_listeners.emplace_back(callback);
return uid;
return uid + 1;
}
void Notifier::RemoveConnectionListener(unsigned int conn_listener_uid) {
--conn_listener_uid;
std::lock_guard<std::mutex> lock(m_mutex);
if (conn_listener_uid < m_conn_listeners.size())
m_conn_listeners[conn_listener_uid] = nullptr;
}
void Notifier::NotifyConnection(bool connected,
const ConnectionInfo& conn_info) {
const ConnectionInfo& conn_info,
ConnectionListenerCallback only) {
if (!m_active) return;
std::unique_lock<std::mutex> lock(m_mutex);
m_conn_notifications.emplace(connected, conn_info);
m_conn_notifications.emplace(connected, conn_info, only);
lock.unlock();
m_cond.notify_one();
}

View File

@@ -39,12 +39,14 @@ class Notifier {
EntryListenerCallback callback);
void RemoveEntryListener(unsigned int entry_listener_uid);
void NotifyEntry(StringRef name, std::shared_ptr<Value> value, bool is_new);
void NotifyEntry(StringRef name, std::shared_ptr<Value> value, bool is_new,
EntryListenerCallback only = nullptr);
unsigned int AddConnectionListener(ConnectionListenerCallback callback);
void RemoveConnectionListener(unsigned int conn_listener_uid);
void NotifyConnection(bool connected, const ConnectionInfo& conn_info);
void NotifyConnection(bool connected, const ConnectionInfo& conn_info,
ConnectionListenerCallback only = nullptr);
private:
Notifier();
@@ -60,21 +62,24 @@ class Notifier {
struct EntryNotification {
EntryNotification(StringRef name_, std::shared_ptr<Value> value_,
bool is_new_)
: name(name_), value(value_), is_new(is_new_) {}
bool is_new_, EntryListenerCallback only_)
: name(name_), value(value_), is_new(is_new_), only(only_) {}
std::string name;
std::shared_ptr<Value> value;
bool is_new;
EntryListenerCallback only;
};
std::queue<EntryNotification> m_entry_notifications;
struct ConnectionNotification {
ConnectionNotification(bool connected_, const ConnectionInfo& conn_info_)
: connected(connected_), conn_info(conn_info_) {}
ConnectionNotification(bool connected_, const ConnectionInfo& conn_info_,
ConnectionListenerCallback only_)
: connected(connected_), conn_info(conn_info_), only(only_) {}
bool connected;
ConnectionInfo conn_info;
ConnectionListenerCallback only;
};
std::queue<ConnectionNotification> m_conn_notifications;

View File

@@ -606,11 +606,12 @@ std::vector<EntryInfo> Storage::GetEntryInfo(StringRef prefix,
return infos;
}
void Storage::NotifyEntries(StringRef prefix) {
void Storage::NotifyEntries(StringRef prefix,
EntryListenerCallback only) const {
std::lock_guard<std::mutex> lock(m_mutex);
for (auto& i : m_entries) {
if (!i.getKey().startswith(prefix)) continue;
m_notifier.NotifyEntry(i.getKey(), i.getValue()->value, false);
m_notifier.NotifyEntry(i.getKey(), i.getValue()->value, false, only);
}
}

View File

@@ -74,7 +74,8 @@ class Storage {
void DeleteEntry(StringRef name);
void DeleteAllEntries();
std::vector<EntryInfo> GetEntryInfo(StringRef prefix, unsigned int types);
void NotifyEntries(StringRef prefix);
void NotifyEntries(StringRef prefix,
EntryListenerCallback only = nullptr) const;
// Filename-based save/load functions. Used both by periodic saves and
// accessible directly via the user API.

View File

@@ -180,14 +180,16 @@ void NT_RemoveEntryListener(unsigned int entry_listener_uid) {
}
unsigned int NT_AddConnectionListener(void *data,
NT_ConnectionListenerCallback callback) {
NT_ConnectionListenerCallback callback,
int immediate_notify) {
return nt::AddConnectionListener(
[=](unsigned int uid, bool connected, const ConnectionInfo &conn) {
NT_ConnectionInfo conn_c;
ConvertToC(conn, &conn_c);
callback(uid, data, connected ? 1 : 0, &conn_c);
DisposeConnectionInfo(&conn_c);
});
},
immediate_notify != 0);
}
void NT_RemoveConnectionListener(unsigned int conn_listener_uid) {

View File

@@ -70,7 +70,7 @@ unsigned int AddEntryListener(StringRef prefix, EntryListenerCallback callback,
Notifier& notifier = Notifier::GetInstance();
unsigned int uid = notifier.AddEntryListener(prefix, callback);
notifier.Start();
if (immediate_notify) Storage::GetInstance().NotifyEntries(prefix);
if (immediate_notify) Storage::GetInstance().NotifyEntries(prefix, callback);
return uid;
}
@@ -78,10 +78,12 @@ void RemoveEntryListener(unsigned int entry_listener_uid) {
Notifier::GetInstance().RemoveEntryListener(entry_listener_uid);
}
unsigned int AddConnectionListener(ConnectionListenerCallback callback) {
unsigned int AddConnectionListener(ConnectionListenerCallback callback,
bool immediate_notify) {
Notifier& notifier = Notifier::GetInstance();
unsigned int uid = notifier.AddConnectionListener(callback);
Notifier::GetInstance().Start();
if (immediate_notify) Dispatcher::GetInstance().NotifyConnections(callback);
return uid;
}