diff --git a/cscore/src/main/native/cpp/MjpegServerImpl.cpp b/cscore/src/main/native/cpp/MjpegServerImpl.cpp index 51606f3f8a..df239220fc 100644 --- a/cscore/src/main/native/cpp/MjpegServerImpl.cpp +++ b/cscore/src/main/native/cpp/MjpegServerImpl.cpp @@ -865,10 +865,7 @@ void MjpegServerImpl::ServerThreadMain() { } // Start it if not already started - { - auto thr = it->GetThread(); - if (!thr) it->Start(new ConnThread{GetName()}); - } + it->Start(GetName()); auto nstreams = std::count_if(m_connThreads.begin(), m_connThreads.end(), diff --git a/cscore/src/main/native/cpp/Notifier.cpp b/cscore/src/main/native/cpp/Notifier.cpp index 88fd1c37c8..0e69618fcc 100644 --- a/cscore/src/main/native/cpp/Notifier.cpp +++ b/cscore/src/main/native/cpp/Notifier.cpp @@ -94,10 +94,7 @@ Notifier::Notifier() { s_destroyed = false; } Notifier::~Notifier() { s_destroyed = true; } -void Notifier::Start() { - auto thr = m_owner.GetThread(); - if (!thr) m_owner.Start(new Thread(m_on_start, m_on_exit)); -} +void Notifier::Start() { m_owner.Start(m_on_start, m_on_exit); } void Notifier::Stop() { m_owner.Stop(); } diff --git a/cscore/src/main/native/cpp/Telemetry.cpp b/cscore/src/main/native/cpp/Telemetry.cpp index 63885e8538..cfb3f804c3 100644 --- a/cscore/src/main/native/cpp/Telemetry.cpp +++ b/cscore/src/main/native/cpp/Telemetry.cpp @@ -45,10 +45,7 @@ Telemetry::Telemetry() {} Telemetry::~Telemetry() {} -void Telemetry::Start() { - auto thr = m_owner.GetThread(); - if (!thr) m_owner.Start(new Thread); -} +void Telemetry::Start() { m_owner.Start(); } void Telemetry::Stop() { m_owner.Stop(); } diff --git a/ntcore/src/main/native/cpp/CallbackManager.h b/ntcore/src/main/native/cpp/CallbackManager.h index 9f08016250..0fd9617253 100644 --- a/ntcore/src/main/native/cpp/CallbackManager.h +++ b/ntcore/src/main/native/cpp/CallbackManager.h @@ -294,8 +294,7 @@ class CallbackManager { protected: template void DoStart(Args&&... args) { - auto thr = m_owner.GetThread(); - if (!thr) m_owner.Start(new Thread(std::forward(args)...)); + m_owner.Start(std::forward(args)...); } template diff --git a/ntcore/src/main/native/cpp/DsClient.cpp b/ntcore/src/main/native/cpp/DsClient.cpp index 9714b9e0a1..8b97be7f7b 100644 --- a/ntcore/src/main/native/cpp/DsClient.cpp +++ b/ntcore/src/main/native/cpp/DsClient.cpp @@ -36,7 +36,7 @@ DsClient::DsClient(Dispatcher& dispatcher, wpi::Logger& logger) void DsClient::Start(unsigned int port) { auto thr = m_owner.GetThread(); if (!thr) - m_owner.Start(new Thread(m_dispatcher, m_logger, port)); + m_owner.Start(m_dispatcher, m_logger, port); else thr->m_port = port; } diff --git a/wpiutil/src/main/native/cpp/EventLoopRunner.cpp b/wpiutil/src/main/native/cpp/EventLoopRunner.cpp index 5d90d531d0..1273590bde 100644 --- a/wpiutil/src/main/native/cpp/EventLoopRunner.cpp +++ b/wpiutil/src/main/native/cpp/EventLoopRunner.cpp @@ -39,7 +39,7 @@ class EventLoopRunner::Thread : public SafeThread { std::weak_ptr m_doExec; }; -EventLoopRunner::EventLoopRunner() { m_owner.Start(new Thread); } +EventLoopRunner::EventLoopRunner() { m_owner.Start(); } EventLoopRunner::~EventLoopRunner() { ExecAsync([](uv::Loop& loop) { diff --git a/wpiutil/src/main/native/cpp/SafeThread.cpp b/wpiutil/src/main/native/cpp/SafeThread.cpp new file mode 100644 index 0000000000..284e75d376 --- /dev/null +++ b/wpiutil/src/main/native/cpp/SafeThread.cpp @@ -0,0 +1,65 @@ +/*----------------------------------------------------------------------------*/ +/* Copyright (c) 2015-2018 FIRST. All Rights Reserved. */ +/* Open Source Software - may be modified and shared by FRC teams. The code */ +/* must be accompanied by the FIRST BSD license file in the root directory of */ +/* the project. */ +/*----------------------------------------------------------------------------*/ + +#include "wpi/SafeThread.h" + +using namespace wpi; + +detail::SafeThreadProxyBase::SafeThreadProxyBase( + std::shared_ptr thr) + : m_thread(std::move(thr)) { + if (!m_thread) return; + m_lock = std::unique_lock(m_thread->m_mutex); + if (!m_thread->m_active) { + m_lock.unlock(); + m_thread = nullptr; + return; + } +} + +void detail::SafeThreadOwnerBase::Start(std::shared_ptr thr) { + std::lock_guard lock(m_mutex); + if (auto thr = m_thread.lock()) return; + std::thread stdThread([=] { thr->Main(); }); + m_thread = thr; + m_nativeHandle = stdThread.native_handle(); + stdThread.detach(); +} + +void detail::SafeThreadOwnerBase::Stop() { + std::lock_guard lock(m_mutex); + if (auto thr = m_thread.lock()) { + thr->m_active = false; + thr->m_cond.notify_one(); + } +} + +void detail::swap(SafeThreadOwnerBase& lhs, SafeThreadOwnerBase& rhs) noexcept { + using std::swap; + if (&lhs == &rhs) return; + std::lock(lhs.m_mutex, rhs.m_mutex); + std::lock_guard lock_lhs(lhs.m_mutex, std::adopt_lock); + std::lock_guard lock_rhs(rhs.m_mutex, std::adopt_lock); + std::swap(lhs.m_thread, rhs.m_thread); + std::swap(lhs.m_nativeHandle, rhs.m_nativeHandle); +} + +detail::SafeThreadOwnerBase::operator bool() const { + std::lock_guard lock(m_mutex); + return !m_thread.expired(); +} + +std::thread::native_handle_type +detail::SafeThreadOwnerBase::GetNativeThreadHandle() const { + std::lock_guard lock(m_mutex); + return m_nativeHandle; +} + +std::shared_ptr detail::SafeThreadOwnerBase::GetThread() const { + std::lock_guard lock(m_mutex); + return m_thread.lock(); +} diff --git a/wpiutil/src/main/native/include/wpi/SafeThread.h b/wpiutil/src/main/native/include/wpi/SafeThread.h index ff5d44c523..af6d90db89 100644 --- a/wpiutil/src/main/native/include/wpi/SafeThread.h +++ b/wpiutil/src/main/native/include/wpi/SafeThread.h @@ -9,7 +9,9 @@ #define WPIUTIL_WPI_SAFETHREAD_H_ #include +#include #include +#include #include "wpi/condition_variable.h" #include "wpi/mutex.h" @@ -23,7 +25,7 @@ class SafeThread { virtual ~SafeThread() = default; virtual void Main() = 0; - wpi::mutex m_mutex; + mutable wpi::mutex m_mutex; std::atomic_bool m_active; wpi::condition_variable m_cond; }; @@ -33,20 +35,12 @@ namespace detail { // Non-template proxy base class for common proxy code. class SafeThreadProxyBase { public: - explicit SafeThreadProxyBase(SafeThread* thr) : m_thread(thr) { - if (!m_thread) return; - m_lock = std::unique_lock(m_thread->m_mutex); - if (!m_thread->m_active) { - m_lock.unlock(); - m_thread = nullptr; - return; - } - } + explicit SafeThreadProxyBase(std::shared_ptr thr); explicit operator bool() const { return m_thread != nullptr; } std::unique_lock& GetLock() { return m_lock; } protected: - SafeThread* m_thread; + std::shared_ptr m_thread; std::unique_lock m_lock; }; @@ -55,9 +49,10 @@ class SafeThreadProxyBase { template class SafeThreadProxy : public SafeThreadProxyBase { public: - explicit SafeThreadProxy(SafeThread* thr) : SafeThreadProxyBase(thr) {} - T& operator*() const { return *static_cast(m_thread); } - T* operator->() const { return static_cast(m_thread); } + explicit SafeThreadProxy(std::shared_ptr thr) + : SafeThreadProxyBase(std::move(thr)) {} + T& operator*() const { return *static_cast(m_thread.get()); } + T* operator->() const { return static_cast(m_thread.get()); } }; // Non-template owner base class for common owner code. @@ -65,62 +60,47 @@ class SafeThreadOwnerBase { public: void Stop(); - SafeThreadOwnerBase() { m_thread = nullptr; } + SafeThreadOwnerBase() noexcept = default; SafeThreadOwnerBase(const SafeThreadOwnerBase&) = delete; SafeThreadOwnerBase& operator=(const SafeThreadOwnerBase&) = delete; - SafeThreadOwnerBase(SafeThreadOwnerBase&& other) - : m_thread(other.m_thread.exchange(nullptr)) {} - SafeThreadOwnerBase& operator=(SafeThreadOwnerBase other) { - SafeThread* otherthr = other.m_thread.exchange(nullptr); - SafeThread* curthr = m_thread.exchange(otherthr); - other.m_thread.exchange(curthr); // other destructor will clean up + SafeThreadOwnerBase(SafeThreadOwnerBase&& other) noexcept + : SafeThreadOwnerBase() { + swap(*this, other); + } + SafeThreadOwnerBase& operator=(SafeThreadOwnerBase other) noexcept { + swap(*this, other); return *this; } ~SafeThreadOwnerBase() { Stop(); } - explicit operator bool() const { return m_thread.load(); } + friend void swap(SafeThreadOwnerBase& lhs, SafeThreadOwnerBase& rhs) noexcept; + + explicit operator bool() const; + + std::thread::native_handle_type GetNativeThreadHandle() const; protected: - void Start(SafeThread* thr); - SafeThread* GetThread() const { return m_thread.load(); } - std::thread::native_handle_type GetNativeThreadHandle() const { - return m_nativeHandle; - } + void Start(std::shared_ptr thr); + std::shared_ptr GetThread() const; private: - std::atomic m_thread; - std::atomic m_nativeHandle; + mutable wpi::mutex m_mutex; + std::weak_ptr m_thread; + std::thread::native_handle_type m_nativeHandle; }; -inline void SafeThreadOwnerBase::Start(SafeThread* thr) { - SafeThread* curthr = nullptr; - SafeThread* newthr = thr; - if (!m_thread.compare_exchange_strong(curthr, newthr)) { - delete newthr; - return; - } - std::thread stdThread([=]() { - newthr->Main(); - delete newthr; - }); - m_nativeHandle = stdThread.native_handle(); - stdThread.detach(); -} - -inline void SafeThreadOwnerBase::Stop() { - SafeThread* thr = m_thread.exchange(nullptr); - if (!thr) return; - thr->m_active = false; - thr->m_cond.notify_one(); -} +void swap(SafeThreadOwnerBase& lhs, SafeThreadOwnerBase& rhs) noexcept; } // namespace detail template class SafeThreadOwner : public detail::SafeThreadOwnerBase { public: - void Start() { Start(new T); } - void Start(T* thr) { detail::SafeThreadOwnerBase::Start(thr); } + template + void Start(Args&&... args) { + detail::SafeThreadOwnerBase::Start( + std::make_shared(std::forward(args)...)); + } using Proxy = typename detail::SafeThreadProxy; Proxy GetThread() const {