diff --git a/wpiutil/src/main/native/include/wpi/uv/AsyncFunction.h b/wpiutil/src/main/native/include/wpi/uv/AsyncFunction.h new file mode 100644 index 0000000000..ee3b85e4b1 --- /dev/null +++ b/wpiutil/src/main/native/include/wpi/uv/AsyncFunction.h @@ -0,0 +1,238 @@ +/*----------------------------------------------------------------------------*/ +/* Copyright (c) 2018 FIRST. All Rights Reserved. */ +/* Open Source Software - may be modified and shared by FRC teams. The code */ +/* must be accompanied by the FIRST BSD license file in the root directory of */ +/* the project. */ +/*----------------------------------------------------------------------------*/ + +#ifndef WPIUTIL_WPI_UV_ASYNCFUNCTION_H_ +#define WPIUTIL_WPI_UV_ASYNCFUNCTION_H_ + +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "wpi/STLExtras.h" +#include "wpi/condition_variable.h" +#include "wpi/mutex.h" +#include "wpi/uv/Handle.h" +#include "wpi/uv/Loop.h" + +namespace wpi { +namespace uv { + +namespace detail { + +class AsyncFunctionBase { + public: + virtual ~AsyncFunctionBase() { + m_active = false; + m_resultCv.notify_all(); // wake up any waiters + } + + protected: + wpi::mutex m_mutex; + std::atomic_bool m_active{true}; + wpi::condition_variable m_resultCv; +}; + +template +struct AsyncFunctionHelper : public AsyncFunctionBase { + inline void RunCall(const std::function& func, + std::pair>& v) { + m_results.emplace_back( + std::piecewise_construct, std::forward_as_tuple(v.first), + std::forward_as_tuple(apply_tuple(func, std::move(v.second)))); + } + + inline R GetCallResult(std::thread::id from) { + // wait for response + std::unique_lock lock(m_mutex); + while (m_active) { + // Did we get a response to *our* request? + auto it = std::find_if(m_results.begin(), m_results.end(), + [=](const auto& r) { return r.first == from; }); + if (it != m_results.end()) { + // Yes, remove it from the vector and we're done. + auto rv = std::move(it->second); + m_results.erase(it); + return rv; + } + // No, keep waiting for a response + m_resultCv.wait(lock); + } + + return R(); + } + + private: + std::vector> m_results; +}; + +// void return value partial specialization +template +struct AsyncFunctionHelper : public AsyncFunctionBase { + inline void RunCall(const std::function& func, + std::pair>& v) { + apply_tuple(func, std::move(v.second)); + m_results.emplace_back(v.first); + } + + inline void GetCallResult(std::thread::id from) { + // wait for response + std::unique_lock lock(m_mutex); + while (m_active) { + // Did we get a response to *our* request? + auto it = std::find(m_results.begin(), m_results.end(), from); + if (it != m_results.end()) { + // Yes, remove it from the vector and we're done. + m_results.erase(it); + return; + } + // No, keep waiting for a response + m_resultCv.wait(lock); + } + } + + private: + std::vector m_results; +}; + +} // namespace detail + +template +class AsyncFunction; + +/** + * Function async handle. + * Async handles allow the user to "wakeup" the event loop and have a function + * called from another thread that returns a result to the calling thread. + */ +template +class AsyncFunction final + : public HandleImpl, uv_async_t>, + private detail::AsyncFunctionHelper { + struct private_init {}; + + public: + AsyncFunction(std::function func, const private_init&) + : wakeup{func} {} + ~AsyncFunction() noexcept override = default; + + /** + * Create an async handle. + * + * @param loop Loop object where this handle runs. + * @param func wakeup function to be called (sets wakeup value) + */ + static std::shared_ptr Create( + Loop& loop, std::function func = nullptr) { + auto h = std::make_shared(std::move(func), private_init{}); + int err = uv_async_init(loop.GetRaw(), h->GetRaw(), [](uv_async_t* handle) { + auto& h = *static_cast(handle->data); + std::lock_guard lock(h.m_mutex); + + if (!h.m_params.empty()) { + // for each set of parameters in the input queue + for (auto&& v : h.m_params) { + // call the wakeup function and put the result in the output queue + // if the caller is waiting for it + if (v.first == std::thread::id{}) + apply_tuple(h.wakeup, std::move(v.second)); + else + h.RunCall(h.wakeup, v); + } + h.m_params.clear(); + // wake up any threads that might be waiting for the result + h.m_resultCv.notify_all(); + } + }); + if (err < 0) { + loop.ReportError(err); + return nullptr; + } + h->Keep(); + return h; + } + + /** + * Create an async handle. + * + * @param loop Loop object where this handle runs. + * @param func wakeup function to be called (sets wakeup value) + */ + static std::shared_ptr Create( + const std::shared_ptr& loop, + std::function func = nullptr) { + return Create(*loop, std::move(func)); + } + + /** + * Wakeup the event loop, call the async function, and ignore any result. + * This is non-blocking and does NOT wait until the async function returns. + * + * It’s safe to call this function from any thread EXCEPT the loop thread. + * The async function will be called on the loop thread. + */ + template + void Send(U&&... u) { + // add the parameters to the input queue + { + std::lock_guard lock(this->m_mutex); + m_params.emplace_back(std::piecewise_construct, + std::forward_as_tuple(std::thread::id{}), + std::forward_as_tuple(std::forward(u)...)); + } + + // signal the loop + this->Invoke(&uv_async_send, this->GetRaw()); + } + + /** + * Wakeup the event loop, call the async function, and return the result. + * This blocks the calling thread until the async function returns. + * + * It’s safe to call this function from any thread EXCEPT the loop thread. + * The async function will be called on the loop thread. + * + * Returns a default-constructed result if this handle is destroyed while + * waiting for a result. + */ + template + R Call(U&&... u) { + std::thread::id from = std::this_thread::get_id(); + + // add the parameters to the input queue + { + std::lock_guard lock(this->m_mutex); + m_params.emplace_back(std::piecewise_construct, + std::forward_as_tuple(from), + std::forward_as_tuple(std::forward(u)...)); + } + + // signal the loop + this->Invoke(&uv_async_send, this->GetRaw()); + + // wait for response + return this->GetCallResult(from); + } + + /** + * Function called (on event loop thread) when the async is called. + */ + std::function wakeup; + + private: + std::vector>> m_params; +}; + +} // namespace uv +} // namespace wpi + +#endif // WPIUTIL_WPI_UV_ASYNCFUNCTION_H_ diff --git a/wpiutil/src/test/native/cpp/uv/UvAsyncFunctionTest.cpp b/wpiutil/src/test/native/cpp/uv/UvAsyncFunctionTest.cpp new file mode 100644 index 0000000000..3e4f83d84a --- /dev/null +++ b/wpiutil/src/test/native/cpp/uv/UvAsyncFunctionTest.cpp @@ -0,0 +1,178 @@ +/*----------------------------------------------------------------------------*/ +/* Copyright (c) 2018 FIRST. All Rights Reserved. */ +/* Open Source Software - may be modified and shared by FRC teams. The code */ +/* must be accompanied by the FIRST BSD license file in the root directory of */ +/* the project. */ +/*----------------------------------------------------------------------------*/ + +#include "wpi/uv/AsyncFunction.h" // NOLINT(build/include_order) + +#include "gtest/gtest.h" // NOLINT(build/include_order) + +#include + +#include "wpi/uv/Loop.h" +#include "wpi/uv/Prepare.h" + +namespace wpi { +namespace uv { + +TEST(UvAsyncFunction, Test) { + int prepare_cb_called = 0; + int async_cb_called[2] = {0, 0}; + int close_cb_called = 0; + + std::thread theThread; + + auto loop = Loop::Create(); + auto async = AsyncFunction::Create(loop); + auto prepare = Prepare::Create(loop); + + loop->error.connect([](Error) { FAIL(); }); + + prepare->error.connect([](Error) { FAIL(); }); + prepare->prepare.connect([&] { + if (prepare_cb_called++) return; + theThread = std::thread([&] { + ASSERT_EQ(async->Call(0), 1); + ASSERT_EQ(async->Call(1), 2); + }); + }); + prepare->Start(); + + async->error.connect([](Error) { FAIL(); }); + async->closed.connect([&] { close_cb_called++; }); + async->wakeup = [&](int v) { + ++async_cb_called[v]; + if (v == 1) { + async->Close(); + prepare->Close(); + } + return v + 1; + }; + + loop->Run(); + + ASSERT_EQ(async_cb_called[0], 1); + ASSERT_EQ(async_cb_called[1], 1); + ASSERT_EQ(close_cb_called, 1); + + if (theThread.joinable()) theThread.join(); +} + +TEST(UvAsyncFunction, Ref) { + int prepare_cb_called = 0; + int val = 0; + + std::thread theThread; + + auto loop = Loop::Create(); + auto async = AsyncFunction::Create(loop); + auto prepare = Prepare::Create(loop); + + prepare->prepare.connect([&] { + if (prepare_cb_called++) return; + theThread = std::thread([&] { ASSERT_EQ(async->Call(1, val), 2); }); + }); + prepare->Start(); + + async->wakeup = [&](int v, int& r) { + r = v; + async->Close(); + prepare->Close(); + return v + 1; + }; + + loop->Run(); + + ASSERT_EQ(val, 1); + + if (theThread.joinable()) theThread.join(); +} + +TEST(UvAsyncFunction, Movable) { + int prepare_cb_called = 0; + + std::thread theThread; + + auto loop = Loop::Create(); + auto async = + AsyncFunction(std::unique_ptr)>::Create(loop); + auto prepare = Prepare::Create(loop); + + prepare->prepare.connect([&] { + if (prepare_cb_called++) return; + theThread = std::thread([&] { + auto val = std::make_unique(1); + auto val2 = async->Call(std::move(val)); + ASSERT_NE(val2, nullptr); + ASSERT_EQ(*val2, 1); + }); + }); + prepare->Start(); + + async->wakeup = [&](std::unique_ptr v) { + async->Close(); + prepare->Close(); + return v; + }; + + loop->Run(); + + if (theThread.joinable()) theThread.join(); +} + +TEST(UvAsyncFunction, Send) { + int prepare_cb_called = 0; + + std::thread theThread; + + auto loop = Loop::Create(); + auto async = + AsyncFunction(std::unique_ptr)>::Create(loop); + auto prepare = Prepare::Create(loop); + + prepare->prepare.connect([&] { + if (prepare_cb_called++) return; + theThread = std::thread([&] { async->Send(std::make_unique(1)); }); + }); + prepare->Start(); + + async->wakeup = [&](std::unique_ptr v) { + async->Close(); + prepare->Close(); + return v; + }; + + loop->Run(); + + if (theThread.joinable()) theThread.join(); +} + +TEST(UvAsyncFunction, VoidCall) { + int prepare_cb_called = 0; + + std::thread theThread; + + auto loop = Loop::Create(); + auto async = AsyncFunction::Create(loop); + auto prepare = Prepare::Create(loop); + + prepare->prepare.connect([&] { + if (prepare_cb_called++) return; + theThread = std::thread([&] { async->Call(); }); + }); + prepare->Start(); + + async->wakeup = [&]() { + async->Close(); + prepare->Close(); + }; + + loop->Run(); + + if (theThread.joinable()) theThread.join(); +} + +} // namespace uv +} // namespace wpi