diff --git a/wpiutil/src/main/native/cpp/EventLoopRunner.cpp b/wpiutil/src/main/native/cpp/EventLoopRunner.cpp index 1273590bde..3b357879df 100644 --- a/wpiutil/src/main/native/cpp/EventLoopRunner.cpp +++ b/wpiutil/src/main/native/cpp/EventLoopRunner.cpp @@ -25,7 +25,10 @@ class EventLoopRunner::Thread : public SafeThread { // run function m_doExec = UvExecFunc::Create( - m_loop, [loop = m_loop.get()](LoopFunc func) { func(*loop); }); + m_loop, [loop = m_loop.get()](auto out, LoopFunc func) { + func(*loop); + out.set_value(); + }); } void Main() { @@ -51,7 +54,7 @@ EventLoopRunner::~EventLoopRunner() { void EventLoopRunner::ExecAsync(LoopFunc func) { if (auto thr = m_owner.GetThread()) { if (auto doExec = thr->m_doExec.lock()) { - doExec->Send(func); + doExec->Call(func); } } } diff --git a/wpiutil/src/main/native/cpp/future.cpp b/wpiutil/src/main/native/cpp/future.cpp new file mode 100644 index 0000000000..fb3165848e --- /dev/null +++ b/wpiutil/src/main/native/cpp/future.cpp @@ -0,0 +1,119 @@ +/*----------------------------------------------------------------------------*/ +/* Copyright (c) 2018 FIRST. All Rights Reserved. */ +/* Open Source Software - may be modified and shared by FRC teams. The code */ +/* must be accompanied by the FIRST BSD license file in the root directory of */ +/* the project. */ +/*----------------------------------------------------------------------------*/ + +#include "wpi/future.h" + +namespace wpi { +namespace detail { + +PromiseFactoryBase::~PromiseFactoryBase() { + m_active = false; + m_resultCv.notify_all(); // wake up any waiters +} + +void PromiseFactoryBase::IgnoreResult(uint64_t request) { + std::unique_lock lock(m_resultMutex); + EraseRequest(request); +} + +uint64_t PromiseFactoryBase::CreateRequest() { + std::unique_lock lock(m_resultMutex); + uint64_t req = ++m_uid; + m_requests.push_back(req); + return req; +} + +bool PromiseFactoryBase::EraseRequest(uint64_t request) { + if (request == 0) return false; + auto it = std::find_if(m_requests.begin(), m_requests.end(), + [=](auto r) { return r == request; }); + if (it == m_requests.end()) return false; // no waiters + m_requests.erase(it); + return true; +} + +} // namespace detail + +future PromiseFactory::MakeReadyFuture() { + std::unique_lock lock(GetResultMutex()); + uint64_t req = CreateErasedRequest(); + m_results.emplace_back(req); + return future{this, req}; +} + +void PromiseFactory::SetValue(uint64_t request) { + std::unique_lock lock(GetResultMutex()); + if (!EraseRequest(request)) return; + auto it = std::find_if(m_thens.begin(), m_thens.end(), + [=](const auto& x) { return x.request == request; }); + if (it != m_thens.end()) { + uint64_t outRequest = it->outRequest; + ThenFunction func = std::move(it->func); + m_thens.erase(it); + lock.unlock(); + return func(outRequest); + } + m_results.emplace_back(request); + Notify(); +} + +void PromiseFactory::SetThen(uint64_t request, uint64_t outRequest, + ThenFunction func) { + std::unique_lock lock(GetResultMutex()); + auto it = std::find_if(m_results.begin(), m_results.end(), + [=](const auto& r) { return r == request; }); + if (it != m_results.end()) { + m_results.erase(it); + lock.unlock(); + return func(outRequest); + } + m_thens.emplace_back(request, outRequest, func); +} + +bool PromiseFactory::IsReady(uint64_t request) noexcept { + std::unique_lock lock(GetResultMutex()); + auto it = std::find_if(m_results.begin(), m_results.end(), + [=](const auto& r) { return r == request; }); + return it != m_results.end(); +} + +void PromiseFactory::GetResult(uint64_t request) { + // wait for response + std::unique_lock lock(GetResultMutex()); + while (IsActive()) { + // Did we get a response to *our* request? + auto it = std::find_if(m_results.begin(), m_results.end(), + [=](const auto& r) { return r == request; }); + if (it != m_results.end()) { + // Yes, remove it from the vector and we're done. + m_results.erase(it); + return; + } + // No, keep waiting for a response + Wait(lock); + } +} + +void PromiseFactory::WaitResult(uint64_t request) { + // wait for response + std::unique_lock lock(GetResultMutex()); + while (IsActive()) { + // Did we get a response to *our* request? + auto it = std::find_if(m_results.begin(), m_results.end(), + [=](const auto& r) { return r == request; }); + if (it != m_results.end()) return; + // No, keep waiting for a response + Wait(lock); + } +} + +PromiseFactory& PromiseFactory::GetInstance() { + static PromiseFactory inst; + return inst; +} + +} // namespace wpi diff --git a/wpiutil/src/main/native/include/wpi/future.h b/wpiutil/src/main/native/include/wpi/future.h new file mode 100644 index 0000000000..04e7e4564c --- /dev/null +++ b/wpiutil/src/main/native/include/wpi/future.h @@ -0,0 +1,907 @@ +/*----------------------------------------------------------------------------*/ +/* Copyright (c) 2018 FIRST. All Rights Reserved. */ +/* Open Source Software - may be modified and shared by FRC teams. The code */ +/* must be accompanied by the FIRST BSD license file in the root directory of */ +/* the project. */ +/*----------------------------------------------------------------------------*/ + +#ifndef WPIUTIL_WPI_FUTURE_H_ +#define WPIUTIL_WPI_FUTURE_H_ + +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "wpi/condition_variable.h" +#include "wpi/mutex.h" + +namespace wpi { + +template +class PromiseFactory; + +template +class future; + +template +class promise; +template <> +class promise; + +namespace detail { + +class PromiseFactoryBase { + public: + ~PromiseFactoryBase(); + + bool IsActive() const { return m_active; } + + wpi::mutex& GetResultMutex() { return m_resultMutex; } + + void Notify() { m_resultCv.notify_all(); } + + // must be called with locked lock == ResultMutex + void Wait(std::unique_lock& lock) { m_resultCv.wait(lock); } + + // returns false if timeout reached + template + bool WaitUntil(std::unique_lock& lock, + const std::chrono::time_point& timeout_time) { + return m_resultCv.wait_until(lock, timeout_time) == + std::cv_status::no_timeout; + } + + void IgnoreResult(uint64_t request); + + uint64_t CreateRequest(); + + // returns true if request was pending + // must be called with ResultMutex held + bool EraseRequest(uint64_t request); + + // same as doing CreateRequest() followed by EraseRequest() + // must be called with ResultMutex held + uint64_t CreateErasedRequest() { return ++m_uid; } + + private: + wpi::mutex m_resultMutex; + std::atomic_bool m_active{true}; + wpi::condition_variable m_resultCv; + + uint64_t m_uid = 0; + std::vector m_requests; +}; + +template +struct FutureThen { + template + static future Create(PromiseFactory& fromFactory, uint64_t request, + PromiseFactory& factory, F&& func); +}; + +template +struct FutureThen { + template + static future Create(PromiseFactory& fromFactory, + uint64_t request, PromiseFactory& factory, + F&& func); +}; + +template +struct FutureThen { + template + static future Create(PromiseFactory& fromFactory, uint64_t request, + PromiseFactory& factory, F&& func); +}; + +template <> +struct FutureThen { + template + static future Create(PromiseFactory& fromFactory, + uint64_t request, PromiseFactory& factory, + F&& func); +}; + +} // namespace detail + +/** + * A promise factory for lightweight futures. + * + * The lifetime of the factory must be ensured to be longer than any futures + * it creates. + * + * Use CreateRequest() to create the future request id, and then CreateFuture() + * and CreatePromise() to create future and promise objects. A promise should + * only be created once for any given request id. + * + * @tparam T the "return" type of the promise/future + */ +template +class PromiseFactory final : public detail::PromiseFactoryBase { + friend class future; + + public: + using detail::PromiseFactoryBase::Notify; + using ThenFunction = std::function; + + /** + * Creates a future. + * + * @param request the request id returned by CreateRequest() + * @return the future + */ + future CreateFuture(uint64_t request); + + /** + * Creates a future and makes it immediately ready. + * + * @return the future + */ + future MakeReadyFuture(T&& value); + + /** + * Creates a promise. + * + * @param request the request id returned by CreateRequest() + * @return the promise + */ + promise CreatePromise(uint64_t request); + + /** + * Sets a value directly for a future without creating a promise object. + * Identical to `CreatePromise(request).set_value(value)`. + * + * @param request request id, as returned by CreateRequest() + * @param value lvalue + */ + void SetValue(uint64_t request, const T& value); + + /** + * Sets a value directly for a future without creating a promise object. + * Identical to `CreatePromise(request).set_value(value)`. + * + * @param request request id, as returned by CreateRequest() + * @param value rvalue + */ + void SetValue(uint64_t request, T&& value); + + void SetThen(uint64_t request, uint64_t outRequest, ThenFunction func); + + bool IsReady(uint64_t request) noexcept; + T GetResult(uint64_t request); + void WaitResult(uint64_t request); + template + bool WaitResultUntil( + uint64_t request, + const std::chrono::time_point& timeout_time); + + static PromiseFactory& GetInstance(); + + private: + struct Then { + Then(uint64_t request_, uint64_t outRequest_, ThenFunction func_) + : request(request_), outRequest(outRequest_), func(std::move(func_)) {} + uint64_t request; + uint64_t outRequest; + ThenFunction func; + }; + + std::vector m_thens; + std::vector> m_results; +}; + +/** + * Explicit specialization for PromiseFactory. + */ +template <> +class PromiseFactory final : public detail::PromiseFactoryBase { + friend class future; + + public: + using detail::PromiseFactoryBase::Notify; + using ThenFunction = std::function; + + /** + * Creates a future. + * + * @param request the request id returned by CreateRequest() + * @return std::pair of the future and the request id + */ + future CreateFuture(uint64_t request); + + /** + * Creates a future and makes it immediately ready. + * + * @return the future + */ + future MakeReadyFuture(); + + /** + * Creates a promise. + * + * @param request the request id returned by CreateRequest() + * @return the promise + */ + promise CreatePromise(uint64_t request); + + /** + * Sets a value directly for a future without creating a promise object. + * Identical to `promise(factory, request).set_value()`. + * + * @param request request id, as returned by CreateRequest() + */ + void SetValue(uint64_t request); + + void SetThen(uint64_t request, uint64_t outRequest, ThenFunction func); + + bool IsReady(uint64_t request) noexcept; + void GetResult(uint64_t request); + void WaitResult(uint64_t request); + template + bool WaitResultUntil( + uint64_t request, + const std::chrono::time_point& timeout_time); + + static PromiseFactory& GetInstance(); + + private: + struct Then { + Then(uint64_t request_, uint64_t outRequest_, ThenFunction func_) + : request(request_), outRequest(outRequest_), func(std::move(func_)) {} + uint64_t request; + uint64_t outRequest; + ThenFunction func; + }; + + std::vector m_thens; + std::vector m_results; +}; + +/** + * A lightweight version of std::future. + * + * Use either promise::get_future() or PromiseFactory::CreateFuture() to create. + * + * @tparam T the "return" type + */ +template +class future final { + friend class PromiseFactory; + friend class promise; + + public: + /** + * Constructs an empty (invalid) future. + */ + future() noexcept = default; + + future(future&& oth) noexcept { + this->m_request = oth.m_request; + this->m_promises = oth.m_promises; + oth.m_request = 0; + oth.m_promises = nullptr; + } + future(const future&) = delete; + + template + future(future&& oth) noexcept + : future(oth.then([](R&& val) -> T { return val; })) {} + + /** + * Ignores the result of the future if it has not been retrieved. + */ + ~future() { + if (m_promises) m_promises->IgnoreResult(m_request); + } + + future& operator=(future&& oth) noexcept { + this->m_request = oth.m_request; + this->m_promises = oth.m_promises; + oth.m_request = 0; + oth.m_promises = nullptr; + return *this; + } + future& operator=(const future&) = delete; + + /** + * Gets the value. Calls wait() if the value is not yet available. + * Can only be called once. The future will be marked invalid after the call. + * + * @return The value provided by the corresponding promise.set_value(). + */ + T get() { + if (m_promises) + return m_promises->GetResult(m_request); + else + return T(); + } + + template + future then(PromiseFactory& factory, F&& func) { + if (m_promises) { + auto promises = m_promises; + m_promises = nullptr; + return detail::FutureThen::Create(*promises, m_request, factory, + func); + } else { + return future(); + } + } + + template ::type> + future then(F&& func) { + return then(PromiseFactory::GetInstance(), std::forward(func)); + } + + bool is_ready() const noexcept { + return m_promises && m_promises->IsReady(m_request); + } + + /** + * Checks if the future is valid. + * A default-constructed future or one where get() has been called is invalid. + * + * @return True if valid + */ + bool valid() const noexcept { return m_promises; } + + /** + * Waits for the promise to provide a value. + * Does not return until the value is available or the promise is destroyed + * (in which case a default-constructed value is "returned"). + * If the value has already been provided, returns immediately. + */ + void wait() const { + if (m_promises) m_promises->WaitResult(m_request); + } + + /** + * Waits for the promise to provide a value, or the specified time has been + * reached. + * + * @return True if the promise provided a value, false if timed out. + */ + template + bool wait_until( + const std::chrono::time_point& timeout_time) const { + return m_promises && m_promises->WaitResultUntil(m_request, timeout_time); + } + + /** + * Waits for the promise to provide a value, or the specified amount of time + * has elapsed. + * + * @return True if the promise provided a value, false if timed out. + */ + template + bool wait_for( + const std::chrono::duration& timeout_duration) const { + return wait_until(std::chrono::steady_clock::now() + timeout_duration); + } + + private: + future(PromiseFactory* promises, uint64_t request) noexcept + : m_request(request), m_promises(promises) {} + + uint64_t m_request = 0; + PromiseFactory* m_promises = nullptr; +}; + +/** + * Explicit specialization for future. + */ +template <> +class future final { + friend class PromiseFactory; + friend class promise; + + public: + /** + * Constructs an empty (invalid) future. + */ + future() noexcept = default; + + future(future&& oth) noexcept { + m_request = oth.m_request; + m_promises = oth.m_promises; + oth.m_request = 0; + oth.m_promises = nullptr; + } + future(const future&) = delete; + + /** + * Ignores the result of the future if it has not been retrieved. + */ + ~future() { + if (m_promises) m_promises->IgnoreResult(m_request); + } + + future& operator=(future&& oth) noexcept { + m_request = oth.m_request; + m_promises = oth.m_promises; + oth.m_request = 0; + oth.m_promises = nullptr; + return *this; + } + future& operator=(const future&) = delete; + + /** + * Gets the value. Calls wait() if the value is not yet available. + * Can only be called once. The future will be marked invalid after the call. + */ + void get() { + if (m_promises) m_promises->GetResult(m_request); + } + + template + future then(PromiseFactory& factory, F&& func) { + if (m_promises) { + auto promises = m_promises; + m_promises = nullptr; + return detail::FutureThen::Create(*promises, m_request, factory, + func); + } else { + return future(); + } + } + + template ::type> + future then(F&& func) { + return then(PromiseFactory::GetInstance(), std::forward(func)); + } + + bool is_ready() const noexcept { + return m_promises && m_promises->IsReady(m_request); + } + + /** + * Checks if the future is valid. + * A default-constructed future or one where get() has been called is invalid. + * + * @return True if valid + */ + bool valid() const noexcept { return m_promises; } + + /** + * Waits for the promise to provide a value. + * Does not return until the value is available or the promise is destroyed + * If the value has already been provided, returns immediately. + */ + void wait() const { + if (m_promises) m_promises->WaitResult(m_request); + } + + /** + * Waits for the promise to provide a value, or the specified time has been + * reached. + * + * @return True if the promise provided a value, false if timed out. + */ + template + bool wait_until( + const std::chrono::time_point& timeout_time) const { + return m_promises && m_promises->WaitResultUntil(m_request, timeout_time); + } + + /** + * Waits for the promise to provide a value, or the specified amount of time + * has elapsed. + * + * @return True if the promise provided a value, false if timed out. + */ + template + bool wait_for( + const std::chrono::duration& timeout_duration) const { + return wait_until(std::chrono::steady_clock::now() + timeout_duration); + } + + private: + future(PromiseFactory* promises, uint64_t request) noexcept + : m_request(request), m_promises(promises) {} + + uint64_t m_request = 0; + PromiseFactory* m_promises = nullptr; +}; + +/** + * A lightweight version of std::promise. + * + * Use PromiseFactory::CreatePromise() to create. + * + * @tparam T the "return" type + */ +template +class promise final { + friend class PromiseFactory; + + public: + /** + * Constructs an empty promise. + */ + promise() : m_promises(&PromiseFactory::GetInstance()) { + m_request = m_promises->CreateRequest(); + } + + promise(promise&& oth) noexcept + : m_request(oth.m_request), m_promises(oth.m_promises) { + oth.m_request = 0; + oth.m_promises = nullptr; + } + + promise(const promise&) = delete; + + /** + * Sets the promised value to a default-constructed T if not already set. + */ + ~promise() { + if (m_promises) m_promises->SetValue(m_request, T()); + } + + promise& operator=(promise&& oth) noexcept { + m_request = oth.m_request; + m_promises = oth.m_promises; + oth.m_request = 0; + oth.m_promises = nullptr; + return *this; + } + + promise& operator=(const promise&) = delete; + + /** + * Swaps this promise with another one. + */ + void swap(promise& oth) noexcept { + std::swap(m_request, oth.m_request); + std::swap(m_promises, oth.m_promises); + } + + /** + * Gets a future for this promise. + * + * @return The future + */ + future get_future() noexcept { return future(m_promises, m_request); } + + /** + * Sets the promised value. + * Only effective once (subsequent calls will be ignored). + * + * @param value The value to provide to the waiting future + */ + void set_value(const T& value) { + if (m_promises) m_promises->SetValue(m_request, value); + m_promises = nullptr; + } + + /** + * Sets the promised value. + * Only effective once (subsequent calls will be ignored). + * + * @param value The value to provide to the waiting future + */ + void set_value(T&& value) { + if (m_promises) m_promises->SetValue(m_request, std::move(value)); + m_promises = nullptr; + } + + private: + promise(PromiseFactory* promises, uint64_t request) noexcept + : m_request(request), m_promises(promises) {} + + uint64_t m_request = 0; + PromiseFactory* m_promises = nullptr; +}; + +/** + * Explicit specialization for promise. + */ +template <> +class promise final { + friend class PromiseFactory; + + public: + /** + * Constructs an empty promise. + */ + promise() : m_promises(&PromiseFactory::GetInstance()) { + m_request = m_promises->CreateRequest(); + } + + promise(promise&& oth) noexcept + : m_request(oth.m_request), m_promises(oth.m_promises) { + oth.m_request = 0; + oth.m_promises = nullptr; + } + + promise(const promise&) = delete; + + /** + * Sets the promised value if not already set. + */ + ~promise() { + if (m_promises) m_promises->SetValue(m_request); + } + + promise& operator=(promise&& oth) noexcept { + m_request = oth.m_request; + m_promises = oth.m_promises; + oth.m_request = 0; + oth.m_promises = nullptr; + return *this; + } + + promise& operator=(const promise&) = delete; + + /** + * Swaps this promise with another one. + */ + void swap(promise& oth) noexcept { + std::swap(m_request, oth.m_request); + std::swap(m_promises, oth.m_promises); + } + + /** + * Gets a future for this promise. + * + * @return The future + */ + future get_future() noexcept { + return future(m_promises, m_request); + } + + /** + * Sets the promised value. + * Only effective once (subsequent calls will be ignored). + */ + void set_value() { + if (m_promises) m_promises->SetValue(m_request); + m_promises = nullptr; + } + + private: + promise(PromiseFactory* promises, uint64_t request) noexcept + : m_request(request), m_promises(promises) {} + + uint64_t m_request = 0; + PromiseFactory* m_promises = nullptr; +}; + +/** + * Constructs a valid future with the value set. + */ +template +inline future make_ready_future(T&& value) { + return PromiseFactory::GetInstance().MakeReadyFuture( + std::forward(value)); +} + +/** + * Constructs a valid future with the value set. + */ +inline future make_ready_future() { + return PromiseFactory::GetInstance().MakeReadyFuture(); +} + +template +inline future PromiseFactory::CreateFuture(uint64_t request) { + return future{this, request}; +} + +template +future PromiseFactory::MakeReadyFuture(T&& value) { + std::unique_lock lock(GetResultMutex()); + uint64_t req = CreateErasedRequest(); + m_results.emplace_back(std::piecewise_construct, std::forward_as_tuple(req), + std::forward_as_tuple(std::move(value))); + return future{this, req}; +} + +template +inline promise PromiseFactory::CreatePromise(uint64_t request) { + return promise{this, request}; +} + +template +void PromiseFactory::SetValue(uint64_t request, const T& value) { + std::unique_lock lock(GetResultMutex()); + if (!EraseRequest(request)) return; + auto it = std::find_if(m_thens.begin(), m_thens.end(), + [=](const auto& x) { return x.request == request; }); + if (it != m_thens.end()) { + uint64_t outRequest = it->outRequest; + ThenFunction func = std::move(it->func); + m_thens.erase(it); + lock.unlock(); + return func(outRequest, value); + } + m_results.emplace_back(std::piecewise_construct, + std::forward_as_tuple(request), + std::forward_as_tuple(value)); + Notify(); +} + +template +void PromiseFactory::SetValue(uint64_t request, T&& value) { + std::unique_lock lock(GetResultMutex()); + if (!EraseRequest(request)) return; + auto it = std::find_if(m_thens.begin(), m_thens.end(), + [=](const auto& x) { return x.request == request; }); + if (it != m_thens.end()) { + uint64_t outRequest = it->outRequest; + ThenFunction func = std::move(it->func); + m_thens.erase(it); + lock.unlock(); + return func(outRequest, std::move(value)); + } + m_results.emplace_back(std::piecewise_construct, + std::forward_as_tuple(request), + std::forward_as_tuple(std::move(value))); + Notify(); +} + +template +void PromiseFactory::SetThen(uint64_t request, uint64_t outRequest, + ThenFunction func) { + std::unique_lock lock(GetResultMutex()); + auto it = std::find_if(m_results.begin(), m_results.end(), + [=](const auto& r) { return r.first == request; }); + if (it != m_results.end()) { + auto val = std::move(it->second); + m_results.erase(it); + lock.unlock(); + return func(outRequest, std::move(val)); + } + m_thens.emplace_back(request, outRequest, func); +} + +template +bool PromiseFactory::IsReady(uint64_t request) noexcept { + std::unique_lock lock(GetResultMutex()); + auto it = std::find_if(m_results.begin(), m_results.end(), + [=](const auto& r) { return r.first == request; }); + return it != m_results.end(); +} + +template +T PromiseFactory::GetResult(uint64_t request) { + // wait for response + std::unique_lock lock(GetResultMutex()); + while (IsActive()) { + // Did we get a response to *our* request? + auto it = std::find_if(m_results.begin(), m_results.end(), + [=](const auto& r) { return r.first == request; }); + if (it != m_results.end()) { + // Yes, remove it from the vector and we're done. + auto rv = std::move(it->second); + m_results.erase(it); + return rv; + } + // No, keep waiting for a response + Wait(lock); + } + return T(); +} + +template +void PromiseFactory::WaitResult(uint64_t request) { + // wait for response + std::unique_lock lock(GetResultMutex()); + while (IsActive()) { + // Did we get a response to *our* request? + auto it = std::find_if(m_results.begin(), m_results.end(), + [=](const auto& r) { return r.first == request; }); + if (it != m_results.end()) return; + // No, keep waiting for a response + Wait(lock); + } +} + +template +template +bool PromiseFactory::WaitResultUntil( + uint64_t request, + const std::chrono::time_point& timeout_time) { + std::unique_lock lock(GetResultMutex()); + bool timeout = false; + while (IsActive()) { + // Did we get a response to *our* request? + auto it = std::find_if(m_results.begin(), m_results.end(), + [=](const auto& r) { return r.first == request; }); + if (it != m_results.end()) return true; + if (timeout) break; + // No, keep waiting for a response + if (!WaitUntil(lock, timeout_time)) timeout = true; + } + return false; +} + +template +PromiseFactory& PromiseFactory::GetInstance() { + static PromiseFactory inst; + return inst; +} + +inline future PromiseFactory::CreateFuture(uint64_t request) { + return future{this, request}; +} + +inline promise PromiseFactory::CreatePromise(uint64_t request) { + return promise{this, request}; +} + +template +bool PromiseFactory::WaitResultUntil( + uint64_t request, + const std::chrono::time_point& timeout_time) { + std::unique_lock lock(GetResultMutex()); + bool timeout = false; + while (IsActive()) { + // Did we get a response to *our* request? + auto it = std::find_if(m_results.begin(), m_results.end(), + [=](const auto& r) { return r == request; }); + if (it != m_results.end()) return true; + if (timeout) break; + // No, keep waiting for a response + if (!WaitUntil(lock, timeout_time)) timeout = true; + } + return false; +} + +template +template +future detail::FutureThen::Create( + PromiseFactory& fromFactory, uint64_t request, + PromiseFactory& factory, F&& func) { + uint64_t req = factory.CreateRequest(); + fromFactory.SetThen(request, req, [&factory, func](uint64_t r, From value) { + factory.SetValue(r, func(std::move(value))); + }); + return factory.CreateFuture(req); +} + +template +template +future detail::FutureThen::Create( + PromiseFactory& fromFactory, uint64_t request, + PromiseFactory& factory, F&& func) { + uint64_t req = factory.CreateRequest(); + fromFactory.SetThen(request, req, [&factory, func](uint64_t r, From value) { + func(std::move(value)); + factory.SetValue(r); + }); + return factory.CreateFuture(req); +} + +template +template +future detail::FutureThen::Create( + PromiseFactory& fromFactory, uint64_t request, + PromiseFactory& factory, F&& func) { + uint64_t req = factory.CreateRequest(); + fromFactory.SetThen(request, req, [&factory, func](uint64_t r) { + factory.SetValue(r, func()); + }); + return factory.CreateFuture(req); +} + +template +future detail::FutureThen::Create( + PromiseFactory& fromFactory, uint64_t request, + PromiseFactory& factory, F&& func) { + uint64_t req = factory.CreateRequest(); + fromFactory.SetThen(request, req, [&factory, func](uint64_t r) { + func(); + factory.SetValue(r); + }); + return factory.CreateFuture(req); +} + +} // namespace wpi + +#endif // WPIUTIL_WPI_FUTURE_H_ diff --git a/wpiutil/src/main/native/include/wpi/uv/AsyncFunction.h b/wpiutil/src/main/native/include/wpi/uv/AsyncFunction.h index ee3b85e4b1..1ebaf3024b 100644 --- a/wpiutil/src/main/native/include/wpi/uv/AsyncFunction.h +++ b/wpiutil/src/main/native/include/wpi/uv/AsyncFunction.h @@ -8,18 +8,17 @@ #ifndef WPIUTIL_WPI_UV_ASYNCFUNCTION_H_ #define WPIUTIL_WPI_UV_ASYNCFUNCTION_H_ +#include #include -#include #include #include -#include #include #include #include #include "wpi/STLExtras.h" -#include "wpi/condition_variable.h" +#include "wpi/future.h" #include "wpi/mutex.h" #include "wpi/uv/Handle.h" #include "wpi/uv/Loop.h" @@ -27,85 +26,6 @@ namespace wpi { namespace uv { -namespace detail { - -class AsyncFunctionBase { - public: - virtual ~AsyncFunctionBase() { - m_active = false; - m_resultCv.notify_all(); // wake up any waiters - } - - protected: - wpi::mutex m_mutex; - std::atomic_bool m_active{true}; - wpi::condition_variable m_resultCv; -}; - -template -struct AsyncFunctionHelper : public AsyncFunctionBase { - inline void RunCall(const std::function& func, - std::pair>& v) { - m_results.emplace_back( - std::piecewise_construct, std::forward_as_tuple(v.first), - std::forward_as_tuple(apply_tuple(func, std::move(v.second)))); - } - - inline R GetCallResult(std::thread::id from) { - // wait for response - std::unique_lock lock(m_mutex); - while (m_active) { - // Did we get a response to *our* request? - auto it = std::find_if(m_results.begin(), m_results.end(), - [=](const auto& r) { return r.first == from; }); - if (it != m_results.end()) { - // Yes, remove it from the vector and we're done. - auto rv = std::move(it->second); - m_results.erase(it); - return rv; - } - // No, keep waiting for a response - m_resultCv.wait(lock); - } - - return R(); - } - - private: - std::vector> m_results; -}; - -// void return value partial specialization -template -struct AsyncFunctionHelper : public AsyncFunctionBase { - inline void RunCall(const std::function& func, - std::pair>& v) { - apply_tuple(func, std::move(v.second)); - m_results.emplace_back(v.first); - } - - inline void GetCallResult(std::thread::id from) { - // wait for response - std::unique_lock lock(m_mutex); - while (m_active) { - // Did we get a response to *our* request? - auto it = std::find(m_results.begin(), m_results.end(), from); - if (it != m_results.end()) { - // Yes, remove it from the vector and we're done. - m_results.erase(it); - return; - } - // No, keep waiting for a response - m_resultCv.wait(lock); - } - } - - private: - std::vector m_results; -}; - -} // namespace detail - template class AsyncFunction; @@ -116,12 +36,11 @@ class AsyncFunction; */ template class AsyncFunction final - : public HandleImpl, uv_async_t>, - private detail::AsyncFunctionHelper { + : public HandleImpl, uv_async_t> { struct private_init {}; public: - AsyncFunction(std::function func, const private_init&) + AsyncFunction(std::function, T...)> func, const private_init&) : wakeup{func} {} ~AsyncFunction() noexcept override = default; @@ -129,28 +48,32 @@ class AsyncFunction final * Create an async handle. * * @param loop Loop object where this handle runs. - * @param func wakeup function to be called (sets wakeup value) + * @param func wakeup function to be called (sets wakeup value); the function + * needs to return void, and its first parameter is the promise + * for the result. If no value is set on the promise by the + * wakeup function, a default-constructed value is "returned". */ static std::shared_ptr Create( - Loop& loop, std::function func = nullptr) { + Loop& loop, std::function, T...)> func = nullptr) { auto h = std::make_shared(std::move(func), private_init{}); int err = uv_async_init(loop.GetRaw(), h->GetRaw(), [](uv_async_t* handle) { auto& h = *static_cast(handle->data); - std::lock_guard lock(h.m_mutex); + std::unique_lock lock(h.m_mutex); if (!h.m_params.empty()) { - // for each set of parameters in the input queue + // for each set of parameters in the input queue, call the wakeup + // function and put the result in the output queue if the caller is + // waiting for it for (auto&& v : h.m_params) { - // call the wakeup function and put the result in the output queue - // if the caller is waiting for it - if (v.first == std::thread::id{}) - apply_tuple(h.wakeup, std::move(v.second)); - else - h.RunCall(h.wakeup, v); + auto p = h.m_promises.CreatePromise(v.first); + if (h.wakeup) + apply_tuple(h.wakeup, std::tuple_cat(std::make_tuple(std::move(p)), + std::move(v.second))); } h.m_params.clear(); // wake up any threads that might be waiting for the result - h.m_resultCv.notify_all(); + lock.unlock(); + h.m_promises.Notify(); } }); if (err < 0) { @@ -165,71 +88,61 @@ class AsyncFunction final * Create an async handle. * * @param loop Loop object where this handle runs. - * @param func wakeup function to be called (sets wakeup value) + * @param func wakeup function to be called (sets wakeup value); the function + * needs to return void, and its first parameter is the promise + * for the result. If no value is set on the promise by the + * wakeup function, a default-constructed value is "returned". */ static std::shared_ptr Create( const std::shared_ptr& loop, - std::function func = nullptr) { + std::function, T...)> func = nullptr) { return Create(*loop, std::move(func)); } /** - * Wakeup the event loop, call the async function, and ignore any result. - * This is non-blocking and does NOT wait until the async function returns. + * Wakeup the event loop, call the async function, and return a future for + * the result. * * It’s safe to call this function from any thread EXCEPT the loop thread. * The async function will be called on the loop thread. + * + * The future will return a default-constructed result if this handle is + * destroyed while waiting for a result. */ template - void Send(U&&... u) { + future Call(U&&... u) { + // create the future + uint64_t req = m_promises.CreateRequest(); + // add the parameters to the input queue { - std::lock_guard lock(this->m_mutex); + std::lock_guard lock(m_mutex); m_params.emplace_back(std::piecewise_construct, - std::forward_as_tuple(std::thread::id{}), + std::forward_as_tuple(req), std::forward_as_tuple(std::forward(u)...)); } // signal the loop this->Invoke(&uv_async_send, this->GetRaw()); + + // return future + return m_promises.CreateFuture(req); } - /** - * Wakeup the event loop, call the async function, and return the result. - * This blocks the calling thread until the async function returns. - * - * It’s safe to call this function from any thread EXCEPT the loop thread. - * The async function will be called on the loop thread. - * - * Returns a default-constructed result if this handle is destroyed while - * waiting for a result. - */ template - R Call(U&&... u) { - std::thread::id from = std::this_thread::get_id(); - - // add the parameters to the input queue - { - std::lock_guard lock(this->m_mutex); - m_params.emplace_back(std::piecewise_construct, - std::forward_as_tuple(from), - std::forward_as_tuple(std::forward(u)...)); - } - - // signal the loop - this->Invoke(&uv_async_send, this->GetRaw()); - - // wait for response - return this->GetCallResult(from); + future operator()(U&&... u) { + return Call(std::forward(u)...); } /** * Function called (on event loop thread) when the async is called. */ - std::function wakeup; + std::function, T...)> wakeup; private: - std::vector>> m_params; + wpi::mutex m_mutex; + std::vector>> m_params; + PromiseFactory m_promises; }; } // namespace uv diff --git a/wpiutil/src/test/native/cpp/future_test.cpp b/wpiutil/src/test/native/cpp/future_test.cpp new file mode 100644 index 0000000000..b9b79a84c4 --- /dev/null +++ b/wpiutil/src/test/native/cpp/future_test.cpp @@ -0,0 +1,84 @@ +/*----------------------------------------------------------------------------*/ +/* Copyright (c) 2018 FIRST. All Rights Reserved. */ +/* Open Source Software - may be modified and shared by FRC teams. The code */ +/* must be accompanied by the FIRST BSD license file in the root directory of */ +/* the project. */ +/*----------------------------------------------------------------------------*/ + +#include "wpi/future.h" // NOLINT(build/include_order) + +#include "gtest/gtest.h" // NOLINT(build/include_order) + +#include + +namespace wpi { + +TEST(Future, Then) { + promise inPromise; + future outFuture = + inPromise.get_future().then([](bool v) { return v ? 5 : 6; }); + + inPromise.set_value(true); + ASSERT_EQ(outFuture.get(), 5); +} + +TEST(Future, ThenSame) { + promise inPromise; + future outFuture = + inPromise.get_future().then([](bool v) { return !v; }); + + inPromise.set_value(true); + ASSERT_EQ(outFuture.get(), false); +} + +TEST(Future, ThenFromVoid) { + promise inPromise; + future outFuture = inPromise.get_future().then([] { return 5; }); + + inPromise.set_value(); + ASSERT_EQ(outFuture.get(), 5); +} + +TEST(Future, ThenToVoid) { + promise inPromise; + future outFuture = inPromise.get_future().then([](bool v) {}); + + inPromise.set_value(true); + ASSERT_TRUE(outFuture.is_ready()); +} + +TEST(Future, ThenVoidVoid) { + promise inPromise; + future outFuture = inPromise.get_future().then([] {}); + + inPromise.set_value(); + ASSERT_TRUE(outFuture.is_ready()); +} + +TEST(Future, Implicit) { + promise inPromise; + future outFuture = inPromise.get_future(); + + inPromise.set_value(true); + ASSERT_EQ(outFuture.get(), 1); +} + +TEST(Future, MoveSame) { + promise inPromise; + future outFuture1 = inPromise.get_future(); + future outFuture(std::move(outFuture1)); + + inPromise.set_value(true); + ASSERT_EQ(outFuture.get(), true); +} + +TEST(Future, MoveVoid) { + promise inPromise; + future outFuture1 = inPromise.get_future(); + future outFuture(std::move(outFuture1)); + + inPromise.set_value(); + ASSERT_TRUE(outFuture.is_ready()); +} + +} // namespace wpi diff --git a/wpiutil/src/test/native/cpp/uv/UvAsyncFunctionTest.cpp b/wpiutil/src/test/native/cpp/uv/UvAsyncFunctionTest.cpp index 3e4f83d84a..132adeab45 100644 --- a/wpiutil/src/test/native/cpp/uv/UvAsyncFunctionTest.cpp +++ b/wpiutil/src/test/native/cpp/uv/UvAsyncFunctionTest.cpp @@ -34,21 +34,23 @@ TEST(UvAsyncFunction, Test) { prepare->prepare.connect([&] { if (prepare_cb_called++) return; theThread = std::thread([&] { - ASSERT_EQ(async->Call(0), 1); - ASSERT_EQ(async->Call(1), 2); + auto call0 = async->Call(0); + auto call1 = async->Call(1); + ASSERT_EQ(call0.get(), 1); + ASSERT_EQ(call1.get(), 2); }); }); prepare->Start(); async->error.connect([](Error) { FAIL(); }); async->closed.connect([&] { close_cb_called++; }); - async->wakeup = [&](int v) { + async->wakeup = [&](promise out, int v) { ++async_cb_called[v]; if (v == 1) { async->Close(); prepare->Close(); } - return v + 1; + out.set_value(v + 1); }; loop->Run(); @@ -72,15 +74,15 @@ TEST(UvAsyncFunction, Ref) { prepare->prepare.connect([&] { if (prepare_cb_called++) return; - theThread = std::thread([&] { ASSERT_EQ(async->Call(1, val), 2); }); + theThread = std::thread([&] { ASSERT_EQ(async->Call(1, val).get(), 2); }); }); prepare->Start(); - async->wakeup = [&](int v, int& r) { + async->wakeup = [&](promise out, int v, int& r) { r = v; async->Close(); prepare->Close(); - return v + 1; + out.set_value(v + 1); }; loop->Run(); @@ -104,17 +106,18 @@ TEST(UvAsyncFunction, Movable) { if (prepare_cb_called++) return; theThread = std::thread([&] { auto val = std::make_unique(1); - auto val2 = async->Call(std::move(val)); + auto val2 = async->Call(std::move(val)).get(); ASSERT_NE(val2, nullptr); ASSERT_EQ(*val2, 1); }); }); prepare->Start(); - async->wakeup = [&](std::unique_ptr v) { + async->wakeup = [&](promise> out, + std::unique_ptr v) { async->Close(); prepare->Close(); - return v; + out.set_value(std::move(v)); }; loop->Run(); @@ -122,7 +125,7 @@ TEST(UvAsyncFunction, Movable) { if (theThread.joinable()) theThread.join(); } -TEST(UvAsyncFunction, Send) { +TEST(UvAsyncFunction, CallIgnoreResult) { int prepare_cb_called = 0; std::thread theThread; @@ -134,14 +137,15 @@ TEST(UvAsyncFunction, Send) { prepare->prepare.connect([&] { if (prepare_cb_called++) return; - theThread = std::thread([&] { async->Send(std::make_unique(1)); }); + theThread = std::thread([&] { async->Call(std::make_unique(1)); }); }); prepare->Start(); - async->wakeup = [&](std::unique_ptr v) { + async->wakeup = [&](promise> out, + std::unique_ptr v) { async->Close(); prepare->Close(); - return v; + out.set_value(std::move(v)); }; loop->Run(); @@ -164,9 +168,68 @@ TEST(UvAsyncFunction, VoidCall) { }); prepare->Start(); - async->wakeup = [&]() { + async->wakeup = [&](promise out) { async->Close(); prepare->Close(); + out.set_value(); + }; + + loop->Run(); + + if (theThread.joinable()) theThread.join(); +} + +TEST(UvAsyncFunction, WaitFor) { + int prepare_cb_called = 0; + + std::thread theThread; + + auto loop = Loop::Create(); + auto async = AsyncFunction::Create(loop); + auto prepare = Prepare::Create(loop); + + prepare->prepare.connect([&] { + if (prepare_cb_called++) return; + theThread = std::thread([&] { + ASSERT_FALSE(async->Call().wait_for(std::chrono::milliseconds(10))); + }); + }); + prepare->Start(); + + async->wakeup = [&](promise out) { + async->Close(); + prepare->Close(); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + out.set_value(1); + }; + + loop->Run(); + + if (theThread.joinable()) theThread.join(); +} + +TEST(UvAsyncFunction, VoidWaitFor) { + int prepare_cb_called = 0; + + std::thread theThread; + + auto loop = Loop::Create(); + auto async = AsyncFunction::Create(loop); + auto prepare = Prepare::Create(loop); + + prepare->prepare.connect([&] { + if (prepare_cb_called++) return; + theThread = std::thread([&] { + ASSERT_FALSE(async->Call().wait_for(std::chrono::milliseconds(10))); + }); + }); + prepare->Start(); + + async->wakeup = [&](promise out) { + async->Close(); + prepare->Close(); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + out.set_value(); }; loop->Run();