mirror of
https://github.com/wpilibsuite/allwpilib
synced 2026-06-21 01:01:43 +00:00
Implement independent instances.
Previously, most of the classes were implemented as singletons so only one
instance was possible.
This change adds an instance handle-based API. In Java, this API is located
in a different package than the old API (edu.wpi.first.networktables).
Backwards compatibility with ITable and the old NetworkTable API is largely
maintained, but a handful of classes have moved to the new package in Java
(ConnectionInfo and PersistentException), and the old JNI has been completed
replaced.
Also:
- Move SetTeam implementation to Dispatcher.
- Consistently pass time through Java and C++ Value API.
- Rename nt_Value.h to NetworkTableValue.h for consistency with Java.
- Improve documentation
- Make C++ and Java APIs more consistent
- Document RPC functions and support RPC in Java.
- Add polling features for entry and connection listeners and use them to
move callback threads to Java level.
- Remove thread start and stop hooks (as polling is available).
- Make Notifiers, RpcServer, Dispatcher, and Storage mockable.
- Set NOTIFY_NEW on immediate entry notifications.
- Make GetTable("/") and GetTable("") equivalent.
- Generate local notification for flags update when loading persistent file.
And many unit test updates/changes:
- Use InitGoogleMock instead of InitGoogleTest in test main.
- Move test printers to TestPrinter.h/cpp.
- Provide printers for StringRef, EntryNotifier, and Handle.
- StorageTest: Check notifications.
- Add entry notifier unit tests.
- Storage: Add test for incoming entry assignment.
- Update connection listener tests.
- Add entry listener unit tests.
Fixes #11, #140, #189, #190, #192, #193, #221
This commit is contained in:
338
src/main/native/cpp/CallbackManager.h
Normal file
338
src/main/native/cpp/CallbackManager.h
Normal file
@@ -0,0 +1,338 @@
|
||||
/*----------------------------------------------------------------------------*/
|
||||
/* Copyright (c) FIRST 2017. All Rights Reserved. */
|
||||
/* Open Source Software - may be modified and shared by FRC teams. The code */
|
||||
/* must be accompanied by the FIRST BSD license file in the root directory of */
|
||||
/* the project. */
|
||||
/*----------------------------------------------------------------------------*/
|
||||
|
||||
#ifndef NT_CALLBACKMANAGER_H_
|
||||
#define NT_CALLBACKMANAGER_H_
|
||||
|
||||
#include <atomic>
|
||||
#include <climits>
|
||||
#include <condition_variable>
|
||||
#include <functional>
|
||||
#include <mutex>
|
||||
#include <queue>
|
||||
#include <utility>
|
||||
|
||||
#include "llvm/raw_ostream.h"
|
||||
#include "support/SafeThread.h"
|
||||
#include "support/UidVector.h"
|
||||
|
||||
namespace nt {
|
||||
|
||||
namespace impl {
|
||||
|
||||
template <typename Callback>
|
||||
class ListenerData {
|
||||
public:
|
||||
ListenerData() = default;
|
||||
ListenerData(Callback callback_) : callback(callback_) {}
|
||||
ListenerData(unsigned int poller_uid_) : poller_uid(poller_uid_) {}
|
||||
|
||||
explicit operator bool() const { return callback || poller_uid != UINT_MAX; }
|
||||
|
||||
Callback callback;
|
||||
unsigned int poller_uid = UINT_MAX;
|
||||
};
|
||||
|
||||
// CRTP callback manager thread
|
||||
// @tparam Derived derived class
|
||||
// @tparam NotifierData data buffered for each callback
|
||||
// @tparam ListenerData data stored for each listener
|
||||
// Derived must define the following functions:
|
||||
// bool Matches(const ListenerData& listener, const NotifierData& data);
|
||||
// void SetListener(NotifierData* data, unsigned int listener_uid);
|
||||
// void DoCallback(Callback callback, const NotifierData& data);
|
||||
template <typename Derived, typename TUserInfo,
|
||||
typename TListenerData =
|
||||
ListenerData<std::function<void(const TUserInfo& info)>>,
|
||||
typename TNotifierData = TUserInfo>
|
||||
class CallbackThread : public wpi::SafeThread {
|
||||
public:
|
||||
typedef TUserInfo UserInfo;
|
||||
typedef TNotifierData NotifierData;
|
||||
typedef TListenerData ListenerData;
|
||||
|
||||
~CallbackThread() {
|
||||
// Wake up any blocked pollers
|
||||
for (std::size_t i = 0; i < m_pollers.size(); ++i) {
|
||||
if (auto poller = m_pollers[i]) poller->Terminate();
|
||||
}
|
||||
}
|
||||
|
||||
void Main() override;
|
||||
|
||||
wpi::UidVector<ListenerData, 64> m_listeners;
|
||||
|
||||
std::queue<std::pair<unsigned int, NotifierData>> m_queue;
|
||||
std::condition_variable m_queue_empty;
|
||||
|
||||
struct Poller {
|
||||
void Terminate() {
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(poll_mutex);
|
||||
terminating = true;
|
||||
}
|
||||
poll_cond.notify_all();
|
||||
}
|
||||
std::queue<NotifierData> poll_queue;
|
||||
std::mutex poll_mutex;
|
||||
std::condition_variable poll_cond;
|
||||
bool terminating = false;
|
||||
bool cancelling = false;
|
||||
};
|
||||
wpi::UidVector<std::shared_ptr<Poller>, 64> m_pollers;
|
||||
|
||||
// Must be called with m_mutex held
|
||||
template <typename... Args>
|
||||
void SendPoller(unsigned int poller_uid, Args&&... args) {
|
||||
if (poller_uid > m_pollers.size()) return;
|
||||
auto poller = m_pollers[poller_uid];
|
||||
if (!poller) return;
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(poller->poll_mutex);
|
||||
poller->poll_queue.emplace(std::forward<Args>(args)...);
|
||||
}
|
||||
poller->poll_cond.notify_one();
|
||||
}
|
||||
};
|
||||
|
||||
template <typename Derived, typename TUserInfo, typename TListenerData,
|
||||
typename TNotifierData>
|
||||
void CallbackThread<Derived, TUserInfo, TListenerData, TNotifierData>::Main() {
|
||||
std::unique_lock<std::mutex> lock(m_mutex);
|
||||
while (m_active) {
|
||||
while (m_queue.empty()) {
|
||||
m_cond.wait(lock);
|
||||
if (!m_active) return;
|
||||
}
|
||||
|
||||
while (!m_queue.empty()) {
|
||||
if (!m_active) return;
|
||||
auto item = std::move(m_queue.front());
|
||||
|
||||
if (item.first != UINT_MAX) {
|
||||
if (item.first < m_listeners.size()) {
|
||||
auto& listener = m_listeners[item.first];
|
||||
if (listener &&
|
||||
static_cast<Derived*>(this)->Matches(listener, item.second)) {
|
||||
static_cast<Derived*>(this)->SetListener(&item.second, item.first);
|
||||
if (listener.callback) {
|
||||
lock.unlock();
|
||||
static_cast<Derived*>(this)->DoCallback(listener.callback,
|
||||
item.second);
|
||||
lock.lock();
|
||||
} else if (listener.poller_uid != UINT_MAX) {
|
||||
SendPoller(listener.poller_uid, std::move(item.second));
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Use index because iterator might get invalidated.
|
||||
for (size_t i = 0; i < m_listeners.size(); ++i) {
|
||||
auto& listener = m_listeners[i];
|
||||
if (!listener) continue;
|
||||
if (!static_cast<Derived*>(this)->Matches(listener, item.second))
|
||||
continue;
|
||||
static_cast<Derived*>(this)->SetListener(&item.second, i);
|
||||
if (listener.callback) {
|
||||
lock.unlock();
|
||||
static_cast<Derived*>(this)->DoCallback(listener.callback,
|
||||
item.second);
|
||||
lock.lock();
|
||||
} else if (listener.poller_uid != UINT_MAX) {
|
||||
SendPoller(listener.poller_uid, item.second);
|
||||
}
|
||||
}
|
||||
}
|
||||
m_queue.pop();
|
||||
}
|
||||
|
||||
m_queue_empty.notify_all();
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace impl
|
||||
|
||||
// CRTP callback manager
|
||||
// @tparam Derived derived class
|
||||
// @tparam Thread custom thread (must be derived from impl::CallbackThread)
|
||||
//
|
||||
// Derived must define the following functions:
|
||||
// void Start();
|
||||
template <typename Derived, typename Thread>
|
||||
class CallbackManager {
|
||||
friend class RpcServerTest;
|
||||
|
||||
public:
|
||||
void Stop() { m_owner.Stop(); }
|
||||
|
||||
void Remove(unsigned int listener_uid) {
|
||||
auto thr = m_owner.GetThread();
|
||||
if (!thr) return;
|
||||
thr->m_listeners.erase(listener_uid);
|
||||
}
|
||||
|
||||
unsigned int CreatePoller() {
|
||||
static_cast<Derived*>(this)->Start();
|
||||
auto thr = m_owner.GetThread();
|
||||
return thr->m_pollers.emplace_back(
|
||||
std::make_shared<typename Thread::Poller>());
|
||||
}
|
||||
|
||||
void RemovePoller(unsigned int poller_uid) {
|
||||
auto thr = m_owner.GetThread();
|
||||
if (!thr) return;
|
||||
|
||||
// Remove any listeners that are associated with this poller
|
||||
for (std::size_t i = 0; i < thr->m_listeners.size(); ++i) {
|
||||
if (thr->m_listeners[i].poller_uid == poller_uid)
|
||||
thr->m_listeners.erase(i);
|
||||
}
|
||||
|
||||
// Wake up any blocked pollers
|
||||
if (poller_uid >= thr->m_pollers.size()) return;
|
||||
auto poller = thr->m_pollers[poller_uid];
|
||||
if (!poller) return;
|
||||
poller->Terminate();
|
||||
return thr->m_pollers.erase(poller_uid);
|
||||
}
|
||||
|
||||
bool WaitForQueue(double timeout) {
|
||||
auto thr = m_owner.GetThread();
|
||||
if (!thr) return true;
|
||||
|
||||
auto& lock = thr.GetLock();
|
||||
#if defined(_MSC_VER) && _MSC_VER < 1900
|
||||
auto timeout_time = std::chrono::steady_clock::now() +
|
||||
std::chrono::duration<int64_t, std::nano>(
|
||||
static_cast<int64_t>(timeout * 1e9));
|
||||
#else
|
||||
auto timeout_time = std::chrono::steady_clock::now() +
|
||||
std::chrono::duration<double>(timeout);
|
||||
#endif
|
||||
while (!thr->m_queue.empty()) {
|
||||
if (!thr->m_active) return true;
|
||||
if (timeout == 0) return false;
|
||||
if (timeout < 0) {
|
||||
thr->m_queue_empty.wait(lock);
|
||||
} else {
|
||||
auto cond_timed_out = thr->m_queue_empty.wait_until(lock, timeout_time);
|
||||
if (cond_timed_out == std::cv_status::timeout) return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
std::vector<typename Thread::UserInfo> Poll(unsigned int poller_uid) {
|
||||
bool timed_out = false;
|
||||
return Poll(poller_uid, -1, &timed_out);
|
||||
}
|
||||
|
||||
std::vector<typename Thread::UserInfo> Poll(unsigned int poller_uid,
|
||||
double timeout, bool* timed_out) {
|
||||
std::vector<typename Thread::UserInfo> infos;
|
||||
std::shared_ptr<typename Thread::Poller> poller;
|
||||
{
|
||||
auto thr = m_owner.GetThread();
|
||||
if (!thr) return infos;
|
||||
if (poller_uid > thr->m_pollers.size()) return infos;
|
||||
poller = thr->m_pollers[poller_uid];
|
||||
if (!poller) return infos;
|
||||
}
|
||||
|
||||
std::unique_lock<std::mutex> lock(poller->poll_mutex);
|
||||
#if defined(_MSC_VER) && _MSC_VER < 1900
|
||||
auto timeout_time = std::chrono::steady_clock::now() +
|
||||
std::chrono::duration<int64_t, std::nano>(
|
||||
static_cast<int64_t>(timeout * 1e9));
|
||||
#else
|
||||
auto timeout_time = std::chrono::steady_clock::now() +
|
||||
std::chrono::duration<double>(timeout);
|
||||
#endif
|
||||
*timed_out = false;
|
||||
while (poller->poll_queue.empty()) {
|
||||
if (poller->terminating) return infos;
|
||||
if (poller->cancelling) {
|
||||
// Note: this only works if there's a single thread calling this
|
||||
// function for any particular poller, but that's the intended use.
|
||||
poller->cancelling = false;
|
||||
return infos;
|
||||
}
|
||||
if (timeout == 0) {
|
||||
*timed_out = true;
|
||||
return infos;
|
||||
}
|
||||
if (timeout < 0) {
|
||||
poller->poll_cond.wait(lock);
|
||||
} else {
|
||||
auto cond_timed_out = poller->poll_cond.wait_until(lock, timeout_time);
|
||||
if (cond_timed_out == std::cv_status::timeout) {
|
||||
*timed_out = true;
|
||||
return infos;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
while (!poller->poll_queue.empty()) {
|
||||
infos.emplace_back(std::move(poller->poll_queue.front()));
|
||||
poller->poll_queue.pop();
|
||||
}
|
||||
return infos;
|
||||
}
|
||||
|
||||
void CancelPoll(unsigned int poller_uid) {
|
||||
std::shared_ptr<typename Thread::Poller> poller;
|
||||
{
|
||||
auto thr = m_owner.GetThread();
|
||||
if (!thr) return;
|
||||
if (poller_uid > thr->m_pollers.size()) return;
|
||||
poller = thr->m_pollers[poller_uid];
|
||||
if (!poller) return;
|
||||
}
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(poller->poll_mutex);
|
||||
poller->cancelling = true;
|
||||
}
|
||||
poller->poll_cond.notify_one();
|
||||
}
|
||||
|
||||
protected:
|
||||
template <typename... Args>
|
||||
void DoStart(Args&&... args) {
|
||||
auto thr = m_owner.GetThread();
|
||||
if (!thr) m_owner.Start(new Thread(std::forward<Args>(args)...));
|
||||
}
|
||||
|
||||
template <typename... Args>
|
||||
unsigned int DoAdd(Args&&... args) {
|
||||
static_cast<Derived*>(this)->Start();
|
||||
auto thr = m_owner.GetThread();
|
||||
return thr->m_listeners.emplace_back(std::forward<Args>(args)...);
|
||||
}
|
||||
|
||||
template <typename... Args>
|
||||
void Send(unsigned int only_listener, Args&&... args) {
|
||||
auto thr = m_owner.GetThread();
|
||||
if (!thr || thr->m_listeners.empty()) return;
|
||||
thr->m_queue.emplace(std::piecewise_construct,
|
||||
std::make_tuple(only_listener),
|
||||
std::forward_as_tuple(std::forward<Args>(args)...));
|
||||
thr->m_cond.notify_one();
|
||||
}
|
||||
|
||||
typename wpi::SafeThreadOwner<Thread>::Proxy GetThread() const {
|
||||
return m_owner.GetThread();
|
||||
}
|
||||
|
||||
private:
|
||||
wpi::SafeThreadOwner<Thread> m_owner;
|
||||
};
|
||||
|
||||
} // namespace nt
|
||||
|
||||
#endif // NT_CALLBACKMANAGER_H_
|
||||
Reference in New Issue
Block a user