diff --git a/wpiutil/src/main/native/cpp/uv/Async.cpp b/wpiutil/src/main/native/cpp/uv/Async.cpp index e5014e0059..3ac5ffd082 100644 --- a/wpiutil/src/main/native/cpp/uv/Async.cpp +++ b/wpiutil/src/main/native/cpp/uv/Async.cpp @@ -12,7 +12,7 @@ namespace wpi { namespace uv { -std::shared_ptr Async::Create(Loop& loop) { +std::shared_ptr> Async<>::Create(Loop& loop) { auto h = std::make_shared(private_init{}); int err = uv_async_init(loop.GetRaw(), h->GetRaw(), [](uv_async_t* handle) { Async& h = *static_cast(handle->data); diff --git a/wpiutil/src/main/native/include/wpi/uv/Async.h b/wpiutil/src/main/native/include/wpi/uv/Async.h index ec3c32f149..7ab9289674 100644 --- a/wpiutil/src/main/native/include/wpi/uv/Async.h +++ b/wpiutil/src/main/native/include/wpi/uv/Async.h @@ -11,21 +11,99 @@ #include #include +#include +#include +#include +#include "wpi/STLExtras.h" #include "wpi/Signal.h" +#include "wpi/mutex.h" #include "wpi/uv/Handle.h" +#include "wpi/uv/Loop.h" namespace wpi { namespace uv { -class Loop; - /** * Async handle. * Async handles allow the user to "wakeup" the event loop and have a signal * generated from another thread. + * + * Data may be passed into the callback called on the event loop by using + * template parameters. If data parameters are used, the async callback will + * be called once for every call to Send(). If no data parameters are used, + * the async callback may or may not be called for every call to Send() (e.g. + * the calls may be coaleasced). */ -class Async final : public HandleImpl { +template +class Async final : public HandleImpl, uv_async_t> { + struct private_init {}; + + public: + explicit Async(const private_init&) {} + ~Async() noexcept override = default; + + /** + * Create an async handle. + * + * @param loop Loop object where this handle runs. + */ + static std::shared_ptr Create(Loop& loop) { + auto h = std::make_shared(private_init{}); + int err = uv_async_init(loop.GetRaw(), h->GetRaw(), [](uv_async_t* handle) { + auto& h = *static_cast(handle->data); + std::lock_guard lock(h.m_mutex); + for (auto&& v : h.m_data) apply_tuple(h.wakeup, v); + h.m_data.clear(); + }); + if (err < 0) { + loop.ReportError(err); + return nullptr; + } + h->Keep(); + return h; + } + + /** + * Create an async handle. + * + * @param loop Loop object where this handle runs. + */ + static std::shared_ptr Create(const std::shared_ptr& loop) { + return Create(*loop); + } + + /** + * Wakeup the event loop and emit the event. + * + * It’s safe to call this function from any thread EXCEPT the loop thread. + * An async event will be emitted on the loop thread. + */ + template + void Send(U&&... u) { + { + std::lock_guard lock(m_mutex); + m_data.emplace_back(std::forward_as_tuple(std::forward(u)...)); + } + this->Invoke(&uv_async_send, this->GetRaw()); + } + + /** + * Signal generated (on event loop thread) when the async event occurs. + */ + sig::Signal wakeup; + + private: + wpi::mutex m_mutex; + std::vector> m_data; +}; + +/** + * Async specialization for no data parameters. The async callback may or may + * not be called for every call to Send() (e.g. the calls may be coaleasced). + */ +template <> +class Async<> final : public HandleImpl, uv_async_t> { struct private_init {}; public: diff --git a/wpiutil/src/test/native/cpp/uv/UvAsyncTest.cpp b/wpiutil/src/test/native/cpp/uv/UvAsyncTest.cpp index 156d395a8e..e18f972422 100644 --- a/wpiutil/src/test/native/cpp/uv/UvAsyncTest.cpp +++ b/wpiutil/src/test/native/cpp/uv/UvAsyncTest.cpp @@ -51,7 +51,7 @@ TEST(UvAsync, Test) { std::thread theThread; auto loop = Loop::Create(); - auto async = Async::Create(loop); + auto async = Async<>::Create(loop); auto prepare = Prepare::Create(loop); loop->error.connect([](Error) { FAIL(); }); @@ -101,5 +101,78 @@ TEST(UvAsync, Test) { if (theThread.joinable()) theThread.join(); } +TEST(UvAsync, Data) { + int prepare_cb_called = 0; + int async_cb_called[2] = {0, 0}; + int close_cb_called = 0; + + std::thread theThread; + + auto loop = Loop::Create(); + auto async = Async>::Create(loop); + auto prepare = Prepare::Create(loop); + + loop->error.connect([](Error) { FAIL(); }); + + prepare->error.connect([](Error) { FAIL(); }); + prepare->prepare.connect([&] { + if (prepare_cb_called++) return; + theThread = std::thread([&] { + async->Send(0, [&](int v) { + ASSERT_EQ(v, 0); + ++async_cb_called[0]; + }); + async->Send(1, [&](int v) { + ASSERT_EQ(v, 1); + ++async_cb_called[1]; + async->Close(); + prepare->Close(); + }); + }); + }); + prepare->Start(); + + async->error.connect([](Error) { FAIL(); }); + async->closed.connect([&] { close_cb_called++; }); + async->wakeup.connect([&](int v, std::function f) { f(v); }); + + loop->Run(); + + ASSERT_EQ(async_cb_called[0], 1); + ASSERT_EQ(async_cb_called[1], 1); + ASSERT_EQ(close_cb_called, 1); + + if (theThread.joinable()) theThread.join(); +} + +TEST(UvAsync, DataRef) { + int prepare_cb_called = 0; + int val = 0; + + std::thread theThread; + + auto loop = Loop::Create(); + auto async = Async::Create(loop); + auto prepare = Prepare::Create(loop); + + prepare->prepare.connect([&] { + if (prepare_cb_called++) return; + theThread = std::thread([&] { async->Send(1, val); }); + }); + prepare->Start(); + + async->wakeup.connect([&](int v, int& r) { + r = v; + async->Close(); + prepare->Close(); + }); + + loop->Run(); + + ASSERT_EQ(val, 1); + + if (theThread.joinable()) theThread.join(); +} + } // namespace uv } // namespace wpi