From d03b0203265dc1a271172e788b516d13a70d4e4e Mon Sep 17 00:00:00 2001 From: Peter Johnson Date: Mon, 29 Oct 2018 20:54:42 -0700 Subject: [PATCH] wpiutil: Add WorkerThread (#1302) This provides a worker thread that can execute a work function with the result going into either a future or a uv::Loop functor. --- .../main/native/include/wpi/WorkerThread.h | 276 ++++++++++++++++++ .../src/test/native/cpp/WorkerThreadTest.cpp | 73 +++++ 2 files changed, 349 insertions(+) create mode 100644 wpiutil/src/main/native/include/wpi/WorkerThread.h create mode 100644 wpiutil/src/test/native/cpp/WorkerThreadTest.cpp diff --git a/wpiutil/src/main/native/include/wpi/WorkerThread.h b/wpiutil/src/main/native/include/wpi/WorkerThread.h new file mode 100644 index 0000000000..9a04f1aa5b --- /dev/null +++ b/wpiutil/src/main/native/include/wpi/WorkerThread.h @@ -0,0 +1,276 @@ +/*----------------------------------------------------------------------------*/ +/* Copyright (c) 2018 FIRST. All Rights Reserved. */ +/* Open Source Software - may be modified and shared by FRC teams. The code */ +/* must be accompanied by the FIRST BSD license file in the root directory of */ +/* the project. */ +/*----------------------------------------------------------------------------*/ + +#ifndef WPIUTIL_WPI_WORKERTHREAD_H_ +#define WPIUTIL_WPI_WORKERTHREAD_H_ + +#include +#include +#include +#include +#include + +#include "wpi/STLExtras.h" +#include "wpi/SafeThread.h" +#include "wpi/future.h" +#include "wpi/uv/Async.h" + +namespace wpi { + +namespace detail { + +template +struct WorkerThreadAsync { + using AfterWorkFunction = std::function; + + ~WorkerThreadAsync() { UnsetLoop(); } + + void SetLoop(uv::Loop& loop) { + auto async = uv::Async::Create(loop); + async->wakeup.connect( + [](AfterWorkFunction func, R result) { func(result); }); + m_async = async; + } + + void UnsetLoop() { + if (auto async = m_async.lock()) { + async->Close(); + m_async.reset(); + } + } + + std::weak_ptr> m_async; +}; + +template <> +struct WorkerThreadAsync { + using AfterWorkFunction = std::function; + + ~WorkerThreadAsync() { RemoveLoop(); } + + void SetLoop(uv::Loop& loop) { + auto async = uv::Async::Create(loop); + async->wakeup.connect([](AfterWorkFunction func) { func(); }); + m_async = async; + } + + void RemoveLoop() { + if (auto async = m_async.lock()) { + async->Close(); + m_async.reset(); + } + } + + std::weak_ptr> m_async; +}; + +template +struct WorkerThreadRequest { + using WorkFunction = std::function; + using AfterWorkFunction = typename WorkerThreadAsync::AfterWorkFunction; + + WorkerThreadRequest() = default; + WorkerThreadRequest(uint64_t promiseId_, WorkFunction work_, + std::tuple params_) + : promiseId(promiseId_), + work(std::move(work_)), + params(std::move(params_)) {} + WorkerThreadRequest(WorkFunction work_, AfterWorkFunction afterWork_, + std::tuple params_) + : promiseId(0), + work(std::move(work_)), + afterWork(std::move(afterWork_)), + params(std::move(params_)) {} + + uint64_t promiseId; + WorkFunction work; + AfterWorkFunction afterWork; + std::tuple params; +}; + +template +class WorkerThreadThread : public SafeThread { + public: + using Request = WorkerThreadRequest; + + void Main() override; + + std::vector m_requests; + PromiseFactory m_promises; + detail::WorkerThreadAsync m_async; +}; + +template +void RunWorkerThreadRequest(WorkerThreadThread& thr, + WorkerThreadRequest& req) { + R result = apply_tuple(req.work, std::move(req.params)); + if (req.afterWork) { + if (auto async = thr.m_async.m_async.lock()) + async->Send(std::move(req.afterWork), std::move(result)); + } else { + thr.m_promises.SetValue(req.promiseId, std::move(result)); + } +} + +template +void RunWorkerThreadRequest(WorkerThreadThread& thr, + WorkerThreadRequest& req) { + apply_tuple(req.work, req.params); + if (req.afterWork) { + if (auto async = thr.m_async.m_async.lock()) + async->Send(std::move(req.afterWork)); + } else { + thr.m_promises.SetValue(req.promiseId); + } +} + +template +void WorkerThreadThread::Main() { + std::vector requests; + while (m_active) { + std::unique_lock lock(m_mutex); + m_cond.wait(lock, [&] { return !m_active || !m_requests.empty(); }); + if (!m_active) break; + + // don't want to hold the lock while executing the callbacks + requests.swap(m_requests); + lock.unlock(); + + for (auto&& req : requests) { + if (!m_active) break; // requests may be long-running + RunWorkerThreadRequest(*this, req); + } + requests.clear(); + m_promises.Notify(); + } +} + +} // namespace detail + +template +class WorkerThread; + +template +class WorkerThread final { + using Thread = detail::WorkerThreadThread; + + public: + using WorkFunction = std::function; + using AfterWorkFunction = + typename detail::WorkerThreadAsync::AfterWorkFunction; + + WorkerThread() { m_owner.Start(); } + + /** + * Set the loop. This must be called from the loop thread. + * Subsequent calls to QueueWorkThen will run afterWork on the provided + * loop (via an async handle). + * + * @param loop the loop to use for running afterWork routines + */ + void SetLoop(uv::Loop& loop) { + if (auto thr = m_owner.GetThread()) thr->m_async.SetLoop(loop); + } + + /** + * Set the loop. This must be called from the loop thread. + * Subsequent calls to QueueWorkThen will run afterWork on the provided + * loop (via an async handle). + * + * @param loop the loop to use for running afterWork routines + */ + void SetLoop(std::shared_ptr loop) { SetLoop(*loop); } + + /** + * Unset the loop. This must be called from the loop thread. + * Subsequent calls to QueueWorkThen will no longer run afterWork. + */ + void UnsetLoop() { + if (auto thr = m_owner.GetThread()) thr->m_async.UnsetLoop(); + } + + /** + * Get the handle used by QueueWorkThen() to run afterWork. + * This handle is set by SetLoop(). + * Calling Close() on this handle is the same as calling UnsetLoop(). + * + * @return The handle (if nullptr, no handle is set) + */ + std::shared_ptr GetHandle() const { + if (auto thr = m_owner.GetThread()) + return thr->m_async.m_async.lock(); + else + return nullptr; + } + + /** + * Wakeup the worker thread, call the work function, and return a future for + * the result. + * + * It’s safe to call this function from any thread. + * The work function will be called on the worker thread. + * + * The future will return a default-constructed result if this class is + * destroyed while waiting for a result. + * + * @param work Work function (called on worker thread) + */ + template + future QueueWork(WorkFunction work, U&&... u) { + if (auto thr = m_owner.GetThread()) { + // create the future + uint64_t req = thr->m_promises.CreateRequest(); + + // add the parameters to the input queue + thr->m_requests.emplace_back( + req, std::move(work), std::forward_as_tuple(std::forward(u)...)); + + // signal the thread + thr->m_cond.notify_one(); + + // return future + return thr->m_promises.CreateFuture(req); + } + + // XXX: is this the right thing to do? + return future(); + } + + /** + * Wakeup the worker thread, call the work function, and call the afterWork + * function with the result on the loop set by SetLoop(). + * + * It’s safe to call this function from any thread. + * The work function will be called on the worker thread, and the afterWork + * function will be called on the loop thread. + * + * SetLoop() must be called prior to calling this function for afterWork to + * be called. + * + * @param work Work function (called on worker thread) + * @param afterWork After work function (called on loop thread) + */ + template + void QueueWorkThen(WorkFunction work, AfterWorkFunction afterWork, U&&... u) { + if (auto thr = m_owner.GetThread()) { + // add the parameters to the input queue + thr->m_requests.emplace_back( + std::move(work), std::move(afterWork), + std::forward_as_tuple(std::forward(u)...)); + + // signal the thread + thr->m_cond.notify_one(); + } + } + + private: + SafeThreadOwner m_owner; +}; + +} // namespace wpi + +#endif // WPIUTIL_WPI_WORKERTHREAD_H_ diff --git a/wpiutil/src/test/native/cpp/WorkerThreadTest.cpp b/wpiutil/src/test/native/cpp/WorkerThreadTest.cpp new file mode 100644 index 0000000000..610029cd8e --- /dev/null +++ b/wpiutil/src/test/native/cpp/WorkerThreadTest.cpp @@ -0,0 +1,73 @@ +/*----------------------------------------------------------------------------*/ +/* Copyright (c) 2018 FIRST. All Rights Reserved. */ +/* Open Source Software - may be modified and shared by FRC teams. The code */ +/* must be accompanied by the FIRST BSD license file in the root directory of */ +/* the project. */ +/*----------------------------------------------------------------------------*/ + +#include "wpi/WorkerThread.h" // NOLINT(build/include_order) + +#include "gtest/gtest.h" // NOLINT(build/include_order) + +#include + +#include "wpi/uv/Loop.h" + +namespace wpi { + +TEST(WorkerThread, Future) { + WorkerThread worker; + future f = + worker.QueueWork([](bool v) -> int { return v ? 1 : 2; }, true); + ASSERT_EQ(f.get(), 1); +} + +TEST(WorkerThread, FutureVoid) { + int callbacks = 0; + WorkerThread worker; + future f = worker.QueueWork( + [&](int v) { + ++callbacks; + ASSERT_EQ(v, 3); + }, + 3); + f.get(); + ASSERT_EQ(callbacks, 1); +} + +TEST(WorkerThread, Loop) { + int callbacks = 0; + WorkerThread worker; + auto loop = uv::Loop::Create(); + worker.SetLoop(*loop); + worker.QueueWorkThen([](bool v) -> int { return v ? 1 : 2; }, + [&](int v2) { + ++callbacks; + loop->Stop(); + ASSERT_EQ(v2, 1); + }, + true); + auto f = worker.QueueWork([](bool) -> int { return 2; }, true); + ASSERT_EQ(f.get(), 2); + loop->Run(); + ASSERT_EQ(callbacks, 1); +} + +TEST(WorkerThread, LoopVoid) { + int callbacks = 0; + WorkerThread worker; + auto loop = uv::Loop::Create(); + worker.SetLoop(*loop); + worker.QueueWorkThen([](bool) {}, + [&]() { + ++callbacks; + loop->Stop(); + }, + true); + auto f = worker.QueueWork([](bool) {}, true); + f.get(); + loop->Run(); + ASSERT_EQ(callbacks, 1); +} + +} // namespace wpi