mirror of
https://github.com/wpilibsuite/allwpilib
synced 2026-06-20 00:51:42 +00:00
[ntcore] Revamp listeners (#4511)
- In both C++ and Java, add listener functions to Instance class (same as NT3 provided) - Add WaitForListenerQueue functions (same as NT3 provided) - Move Java non-poller implementation to Instance (previously only handled single instance) - Change C++ listeners to take non-const references for subscribers etc to help avoid footguns from use of temporary objects (also add doc comment) - Fix Preferences making .type persistent
This commit is contained in:
@@ -65,6 +65,8 @@ class ListenerThread final : public wpi::SafeThreadEvent {
|
||||
wpi::DenseMap<NT_ConnectionListener,
|
||||
std::function<void(const ConnectionNotification& event)>>
|
||||
m_callbacks;
|
||||
wpi::Event m_waitQueueWakeup;
|
||||
wpi::Event m_waitQueueWaiter;
|
||||
};
|
||||
|
||||
class CLImpl {
|
||||
@@ -97,9 +99,10 @@ class CLImpl {
|
||||
|
||||
void ListenerThread::Main() {
|
||||
while (m_active) {
|
||||
WPI_Handle signaledBuf[2];
|
||||
auto signaled =
|
||||
wpi::WaitForObjects({m_poller, m_stopEvent.GetHandle()}, signaledBuf);
|
||||
WPI_Handle signaledBuf[3];
|
||||
auto signaled = wpi::WaitForObjects(
|
||||
{m_poller, m_stopEvent.GetHandle(), m_waitQueueWakeup.GetHandle()},
|
||||
signaledBuf);
|
||||
if (signaled.empty() || !m_active) {
|
||||
return;
|
||||
}
|
||||
@@ -118,6 +121,10 @@ void ListenerThread::Main() {
|
||||
lock.lock();
|
||||
}
|
||||
}
|
||||
if (std::find(signaled.begin(), signaled.end(),
|
||||
m_waitQueueWakeup.GetHandle()) != signaled.end()) {
|
||||
m_waitQueueWaiter.Set();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -279,12 +286,25 @@ bool ConnectionList::IsConnected() const {
|
||||
}
|
||||
|
||||
NT_ConnectionListener ConnectionList::AddListener(
|
||||
std::function<void(const ConnectionNotification& event)> callback,
|
||||
bool immediateNotify) {
|
||||
bool immediateNotify,
|
||||
std::function<void(const ConnectionNotification& event)> callback) {
|
||||
std::scoped_lock lock{m_mutex};
|
||||
return m_impl->AddListener(std::move(callback), immediateNotify);
|
||||
}
|
||||
|
||||
bool ConnectionList::WaitForListenerQueue(double timeout) {
|
||||
std::scoped_lock lock{m_mutex};
|
||||
WPI_EventHandle h;
|
||||
if (auto thr = m_impl->m_listenerThread.GetThread()) {
|
||||
h = thr->m_waitQueueWaiter.GetHandle();
|
||||
thr->m_waitQueueWakeup.Set();
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
bool timedOut;
|
||||
return wpi::WaitForObject(h, timeout, &timedOut);
|
||||
}
|
||||
|
||||
NT_ConnectionListenerPoller ConnectionList::CreateListenerPoller() {
|
||||
std::scoped_lock lock{m_mutex};
|
||||
return m_impl->CreateListenerPoller()->handle;
|
||||
|
||||
@@ -31,8 +31,9 @@ class ConnectionList final : public IConnectionList {
|
||||
bool IsConnected() const final;
|
||||
|
||||
NT_ConnectionListener AddListener(
|
||||
std::function<void(const ConnectionNotification& event)> callback,
|
||||
bool immediateNotify);
|
||||
bool immediateNotify,
|
||||
std::function<void(const ConnectionNotification& event)> callback);
|
||||
bool WaitForListenerQueue(double timeout);
|
||||
|
||||
NT_ConnectionListenerPoller CreateListenerPoller();
|
||||
void DestroyListenerPoller(NT_ConnectionListenerPoller pollerHandle);
|
||||
|
||||
@@ -298,6 +298,8 @@ struct TopicListenerThread final : public wpi::SafeThreadEvent {
|
||||
wpi::DenseMap<NT_TopicListener,
|
||||
std::function<void(const TopicNotification& event)>>
|
||||
m_callbacks;
|
||||
wpi::Event m_waitQueueWakeup;
|
||||
wpi::Event m_waitQueueWaiter;
|
||||
};
|
||||
|
||||
struct ValueListenerThread final : public wpi::SafeThreadEvent {
|
||||
@@ -312,6 +314,8 @@ struct ValueListenerThread final : public wpi::SafeThreadEvent {
|
||||
wpi::DenseMap<NT_ValueListener,
|
||||
std::function<void(const ValueNotification& event)>>
|
||||
m_callbacks;
|
||||
wpi::Event m_waitQueueWakeup;
|
||||
wpi::Event m_waitQueueWaiter;
|
||||
};
|
||||
|
||||
struct LSImpl {
|
||||
@@ -531,9 +535,10 @@ void SubscriberData::UpdateActive() {
|
||||
|
||||
void TopicListenerThread::Main() {
|
||||
while (m_active) {
|
||||
WPI_Handle signaledBuf[2];
|
||||
auto signaled =
|
||||
wpi::WaitForObjects({m_poller, m_stopEvent.GetHandle()}, signaledBuf);
|
||||
WPI_Handle signaledBuf[3];
|
||||
auto signaled = wpi::WaitForObjects(
|
||||
{m_poller, m_stopEvent.GetHandle(), m_waitQueueWakeup.GetHandle()},
|
||||
signaledBuf);
|
||||
if (signaled.empty() || !m_active) {
|
||||
break;
|
||||
}
|
||||
@@ -552,14 +557,19 @@ void TopicListenerThread::Main() {
|
||||
lock.lock();
|
||||
}
|
||||
}
|
||||
if (std::find(signaled.begin(), signaled.end(),
|
||||
m_waitQueueWakeup.GetHandle()) != signaled.end()) {
|
||||
m_waitQueueWaiter.Set();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void ValueListenerThread::Main() {
|
||||
while (m_active) {
|
||||
WPI_Handle signaledBuf[2];
|
||||
auto signaled =
|
||||
wpi::WaitForObjects({m_poller, m_stopEvent.GetHandle()}, signaledBuf);
|
||||
WPI_Handle signaledBuf[3];
|
||||
auto signaled = wpi::WaitForObjects(
|
||||
{m_poller, m_stopEvent.GetHandle(), m_waitQueueWakeup.GetHandle()},
|
||||
signaledBuf);
|
||||
if (signaled.empty() || !m_active) {
|
||||
break;
|
||||
}
|
||||
@@ -578,6 +588,10 @@ void ValueListenerThread::Main() {
|
||||
lock.lock();
|
||||
}
|
||||
}
|
||||
if (std::find(signaled.begin(), signaled.end(),
|
||||
m_waitQueueWakeup.GetHandle()) != signaled.end()) {
|
||||
m_waitQueueWaiter.Set();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2365,6 +2379,19 @@ NT_TopicListener LocalStorage::AddTopicListener(
|
||||
return m_impl->AddTopicListener(topicHandle, mask, std::move(callback));
|
||||
}
|
||||
|
||||
bool LocalStorage::WaitForTopicListenerQueue(double timeout) {
|
||||
std::scoped_lock lock{m_mutex};
|
||||
WPI_EventHandle h;
|
||||
if (auto thr = m_impl->m_topicListenerThread.GetThread()) {
|
||||
h = thr->m_waitQueueWaiter.GetHandle();
|
||||
thr->m_waitQueueWakeup.Set();
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
bool timedOut;
|
||||
return wpi::WaitForObject(h, timeout, &timedOut);
|
||||
}
|
||||
|
||||
NT_TopicListenerPoller LocalStorage::CreateTopicListenerPoller() {
|
||||
std::scoped_lock lock{m_mutex};
|
||||
return m_impl->AddTopicListenerPoller()->handle;
|
||||
@@ -2423,6 +2450,19 @@ NT_ValueListener LocalStorage::AddValueListener(
|
||||
return m_impl->AddValueListener(subentry, mask, std::move(callback));
|
||||
}
|
||||
|
||||
bool LocalStorage::WaitForValueListenerQueue(double timeout) {
|
||||
std::scoped_lock lock{m_mutex};
|
||||
WPI_EventHandle h;
|
||||
if (auto thr = m_impl->m_valueListenerThread.GetThread()) {
|
||||
h = thr->m_waitQueueWaiter.GetHandle();
|
||||
thr->m_waitQueueWakeup.Set();
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
bool timedOut;
|
||||
return wpi::WaitForObject(h, timeout, &timedOut);
|
||||
}
|
||||
|
||||
NT_ValueListenerPoller LocalStorage::CreateValueListenerPoller() {
|
||||
std::scoped_lock lock{m_mutex};
|
||||
return m_impl->AddValueListenerPoller()->handle;
|
||||
|
||||
@@ -198,6 +198,7 @@ class LocalStorage final : public net::ILocalStorage {
|
||||
NT_TopicListener AddTopicListener(
|
||||
NT_Handle handle, unsigned int mask,
|
||||
std::function<void(const TopicNotification&)> callback);
|
||||
bool WaitForTopicListenerQueue(double timeout);
|
||||
|
||||
NT_TopicListenerPoller CreateTopicListenerPoller();
|
||||
void DestroyTopicListenerPoller(NT_TopicListenerPoller poller);
|
||||
@@ -220,6 +221,7 @@ class LocalStorage final : public net::ILocalStorage {
|
||||
NT_ValueListener AddValueListener(
|
||||
NT_Handle subentry, unsigned int mask,
|
||||
std::function<void(const ValueNotification&)> callback);
|
||||
bool WaitForValueListenerQueue(double timeout);
|
||||
|
||||
NT_ValueListenerPoller CreateValueListenerPoller();
|
||||
void DestroyValueListenerPoller(NT_ValueListenerPoller poller);
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
#include "networktables/FloatTopic.h"
|
||||
#include "networktables/IntegerArrayTopic.h"
|
||||
#include "networktables/IntegerTopic.h"
|
||||
#include "networktables/MultiSubscriber.h"
|
||||
#include "networktables/RawTopic.h"
|
||||
#include "networktables/StringArrayTopic.h"
|
||||
#include "networktables/StringTopic.h"
|
||||
@@ -100,8 +101,16 @@ void NetworkTableInstance::SetServer(std::span<const std::string_view> servers,
|
||||
SetServer(serversArr);
|
||||
}
|
||||
|
||||
NT_ConnectionListener NetworkTableInstance::AddConnectionListener(
|
||||
std::function<void(const ConnectionNotification& event)> callback,
|
||||
bool immediate_notify) const {
|
||||
return ::nt::AddConnectionListener(m_handle, callback, immediate_notify);
|
||||
NT_TopicListener NetworkTableInstance::AddTopicListener(
|
||||
MultiSubscriber& subscriber, int eventMask,
|
||||
std::function<void(const TopicNotification&)> listener) {
|
||||
return ::nt::AddTopicListener(subscriber.GetHandle(), eventMask,
|
||||
std::move(listener));
|
||||
}
|
||||
|
||||
NT_ValueListener NetworkTableInstance::AddValueListener(
|
||||
MultiSubscriber& subscriber, unsigned int eventMask,
|
||||
std::function<void(const ValueNotification&)> listener) {
|
||||
return ::nt::AddValueListener(subscriber.GetHandle(), eventMask,
|
||||
std::move(listener));
|
||||
}
|
||||
|
||||
@@ -423,6 +423,10 @@ NT_TopicListener NT_AddTopicListenerSingle(NT_Topic topic, unsigned int mask,
|
||||
});
|
||||
}
|
||||
|
||||
NT_Bool NT_WaitForTopicListenerQueue(NT_Handle handle, double timeout) {
|
||||
return nt::WaitForTopicListenerQueue(handle, timeout);
|
||||
}
|
||||
|
||||
NT_TopicListenerPoller NT_CreateTopicListenerPoller(NT_Inst inst) {
|
||||
return nt::CreateTopicListenerPoller(inst);
|
||||
}
|
||||
@@ -477,6 +481,10 @@ NT_ValueListener NT_AddValueListener(NT_Handle subentry, unsigned int mask,
|
||||
});
|
||||
}
|
||||
|
||||
NT_Bool NT_WaitForValueListenerQueue(NT_Handle handle, double timeout) {
|
||||
return nt::WaitForValueListenerQueue(handle, timeout);
|
||||
}
|
||||
|
||||
NT_ValueListenerPoller NT_CreateValueListenerPoller(NT_Inst inst) {
|
||||
return nt::CreateValueListenerPoller(inst);
|
||||
}
|
||||
@@ -502,17 +510,19 @@ void NT_RemoveValueListener(NT_ValueListener value_listener) {
|
||||
}
|
||||
|
||||
NT_ConnectionListener NT_AddConnectionListener(
|
||||
NT_Inst inst, void* data, NT_ConnectionListenerCallback callback,
|
||||
NT_Bool immediate_notify) {
|
||||
return nt::AddConnectionListener(
|
||||
inst,
|
||||
[=](const ConnectionNotification& event) {
|
||||
NT_ConnectionNotification event_c;
|
||||
ConvertToC(event, &event_c);
|
||||
callback(data, &event_c);
|
||||
DisposeConnectionNotification(&event_c);
|
||||
},
|
||||
immediate_notify != 0);
|
||||
NT_Inst inst, NT_Bool immediate_notify, void* data,
|
||||
NT_ConnectionListenerCallback callback) {
|
||||
return nt::AddConnectionListener(inst, immediate_notify != 0,
|
||||
[=](const ConnectionNotification& event) {
|
||||
NT_ConnectionNotification event_c;
|
||||
ConvertToC(event, &event_c);
|
||||
callback(data, &event_c);
|
||||
DisposeConnectionNotification(&event_c);
|
||||
});
|
||||
}
|
||||
|
||||
NT_Bool NT_WaitForConnectionListenerQueue(NT_Handle handle, double timeout) {
|
||||
return nt::WaitForConnectionListenerQueue(handle, timeout);
|
||||
}
|
||||
|
||||
NT_ConnectionListenerPoller NT_CreateConnectionListenerPoller(NT_Inst inst) {
|
||||
|
||||
@@ -407,6 +407,14 @@ NT_TopicListener AddTopicListener(
|
||||
}
|
||||
}
|
||||
|
||||
bool WaitForTopicListenerQueue(NT_Handle handle, double timeout) {
|
||||
if (auto ii = InstanceImpl::GetHandle(handle)) {
|
||||
return ii->localStorage.WaitForTopicListenerQueue(timeout);
|
||||
} else {
|
||||
return {};
|
||||
}
|
||||
}
|
||||
|
||||
NT_TopicListenerPoller CreateTopicListenerPoller(NT_Inst inst) {
|
||||
if (auto ii = InstanceImpl::GetTyped(inst, Handle::kInstance)) {
|
||||
return ii->localStorage.CreateTopicListenerPoller();
|
||||
@@ -466,6 +474,14 @@ NT_ValueListener AddValueListener(
|
||||
}
|
||||
}
|
||||
|
||||
bool WaitForValueListenerQueue(NT_Handle handle, double timeout) {
|
||||
if (auto ii = InstanceImpl::GetHandle(handle)) {
|
||||
return ii->localStorage.WaitForValueListenerQueue(timeout);
|
||||
} else {
|
||||
return {};
|
||||
}
|
||||
}
|
||||
|
||||
NT_ValueListenerPoller CreateValueListenerPoller(NT_Inst inst) {
|
||||
if (auto ii = InstanceImpl::GetTyped(inst, Handle::kInstance)) {
|
||||
return ii->localStorage.CreateValueListenerPoller();
|
||||
@@ -505,12 +521,19 @@ void RemoveValueListener(NT_ValueListener listener) {
|
||||
}
|
||||
|
||||
NT_ConnectionListener AddConnectionListener(
|
||||
NT_Inst inst,
|
||||
std::function<void(const ConnectionNotification& event)> callback,
|
||||
bool immediate_notify) {
|
||||
NT_Inst inst, bool immediate_notify,
|
||||
std::function<void(const ConnectionNotification& event)> callback) {
|
||||
if (auto ii = InstanceImpl::GetTyped(inst, Handle::kInstance)) {
|
||||
return ii->connectionList.AddListener(std::move(callback),
|
||||
immediate_notify);
|
||||
return ii->connectionList.AddListener(immediate_notify,
|
||||
std::move(callback));
|
||||
} else {
|
||||
return {};
|
||||
}
|
||||
}
|
||||
|
||||
bool WaitForConnectionListenerQueue(NT_Handle handle, double timeout) {
|
||||
if (auto ii = InstanceImpl::GetHandle(handle)) {
|
||||
return ii->connectionList.WaitForListenerQueue(timeout);
|
||||
} else {
|
||||
return {};
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user