wpiutil: Change uv::AsyncFunction to use promise/future.

This allows the called function to pass along the promise to another
asynchronous callback.

To avoid memory allocations, add a home-rolled, simplified, non-allocating
version of std::promise and std::future as wpi::promise and wpi::future.
This commit is contained in:
Peter Johnson
2018-09-05 23:01:57 -07:00
parent 11e5faf469
commit 1a7a0db1ff
6 changed files with 1237 additions and 148 deletions

View File

@@ -8,18 +8,17 @@
#ifndef WPIUTIL_WPI_UV_ASYNCFUNCTION_H_
#define WPIUTIL_WPI_UV_ASYNCFUNCTION_H_
#include <stdint.h>
#include <uv.h>
#include <algorithm>
#include <functional>
#include <memory>
#include <thread>
#include <tuple>
#include <utility>
#include <vector>
#include "wpi/STLExtras.h"
#include "wpi/condition_variable.h"
#include "wpi/future.h"
#include "wpi/mutex.h"
#include "wpi/uv/Handle.h"
#include "wpi/uv/Loop.h"
@@ -27,85 +26,6 @@
namespace wpi {
namespace uv {
namespace detail {
class AsyncFunctionBase {
public:
virtual ~AsyncFunctionBase() {
m_active = false;
m_resultCv.notify_all(); // wake up any waiters
}
protected:
wpi::mutex m_mutex;
std::atomic_bool m_active{true};
wpi::condition_variable m_resultCv;
};
template <typename R, typename... T>
struct AsyncFunctionHelper : public AsyncFunctionBase {
inline void RunCall(const std::function<R(T...)>& func,
std::pair<std::thread::id, std::tuple<T...>>& v) {
m_results.emplace_back(
std::piecewise_construct, std::forward_as_tuple(v.first),
std::forward_as_tuple(apply_tuple(func, std::move(v.second))));
}
inline R GetCallResult(std::thread::id from) {
// wait for response
std::unique_lock<wpi::mutex> lock(m_mutex);
while (m_active) {
// Did we get a response to *our* request?
auto it = std::find_if(m_results.begin(), m_results.end(),
[=](const auto& r) { return r.first == from; });
if (it != m_results.end()) {
// Yes, remove it from the vector and we're done.
auto rv = std::move(it->second);
m_results.erase(it);
return rv;
}
// No, keep waiting for a response
m_resultCv.wait(lock);
}
return R();
}
private:
std::vector<std::pair<std::thread::id, R>> m_results;
};
// void return value partial specialization
template <typename... T>
struct AsyncFunctionHelper<void, T...> : public AsyncFunctionBase {
inline void RunCall(const std::function<void(T...)>& func,
std::pair<std::thread::id, std::tuple<T...>>& v) {
apply_tuple(func, std::move(v.second));
m_results.emplace_back(v.first);
}
inline void GetCallResult(std::thread::id from) {
// wait for response
std::unique_lock<wpi::mutex> lock(m_mutex);
while (m_active) {
// Did we get a response to *our* request?
auto it = std::find(m_results.begin(), m_results.end(), from);
if (it != m_results.end()) {
// Yes, remove it from the vector and we're done.
m_results.erase(it);
return;
}
// No, keep waiting for a response
m_resultCv.wait(lock);
}
}
private:
std::vector<std::thread::id> m_results;
};
} // namespace detail
template <typename T>
class AsyncFunction;
@@ -116,12 +36,11 @@ class AsyncFunction;
*/
template <typename R, typename... T>
class AsyncFunction<R(T...)> final
: public HandleImpl<AsyncFunction<R(T...)>, uv_async_t>,
private detail::AsyncFunctionHelper<R, T...> {
: public HandleImpl<AsyncFunction<R(T...)>, uv_async_t> {
struct private_init {};
public:
AsyncFunction(std::function<R(T...)> func, const private_init&)
AsyncFunction(std::function<void(promise<R>, T...)> func, const private_init&)
: wakeup{func} {}
~AsyncFunction() noexcept override = default;
@@ -129,28 +48,32 @@ class AsyncFunction<R(T...)> final
* Create an async handle.
*
* @param loop Loop object where this handle runs.
* @param func wakeup function to be called (sets wakeup value)
* @param func wakeup function to be called (sets wakeup value); the function
* needs to return void, and its first parameter is the promise
* for the result. If no value is set on the promise by the
* wakeup function, a default-constructed value is "returned".
*/
static std::shared_ptr<AsyncFunction> Create(
Loop& loop, std::function<R(T...)> func = nullptr) {
Loop& loop, std::function<void(promise<R>, T...)> func = nullptr) {
auto h = std::make_shared<AsyncFunction>(std::move(func), private_init{});
int err = uv_async_init(loop.GetRaw(), h->GetRaw(), [](uv_async_t* handle) {
auto& h = *static_cast<AsyncFunction*>(handle->data);
std::lock_guard<wpi::mutex> lock(h.m_mutex);
std::unique_lock<wpi::mutex> lock(h.m_mutex);
if (!h.m_params.empty()) {
// for each set of parameters in the input queue
// for each set of parameters in the input queue, call the wakeup
// function and put the result in the output queue if the caller is
// waiting for it
for (auto&& v : h.m_params) {
// call the wakeup function and put the result in the output queue
// if the caller is waiting for it
if (v.first == std::thread::id{})
apply_tuple(h.wakeup, std::move(v.second));
else
h.RunCall(h.wakeup, v);
auto p = h.m_promises.CreatePromise(v.first);
if (h.wakeup)
apply_tuple(h.wakeup, std::tuple_cat(std::make_tuple(std::move(p)),
std::move(v.second)));
}
h.m_params.clear();
// wake up any threads that might be waiting for the result
h.m_resultCv.notify_all();
lock.unlock();
h.m_promises.Notify();
}
});
if (err < 0) {
@@ -165,71 +88,61 @@ class AsyncFunction<R(T...)> final
* Create an async handle.
*
* @param loop Loop object where this handle runs.
* @param func wakeup function to be called (sets wakeup value)
* @param func wakeup function to be called (sets wakeup value); the function
* needs to return void, and its first parameter is the promise
* for the result. If no value is set on the promise by the
* wakeup function, a default-constructed value is "returned".
*/
static std::shared_ptr<AsyncFunction> Create(
const std::shared_ptr<Loop>& loop,
std::function<R(T...)> func = nullptr) {
std::function<void(promise<R>, T...)> func = nullptr) {
return Create(*loop, std::move(func));
}
/**
* Wakeup the event loop, call the async function, and ignore any result.
* This is non-blocking and does NOT wait until the async function returns.
* Wakeup the event loop, call the async function, and return a future for
* the result.
*
* Its safe to call this function from any thread EXCEPT the loop thread.
* The async function will be called on the loop thread.
*
* The future will return a default-constructed result if this handle is
* destroyed while waiting for a result.
*/
template <typename... U>
void Send(U&&... u) {
future<R> Call(U&&... u) {
// create the future
uint64_t req = m_promises.CreateRequest();
// add the parameters to the input queue
{
std::lock_guard<wpi::mutex> lock(this->m_mutex);
std::lock_guard<wpi::mutex> lock(m_mutex);
m_params.emplace_back(std::piecewise_construct,
std::forward_as_tuple(std::thread::id{}),
std::forward_as_tuple(req),
std::forward_as_tuple(std::forward<U>(u)...));
}
// signal the loop
this->Invoke(&uv_async_send, this->GetRaw());
// return future
return m_promises.CreateFuture(req);
}
/**
* Wakeup the event loop, call the async function, and return the result.
* This blocks the calling thread until the async function returns.
*
* Its safe to call this function from any thread EXCEPT the loop thread.
* The async function will be called on the loop thread.
*
* Returns a default-constructed result if this handle is destroyed while
* waiting for a result.
*/
template <typename... U>
R Call(U&&... u) {
std::thread::id from = std::this_thread::get_id();
// add the parameters to the input queue
{
std::lock_guard<wpi::mutex> lock(this->m_mutex);
m_params.emplace_back(std::piecewise_construct,
std::forward_as_tuple(from),
std::forward_as_tuple(std::forward<U>(u)...));
}
// signal the loop
this->Invoke(&uv_async_send, this->GetRaw());
// wait for response
return this->GetCallResult(from);
future<R> operator()(U&&... u) {
return Call(std::forward<U>(u)...);
}
/**
* Function called (on event loop thread) when the async is called.
*/
std::function<R(T...)> wakeup;
std::function<void(promise<R>, T...)> wakeup;
private:
std::vector<std::pair<std::thread::id, std::tuple<T...>>> m_params;
wpi::mutex m_mutex;
std::vector<std::pair<uint64_t, std::tuple<T...>>> m_params;
PromiseFactory<R> m_promises;
};
} // namespace uv