From 172e438cd66433659f2bbe4fc97fab3a5fab12c1 Mon Sep 17 00:00:00 2001 From: Peter Johnson Date: Sun, 16 Sep 2018 15:36:19 -0700 Subject: [PATCH] wpiutil: uv::Async: Keep weak reference to loop Other handles can only be used within the loop itself, but Async is intended to be used from another thread. This introduces the possibility of a race condition between the loop being destroyed and the Async being destroyed. Change Async to keep a weak reference to a loop and check it before performing libuv operations. --- wpiutil/src/main/native/cpp/uv/Async.cpp | 15 +++- .../src/main/native/include/wpi/uv/Async.h | 72 ++++++++++-------- .../native/include/wpi/uv/AsyncFunction.h | 74 +++++++++++-------- .../src/main/native/include/wpi/uv/Handle.h | 5 +- 4 files changed, 100 insertions(+), 66 deletions(-) diff --git a/wpiutil/src/main/native/cpp/uv/Async.cpp b/wpiutil/src/main/native/cpp/uv/Async.cpp index 3ac5ffd082..5479f4987f 100644 --- a/wpiutil/src/main/native/cpp/uv/Async.cpp +++ b/wpiutil/src/main/native/cpp/uv/Async.cpp @@ -12,14 +12,21 @@ namespace wpi { namespace uv { -std::shared_ptr> Async<>::Create(Loop& loop) { - auto h = std::make_shared(private_init{}); - int err = uv_async_init(loop.GetRaw(), h->GetRaw(), [](uv_async_t* handle) { +Async<>::~Async() noexcept { + if (auto loop = m_loop.lock()) + Close(); + else + ForceClosed(); +} + +std::shared_ptr> Async<>::Create(const std::shared_ptr& loop) { + auto h = std::make_shared(loop, private_init{}); + int err = uv_async_init(loop->GetRaw(), h->GetRaw(), [](uv_async_t* handle) { Async& h = *static_cast(handle->data); h.wakeup(); }); if (err < 0) { - loop.ReportError(err); + loop->ReportError(err); return nullptr; } h->Keep(); diff --git a/wpiutil/src/main/native/include/wpi/uv/Async.h b/wpiutil/src/main/native/include/wpi/uv/Async.h index 7ab9289674..389052b566 100644 --- a/wpiutil/src/main/native/include/wpi/uv/Async.h +++ b/wpiutil/src/main/native/include/wpi/uv/Async.h @@ -40,8 +40,14 @@ class Async final : public HandleImpl, uv_async_t> { struct private_init {}; public: - explicit Async(const private_init&) {} - ~Async() noexcept override = default; + Async(const std::shared_ptr& loop, const private_init&) + : m_loop{loop} {} + ~Async() noexcept override { + if (auto loop = m_loop.lock()) + this->Close(); + else + this->ForceClosed(); + } /** * Create an async handle. @@ -49,19 +55,7 @@ class Async final : public HandleImpl, uv_async_t> { * @param loop Loop object where this handle runs. */ static std::shared_ptr Create(Loop& loop) { - auto h = std::make_shared(private_init{}); - int err = uv_async_init(loop.GetRaw(), h->GetRaw(), [](uv_async_t* handle) { - auto& h = *static_cast(handle->data); - std::lock_guard lock(h.m_mutex); - for (auto&& v : h.m_data) apply_tuple(h.wakeup, v); - h.m_data.clear(); - }); - if (err < 0) { - loop.ReportError(err); - return nullptr; - } - h->Keep(); - return h; + return Create(loop.shared_from_this()); } /** @@ -70,7 +64,20 @@ class Async final : public HandleImpl, uv_async_t> { * @param loop Loop object where this handle runs. */ static std::shared_ptr Create(const std::shared_ptr& loop) { - return Create(*loop); + auto h = std::make_shared(loop, private_init{}); + int err = + uv_async_init(loop->GetRaw(), h->GetRaw(), [](uv_async_t* handle) { + auto& h = *static_cast(handle->data); + std::lock_guard lock(h.m_mutex); + for (auto&& v : h.m_data) apply_tuple(h.wakeup, v); + h.m_data.clear(); + }); + if (err < 0) { + loop->ReportError(err); + return nullptr; + } + h->Keep(); + return h; } /** @@ -85,7 +92,7 @@ class Async final : public HandleImpl, uv_async_t> { std::lock_guard lock(m_mutex); m_data.emplace_back(std::forward_as_tuple(std::forward(u)...)); } - this->Invoke(&uv_async_send, this->GetRaw()); + if (auto loop = m_loop.lock()) this->Invoke(&uv_async_send, this->GetRaw()); } /** @@ -96,6 +103,7 @@ class Async final : public HandleImpl, uv_async_t> { private: wpi::mutex m_mutex; std::vector> m_data; + std::weak_ptr m_loop; }; /** @@ -107,37 +115,43 @@ class Async<> final : public HandleImpl, uv_async_t> { struct private_init {}; public: - explicit Async(const private_init&) {} - ~Async() noexcept override = default; + Async(const std::shared_ptr& loop, const private_init&) + : m_loop(loop) {} + ~Async() noexcept override; /** * Create an async handle. * * @param loop Loop object where this handle runs. */ - static std::shared_ptr Create(Loop& loop); - - /** - * Create an async handle. - * - * @param loop Loop object where this handle runs. - */ - static std::shared_ptr Create(const std::shared_ptr& loop) { - return Create(*loop); + static std::shared_ptr Create(Loop& loop) { + return Create(loop.shared_from_this()); } + /** + * Create an async handle. + * + * @param loop Loop object where this handle runs. + */ + static std::shared_ptr Create(const std::shared_ptr& loop); + /** * Wakeup the event loop and emit the event. * * It’s safe to call this function from any thread. * An async event will be emitted on the loop thread. */ - void Send() { Invoke(&uv_async_send, GetRaw()); } + void Send() { + if (auto loop = m_loop.lock()) Invoke(&uv_async_send, GetRaw()); + } /** * Signal generated (on event loop thread) when the async event occurs. */ sig::Signal<> wakeup; + + private: + std::weak_ptr m_loop; }; } // namespace uv diff --git a/wpiutil/src/main/native/include/wpi/uv/AsyncFunction.h b/wpiutil/src/main/native/include/wpi/uv/AsyncFunction.h index 1ebaf3024b..c9e436cfca 100644 --- a/wpiutil/src/main/native/include/wpi/uv/AsyncFunction.h +++ b/wpiutil/src/main/native/include/wpi/uv/AsyncFunction.h @@ -40,9 +40,15 @@ class AsyncFunction final struct private_init {}; public: - AsyncFunction(std::function, T...)> func, const private_init&) - : wakeup{func} {} - ~AsyncFunction() noexcept override = default; + AsyncFunction(const std::shared_ptr& loop, + std::function, T...)> func, const private_init&) + : wakeup{func}, m_loop{loop} {} + ~AsyncFunction() noexcept override { + if (auto loop = m_loop.lock()) + this->Close(); + else + this->ForceClosed(); + } /** * Create an async handle. @@ -55,33 +61,7 @@ class AsyncFunction final */ static std::shared_ptr Create( Loop& loop, std::function, T...)> func = nullptr) { - auto h = std::make_shared(std::move(func), private_init{}); - int err = uv_async_init(loop.GetRaw(), h->GetRaw(), [](uv_async_t* handle) { - auto& h = *static_cast(handle->data); - std::unique_lock lock(h.m_mutex); - - if (!h.m_params.empty()) { - // for each set of parameters in the input queue, call the wakeup - // function and put the result in the output queue if the caller is - // waiting for it - for (auto&& v : h.m_params) { - auto p = h.m_promises.CreatePromise(v.first); - if (h.wakeup) - apply_tuple(h.wakeup, std::tuple_cat(std::make_tuple(std::move(p)), - std::move(v.second))); - } - h.m_params.clear(); - // wake up any threads that might be waiting for the result - lock.unlock(); - h.m_promises.Notify(); - } - }); - if (err < 0) { - loop.ReportError(err); - return nullptr; - } - h->Keep(); - return h; + return Create(loop.shared_from_this(), std::move(func)); } /** @@ -96,7 +76,36 @@ class AsyncFunction final static std::shared_ptr Create( const std::shared_ptr& loop, std::function, T...)> func = nullptr) { - return Create(*loop, std::move(func)); + auto h = + std::make_shared(loop, std::move(func), private_init{}); + int err = + uv_async_init(loop->GetRaw(), h->GetRaw(), [](uv_async_t* handle) { + auto& h = *static_cast(handle->data); + std::unique_lock lock(h.m_mutex); + + if (!h.m_params.empty()) { + // for each set of parameters in the input queue, call the wakeup + // function and put the result in the output queue if the caller is + // waiting for it + for (auto&& v : h.m_params) { + auto p = h.m_promises.CreatePromise(v.first); + if (h.wakeup) + apply_tuple(h.wakeup, + std::tuple_cat(std::make_tuple(std::move(p)), + std::move(v.second))); + } + h.m_params.clear(); + // wake up any threads that might be waiting for the result + lock.unlock(); + h.m_promises.Notify(); + } + }); + if (err < 0) { + loop->ReportError(err); + return nullptr; + } + h->Keep(); + return h; } /** @@ -123,7 +132,7 @@ class AsyncFunction final } // signal the loop - this->Invoke(&uv_async_send, this->GetRaw()); + if (auto loop = m_loop.lock()) this->Invoke(&uv_async_send, this->GetRaw()); // return future return m_promises.CreateFuture(req); @@ -143,6 +152,7 @@ class AsyncFunction final wpi::mutex m_mutex; std::vector>> m_params; PromiseFactory m_promises; + std::weak_ptr m_loop; }; } // namespace uv diff --git a/wpiutil/src/main/native/include/wpi/uv/Handle.h b/wpiutil/src/main/native/include/wpi/uv/Handle.h index 4ee0d9df38..046c1551d0 100644 --- a/wpiutil/src/main/native/include/wpi/uv/Handle.h +++ b/wpiutil/src/main/native/include/wpi/uv/Handle.h @@ -104,7 +104,9 @@ class Handle : public std::enable_shared_from_this { * * @return True if the handle is closing or closed, false otherwise. */ - bool IsClosing() const noexcept { return uv_is_closing(m_uv_handle) != 0; } + bool IsClosing() const noexcept { + return m_closed || uv_is_closing(m_uv_handle) != 0; + } /** * Request handle to be closed. @@ -219,6 +221,7 @@ class Handle : public std::enable_shared_from_this { void Keep() noexcept { m_self = shared_from_this(); } void Release() noexcept { m_self.reset(); } + void ForceClosed() noexcept { m_closed = true; } static void AllocBuf(uv_handle_t* handle, size_t size, uv_buf_t* buf); static void DefaultFreeBuf(Buffer& buf);