[cscore] Fix Java direct callback notifications (#3631)

These relied on the OnStart and OnExit callbacks which were not present
in the wpiutil common CallbackManager code.  Add support for these and
fix up other use cases.

Also fixes notifications to correctly filter on event kind.

This fixes a breakage caused by #3133.
This commit is contained in:
Peter Johnson
2021-10-13 08:46:07 -07:00
committed by GitHub
parent 6d3dd99eb2
commit 61996c2bbf
7 changed files with 60 additions and 12 deletions

View File

@@ -53,6 +53,9 @@ class CallbackThread : public wpi::SafeThread {
using NotifierData = TNotifierData;
using ListenerData = TListenerData;
CallbackThread(std::function<void()> on_start, std::function<void()> on_exit)
: m_on_start(std::move(on_start)), m_on_exit(std::move(on_exit)) {}
~CallbackThread() override {
// Wake up any blocked pollers
for (size_t i = 0; i < m_pollers.size(); ++i) {
@@ -85,6 +88,9 @@ class CallbackThread : public wpi::SafeThread {
};
wpi::UidVector<std::shared_ptr<Poller>, 64> m_pollers;
std::function<void()> m_on_start;
std::function<void()> m_on_exit;
// Must be called with m_mutex held
template <typename... Args>
void SendPoller(unsigned int poller_uid, Args&&... args) {
@@ -106,18 +112,22 @@ class CallbackThread : public wpi::SafeThread {
template <typename Derived, typename TUserInfo, typename TListenerData,
typename TNotifierData>
void CallbackThread<Derived, TUserInfo, TListenerData, TNotifierData>::Main() {
if (m_on_start) {
m_on_start();
}
std::unique_lock lock(m_mutex);
while (m_active) {
while (m_queue.empty()) {
m_cond.wait(lock);
if (!m_active) {
return;
goto done;
}
}
while (!m_queue.empty()) {
if (!m_active) {
return;
goto done;
}
auto item = std::move(m_queue.front());
@@ -144,6 +154,7 @@ void CallbackThread<Derived, TUserInfo, TListenerData, TNotifierData>::Main() {
if (!listener) {
continue;
}
if (!static_cast<Derived*>(this)->Matches(listener, item.second)) {
continue;
}
@@ -164,6 +175,11 @@ void CallbackThread<Derived, TUserInfo, TListenerData, TNotifierData>::Main() {
m_queue_empty.notify_all();
}
done:
if (m_on_exit) {
m_on_exit();
}
}
// CRTP callback manager
@@ -177,6 +193,14 @@ class CallbackManager {
friend class RpcServerTest;
public:
void SetOnStart(std::function<void()> on_start) {
m_on_start = std::move(on_start);
}
void SetOnExit(std::function<void()> on_exit) {
m_on_exit = std::move(on_exit);
}
void Stop() { m_owner.Stop(); }
void Remove(unsigned int listener_uid) {
@@ -333,7 +357,7 @@ class CallbackManager {
protected:
template <typename... Args>
void DoStart(Args&&... args) {
m_owner.Start(std::forward<Args>(args)...);
m_owner.Start(m_on_start, m_on_exit, std::forward<Args>(args)...);
}
template <typename... Args>
@@ -361,6 +385,9 @@ class CallbackManager {
private:
wpi::SafeThreadOwner<Thread> m_owner;
std::function<void()> m_on_start;
std::function<void()> m_on_exit;
};
} // namespace wpi