wpiutil: uv::Async: Keep weak reference to loop

Other handles can only be used within the loop itself, but Async is intended
to be used from another thread.  This introduces the possibility of a race
condition between the loop being destroyed and the Async being destroyed.
Change Async to keep a weak reference to a loop and check it before performing
libuv operations.
This commit is contained in:
Peter Johnson
2018-09-16 15:36:19 -07:00
parent 1a7a0db1ff
commit 172e438cd6
4 changed files with 100 additions and 66 deletions

View File

@@ -40,9 +40,15 @@ class AsyncFunction<R(T...)> final
struct private_init {};
public:
AsyncFunction(std::function<void(promise<R>, T...)> func, const private_init&)
: wakeup{func} {}
~AsyncFunction() noexcept override = default;
AsyncFunction(const std::shared_ptr<Loop>& loop,
std::function<void(promise<R>, T...)> func, const private_init&)
: wakeup{func}, m_loop{loop} {}
~AsyncFunction() noexcept override {
if (auto loop = m_loop.lock())
this->Close();
else
this->ForceClosed();
}
/**
* Create an async handle.
@@ -55,33 +61,7 @@ class AsyncFunction<R(T...)> final
*/
static std::shared_ptr<AsyncFunction> Create(
Loop& loop, std::function<void(promise<R>, T...)> func = nullptr) {
auto h = std::make_shared<AsyncFunction>(std::move(func), private_init{});
int err = uv_async_init(loop.GetRaw(), h->GetRaw(), [](uv_async_t* handle) {
auto& h = *static_cast<AsyncFunction*>(handle->data);
std::unique_lock<wpi::mutex> lock(h.m_mutex);
if (!h.m_params.empty()) {
// for each set of parameters in the input queue, call the wakeup
// function and put the result in the output queue if the caller is
// waiting for it
for (auto&& v : h.m_params) {
auto p = h.m_promises.CreatePromise(v.first);
if (h.wakeup)
apply_tuple(h.wakeup, std::tuple_cat(std::make_tuple(std::move(p)),
std::move(v.second)));
}
h.m_params.clear();
// wake up any threads that might be waiting for the result
lock.unlock();
h.m_promises.Notify();
}
});
if (err < 0) {
loop.ReportError(err);
return nullptr;
}
h->Keep();
return h;
return Create(loop.shared_from_this(), std::move(func));
}
/**
@@ -96,7 +76,36 @@ class AsyncFunction<R(T...)> final
static std::shared_ptr<AsyncFunction> Create(
const std::shared_ptr<Loop>& loop,
std::function<void(promise<R>, T...)> func = nullptr) {
return Create(*loop, std::move(func));
auto h =
std::make_shared<AsyncFunction>(loop, std::move(func), private_init{});
int err =
uv_async_init(loop->GetRaw(), h->GetRaw(), [](uv_async_t* handle) {
auto& h = *static_cast<AsyncFunction*>(handle->data);
std::unique_lock<wpi::mutex> lock(h.m_mutex);
if (!h.m_params.empty()) {
// for each set of parameters in the input queue, call the wakeup
// function and put the result in the output queue if the caller is
// waiting for it
for (auto&& v : h.m_params) {
auto p = h.m_promises.CreatePromise(v.first);
if (h.wakeup)
apply_tuple(h.wakeup,
std::tuple_cat(std::make_tuple(std::move(p)),
std::move(v.second)));
}
h.m_params.clear();
// wake up any threads that might be waiting for the result
lock.unlock();
h.m_promises.Notify();
}
});
if (err < 0) {
loop->ReportError(err);
return nullptr;
}
h->Keep();
return h;
}
/**
@@ -123,7 +132,7 @@ class AsyncFunction<R(T...)> final
}
// signal the loop
this->Invoke(&uv_async_send, this->GetRaw());
if (auto loop = m_loop.lock()) this->Invoke(&uv_async_send, this->GetRaw());
// return future
return m_promises.CreateFuture(req);
@@ -143,6 +152,7 @@ class AsyncFunction<R(T...)> final
wpi::mutex m_mutex;
std::vector<std::pair<uint64_t, std::tuple<T...>>> m_params;
PromiseFactory<R> m_promises;
std::weak_ptr<Loop> m_loop;
};
} // namespace uv