diff --git a/src/HTTPSinkImpl.cpp b/src/HTTPSinkImpl.cpp index d9679ddeb1..d48b26dc33 100644 --- a/src/HTTPSinkImpl.cpp +++ b/src/HTTPSinkImpl.cpp @@ -290,6 +290,7 @@ void HTTPSinkImpl::SendStream(wpi::raw_socket_ostream& os) { DEBUG("Headers send, sending stream now"); + Enable(); while (m_active && !os.has_error()) { auto source = GetSource(); if (!source) { @@ -321,9 +322,9 @@ void HTTPSinkImpl::SendStream(wpi::raw_socket_ostream& os) { os << "\r\n--" BOUNDARY "\r\n"; // os.flush(); } + Disable(); } - // thread for clients that connected to this server void HTTPSinkImpl::ConnThreadMain(wpi::NetworkStream* stream) { wpi::raw_socket_istream is{*stream}; diff --git a/src/SinkImpl.cpp b/src/SinkImpl.cpp index f600392e06..a48feb1e47 100644 --- a/src/SinkImpl.cpp +++ b/src/SinkImpl.cpp @@ -12,3 +12,22 @@ using namespace cs; SinkImpl::SinkImpl(llvm::StringRef name) : m_name{name} {} + +SinkImpl::~SinkImpl() { + if (m_source) { + if (m_enabled) m_source->DisableSink(); + m_source->RemoveSink(); + } +} + +void SinkImpl::SetSource(std::shared_ptr source) { + std::lock_guard lock(m_mutex); + bool was_enabled = m_enabled; + if (m_source) { + if (m_enabled) m_source->DisableSink(); + m_source->RemoveSink(); + } + m_source = source; + m_source->AddSink(); + if (was_enabled) m_source->EnableSink(); +} diff --git a/src/SinkImpl.h b/src/SinkImpl.h index c6ecfa73f1..77dd34cfad 100644 --- a/src/SinkImpl.h +++ b/src/SinkImpl.h @@ -14,26 +14,40 @@ #include "llvm/StringRef.h" +#include "SourceImpl.h" + namespace cs { class Frame; -class SourceImpl; class SinkImpl { public: SinkImpl(llvm::StringRef name); - virtual ~SinkImpl() = default; + virtual ~SinkImpl(); SinkImpl(const SinkImpl& queue) = delete; SinkImpl& operator=(const SinkImpl& queue) = delete; llvm::StringRef GetName() const { return m_name; } virtual void GetDescription(llvm::SmallVectorImpl& desc) const = 0; - void SetSource(std::shared_ptr source) { + void Enable() { std::lock_guard lock(m_mutex); - m_source = source; + if (!m_enabled) { + m_enabled = true; + if (m_source) m_source->EnableSink(); + } } + void Disable() { + std::lock_guard lock(m_mutex); + if (m_enabled) { + m_enabled = false; + if (m_source) m_source->DisableSink(); + } + } + + void SetSource(std::shared_ptr source); + std::shared_ptr GetSource() const { std::lock_guard lock(m_mutex); return m_source; @@ -43,6 +57,7 @@ class SinkImpl { std::string m_name; mutable std::mutex m_mutex; std::shared_ptr m_source; + bool m_enabled{false}; }; } // namespace cs diff --git a/src/SourceImpl.cpp b/src/SourceImpl.cpp index 427cf2fe13..7b2ea503f6 100644 --- a/src/SourceImpl.cpp +++ b/src/SourceImpl.cpp @@ -17,6 +17,7 @@ SourceImpl::SourceImpl(llvm::StringRef name) SourceImpl::~SourceImpl() { // Wake up anyone who is waiting. This also clears the current frame, // which is good because its destructor will call back into the class. + EnableSink(); Wakeup(); // Everything else can clean up itself. } @@ -24,7 +25,7 @@ SourceImpl::~SourceImpl() { Frame SourceImpl::GetNextFrame() { std::unique_lock lock{m_frameMutex}; // TODO: handle spurious wakeup by comparing frame timestamps - m_cv.wait(lock); + m_frameCv.wait(lock); return m_frame; } @@ -33,7 +34,7 @@ void SourceImpl::Wakeup() { std::lock_guard lock{m_frameMutex}; m_frame = Frame{*this, nullptr}; } - m_cv.notify_all(); + m_frameCv.notify_all(); } void SourceImpl::StartFrame() { @@ -56,7 +57,7 @@ void SourceImpl::FinishFrame() { std::lock_guard lock2{m_frameMutex}; m_frame = Frame{*this, m_frameData.release()}; } - m_cv.notify_all(); + m_frameCv.notify_all(); } void SourceImpl::ReleaseFrame(Frame::Data* data) { diff --git a/src/SourceImpl.h b/src/SourceImpl.h index 9039a138d1..c44b582e75 100644 --- a/src/SourceImpl.h +++ b/src/SourceImpl.h @@ -35,6 +35,38 @@ class SourceImpl { int GetNumChannels() const { return m_numChannels; } bool IsConnected() const { return m_connected; } + // Functions to keep track of the overall number of sinks connected to this + // source. Primarily used by sinks to determine if other sinks are using + // the same source. + int GetNumSinks() const { return m_numSinks; } + void AddSink() { ++m_numSinks; } + void RemoveSink() { --m_numSinks; } + + // Functions to keep track of the number of sinks connected to this source + // that are "enabled", in other words, listening for new images. Primarily + // used by sources to determine whether they should actually bother trying + // to get source frames. + int GetNumSinksEnabled() const { + std::lock_guard lock{m_numSinksEnabledMutex}; + return m_numSinksEnabled; + } + + void EnableSink() { + std::lock_guard lock{m_numSinksEnabledMutex}; + ++m_numSinksEnabled; + m_numSinksEnabledCv.notify_all(); + } + + void DisableSink() { + std::lock_guard lock{m_numSinksEnabledMutex}; + --m_numSinksEnabled; + } + + void WaitForEnabledSink() { + std::unique_lock lock{m_numSinksEnabledMutex}; + m_numSinksEnabledCv.wait(lock, [this] { return m_numSinksEnabled > 0; }); + } + // Gets the current frame (without waiting for a new one). Frame GetCurFrame(); @@ -72,6 +104,7 @@ class SourceImpl { std::atomic_int m_numChannels{0}; std::atomic_bool m_connected{false}; + std::atomic_int m_numSinks{0}; private: void ReleaseFrame(Frame::Data* data); @@ -79,8 +112,13 @@ class SourceImpl { std::string m_name; std::mutex m_mutex; + + mutable std::mutex m_numSinksEnabledMutex; + std::condition_variable m_numSinksEnabledCv; + int m_numSinksEnabled; + std::mutex m_frameMutex; - std::condition_variable m_cv; + std::condition_variable m_frameCv; // Most recent complete frame (returned to callers of GetNextFrame) // Access protected by m_frameMutex.