From 80618a2e64cd93b49c4d13fefaa75117ec152b2f Mon Sep 17 00:00:00 2001 From: Peter Johnson Date: Sun, 12 Nov 2017 22:12:03 -0800 Subject: [PATCH] Use wpi mutex and condition_variable. (#52) --- .../cpp/tcpsockets/TCPConnector_parallel.cpp | 20 ++++++++-------- .../native/include/support/ConcurrentQueue.h | 23 ++++++++++--------- src/main/native/include/support/SafeThread.h | 13 ++++++----- src/main/native/include/support/jni_util.h | 4 ++-- 4 files changed, 31 insertions(+), 29 deletions(-) diff --git a/src/main/native/cpp/tcpsockets/TCPConnector_parallel.cpp b/src/main/native/cpp/tcpsockets/TCPConnector_parallel.cpp index c9b92924bc..dbbc3e968d 100644 --- a/src/main/native/cpp/tcpsockets/TCPConnector_parallel.cpp +++ b/src/main/native/cpp/tcpsockets/TCPConnector_parallel.cpp @@ -9,12 +9,12 @@ #include #include -#include -#include #include #include #include "llvm/SmallSet.h" +#include "support/condition_variable.h" +#include "support/mutex.h" using namespace wpi; @@ -33,7 +33,7 @@ std::unique_ptr TCPConnector::connect_parallel( // structure to make sure we don't start duplicate workers struct GlobalState { - std::mutex mutex; + wpi::mutex mtx; #ifdef HAVE_THREAD_LOCAL llvm::SmallSet, 16> active; #else @@ -50,8 +50,8 @@ std::unique_ptr TCPConnector::connect_parallel( // structure shared between threads and this function struct Result { - std::mutex mutex; - std::condition_variable cv; + wpi::mutex mtx; + wpi::condition_variable cv; std::unique_ptr stream; std::atomic count{0}; std::atomic done{false}; @@ -74,7 +74,7 @@ std::unique_ptr TCPConnector::connect_parallel( // don't start a new worker if we had a previously still-active connection // attempt to the same server { - std::lock_guard lock(local->mutex); + std::lock_guard lock(local->mtx); if (local->active.count(active_tracker) > 0) continue; // already in set } @@ -85,7 +85,7 @@ std::unique_ptr TCPConnector::connect_parallel( if (!result->done) { // add to global state { - std::lock_guard lock(local->mutex); + std::lock_guard lock(local->mtx); local->active.insert(active_tracker); } @@ -95,13 +95,13 @@ std::unique_ptr TCPConnector::connect_parallel( // remove from global state { - std::lock_guard lock(local->mutex); + std::lock_guard lock(local->mtx); local->active.erase(active_tracker); } // successful connection if (stream) { - std::lock_guard lock(result->mutex); + std::lock_guard lock(result->mtx); if (!result->done.exchange(true)) result->stream = std::move(stream); } } @@ -111,7 +111,7 @@ std::unique_ptr TCPConnector::connect_parallel( } // wait for a result, timeout, or all finished - std::unique_lock lock(result->mutex); + std::unique_lock lock(result->mtx); if (timeout == 0) { result->cv.wait( lock, [&] { return result->stream || result->count >= num_workers; }); diff --git a/src/main/native/include/support/ConcurrentQueue.h b/src/main/native/include/support/ConcurrentQueue.h index abe9580f7f..599274b708 100644 --- a/src/main/native/include/support/ConcurrentQueue.h +++ b/src/main/native/include/support/ConcurrentQueue.h @@ -9,8 +9,9 @@ #include #include -#include -#include + +#include "support/mutex.h" +#include "support/condition_variable.h" namespace wpi { @@ -18,17 +19,17 @@ template class ConcurrentQueue { public: bool empty() const { - std::unique_lock mlock(mutex_); + std::unique_lock mlock(mutex_); return queue_.empty(); } typename std::queue::size_type size() const { - std::unique_lock mlock(mutex_); + std::unique_lock mlock(mutex_); return queue_.size(); } T pop() { - std::unique_lock mlock(mutex_); + std::unique_lock mlock(mutex_); while (queue_.empty()) { cond_.wait(mlock); } @@ -38,7 +39,7 @@ class ConcurrentQueue { } void pop(T& item) { - std::unique_lock mlock(mutex_); + std::unique_lock mlock(mutex_); while (queue_.empty()) { cond_.wait(mlock); } @@ -47,14 +48,14 @@ class ConcurrentQueue { } void push(const T& item) { - std::unique_lock mlock(mutex_); + std::unique_lock mlock(mutex_); queue_.push(item); mlock.unlock(); cond_.notify_one(); } void push(T&& item) { - std::unique_lock mlock(mutex_); + std::unique_lock mlock(mutex_); queue_.push(std::forward(item)); mlock.unlock(); cond_.notify_one(); @@ -62,7 +63,7 @@ class ConcurrentQueue { template void emplace(Args&&... args) { - std::unique_lock mlock(mutex_); + std::unique_lock mlock(mutex_); queue_.emplace(std::forward(args)...); mlock.unlock(); cond_.notify_one(); @@ -74,8 +75,8 @@ class ConcurrentQueue { private: std::queue queue_; - mutable std::mutex mutex_; - std::condition_variable cond_; + mutable wpi::mutex mutex_; + wpi::condition_variable cond_; }; } // namespace wpi diff --git a/src/main/native/include/support/SafeThread.h b/src/main/native/include/support/SafeThread.h index cad4cdb3b5..dfde804595 100644 --- a/src/main/native/include/support/SafeThread.h +++ b/src/main/native/include/support/SafeThread.h @@ -9,10 +9,11 @@ #define WPIUTIL_SUPPORT_SAFETHREAD_H_ #include -#include -#include #include +#include "support/condition_variable.h" +#include "support/mutex.h" + namespace wpi { // Base class for SafeThreadOwner threads. @@ -22,9 +23,9 @@ class SafeThread { virtual ~SafeThread() = default; virtual void Main() = 0; - std::mutex m_mutex; + wpi::mutex m_mutex; std::atomic_bool m_active; - std::condition_variable m_cond; + wpi::condition_variable m_cond; }; namespace detail { @@ -42,11 +43,11 @@ class SafeThreadProxyBase { } } explicit operator bool() const { return m_thread != nullptr; } - std::unique_lock& GetLock() { return m_lock; } + std::unique_lock& GetLock() { return m_lock; } protected: SafeThread* m_thread; - std::unique_lock m_lock; + std::unique_lock m_lock; }; // A proxy for SafeThread. diff --git a/src/main/native/include/support/jni_util.h b/src/main/native/include/support/jni_util.h index 4bd569476a..867c60f7fb 100644 --- a/src/main/native/include/support/jni_util.h +++ b/src/main/native/include/support/jni_util.h @@ -8,7 +8,6 @@ #ifndef WPIUTIL_SUPPORT_JNI_UTIL_H_ #define WPIUTIL_SUPPORT_JNI_UTIL_H_ -#include #include #include #include @@ -24,6 +23,7 @@ #include "llvm/StringRef.h" #include "support/atomic_static.h" #include "support/deprecated.h" +#include "support/mutex.h" #include "support/SafeThread.h" namespace wpi { @@ -473,7 +473,7 @@ void JCallbackThread::Main() { jint rs = T::GetJVM()->AttachCurrentThreadAsDaemon((void**)&env, &args); if (rs != JNI_OK) return; - std::unique_lock lock(m_mutex); + std::unique_lock lock(m_mutex); while (m_active) { m_cond.wait(lock, [&] { return !(m_active && m_queue.empty()); }); if (!m_active) break;