Source: Keep track of how many sinks are connected and enabled.

This commit is contained in:
Peter Johnson
2016-09-08 23:52:23 -07:00
parent ddb97bfafb
commit ba241cd7f9
5 changed files with 83 additions and 9 deletions

View File

@@ -290,6 +290,7 @@ void HTTPSinkImpl::SendStream(wpi::raw_socket_ostream& os) {
DEBUG("Headers send, sending stream now");
Enable();
while (m_active && !os.has_error()) {
auto source = GetSource();
if (!source) {
@@ -321,9 +322,9 @@ void HTTPSinkImpl::SendStream(wpi::raw_socket_ostream& os) {
os << "\r\n--" BOUNDARY "\r\n";
// os.flush();
}
Disable();
}
// thread for clients that connected to this server
void HTTPSinkImpl::ConnThreadMain(wpi::NetworkStream* stream) {
wpi::raw_socket_istream is{*stream};

View File

@@ -12,3 +12,22 @@
using namespace cs;
SinkImpl::SinkImpl(llvm::StringRef name) : m_name{name} {}
SinkImpl::~SinkImpl() {
if (m_source) {
if (m_enabled) m_source->DisableSink();
m_source->RemoveSink();
}
}
void SinkImpl::SetSource(std::shared_ptr<SourceImpl> source) {
std::lock_guard<std::mutex> lock(m_mutex);
bool was_enabled = m_enabled;
if (m_source) {
if (m_enabled) m_source->DisableSink();
m_source->RemoveSink();
}
m_source = source;
m_source->AddSink();
if (was_enabled) m_source->EnableSink();
}

View File

@@ -14,26 +14,40 @@
#include "llvm/StringRef.h"
#include "SourceImpl.h"
namespace cs {
class Frame;
class SourceImpl;
class SinkImpl {
public:
SinkImpl(llvm::StringRef name);
virtual ~SinkImpl() = default;
virtual ~SinkImpl();
SinkImpl(const SinkImpl& queue) = delete;
SinkImpl& operator=(const SinkImpl& queue) = delete;
llvm::StringRef GetName() const { return m_name; }
virtual void GetDescription(llvm::SmallVectorImpl<char>& desc) const = 0;
void SetSource(std::shared_ptr<SourceImpl> source) {
void Enable() {
std::lock_guard<std::mutex> lock(m_mutex);
m_source = source;
if (!m_enabled) {
m_enabled = true;
if (m_source) m_source->EnableSink();
}
}
void Disable() {
std::lock_guard<std::mutex> lock(m_mutex);
if (m_enabled) {
m_enabled = false;
if (m_source) m_source->DisableSink();
}
}
void SetSource(std::shared_ptr<SourceImpl> source);
std::shared_ptr<SourceImpl> GetSource() const {
std::lock_guard<std::mutex> lock(m_mutex);
return m_source;
@@ -43,6 +57,7 @@ class SinkImpl {
std::string m_name;
mutable std::mutex m_mutex;
std::shared_ptr<SourceImpl> m_source;
bool m_enabled{false};
};
} // namespace cs

View File

@@ -17,6 +17,7 @@ SourceImpl::SourceImpl(llvm::StringRef name)
SourceImpl::~SourceImpl() {
// Wake up anyone who is waiting. This also clears the current frame,
// which is good because its destructor will call back into the class.
EnableSink();
Wakeup();
// Everything else can clean up itself.
}
@@ -24,7 +25,7 @@ SourceImpl::~SourceImpl() {
Frame SourceImpl::GetNextFrame() {
std::unique_lock<std::mutex> lock{m_frameMutex};
// TODO: handle spurious wakeup by comparing frame timestamps
m_cv.wait(lock);
m_frameCv.wait(lock);
return m_frame;
}
@@ -33,7 +34,7 @@ void SourceImpl::Wakeup() {
std::lock_guard<std::mutex> lock{m_frameMutex};
m_frame = Frame{*this, nullptr};
}
m_cv.notify_all();
m_frameCv.notify_all();
}
void SourceImpl::StartFrame() {
@@ -56,7 +57,7 @@ void SourceImpl::FinishFrame() {
std::lock_guard<std::mutex> lock2{m_frameMutex};
m_frame = Frame{*this, m_frameData.release()};
}
m_cv.notify_all();
m_frameCv.notify_all();
}
void SourceImpl::ReleaseFrame(Frame::Data* data) {

View File

@@ -35,6 +35,38 @@ class SourceImpl {
int GetNumChannels() const { return m_numChannels; }
bool IsConnected() const { return m_connected; }
// Functions to keep track of the overall number of sinks connected to this
// source. Primarily used by sinks to determine if other sinks are using
// the same source.
int GetNumSinks() const { return m_numSinks; }
void AddSink() { ++m_numSinks; }
void RemoveSink() { --m_numSinks; }
// Functions to keep track of the number of sinks connected to this source
// that are "enabled", in other words, listening for new images. Primarily
// used by sources to determine whether they should actually bother trying
// to get source frames.
int GetNumSinksEnabled() const {
std::lock_guard<std::mutex> lock{m_numSinksEnabledMutex};
return m_numSinksEnabled;
}
void EnableSink() {
std::lock_guard<std::mutex> lock{m_numSinksEnabledMutex};
++m_numSinksEnabled;
m_numSinksEnabledCv.notify_all();
}
void DisableSink() {
std::lock_guard<std::mutex> lock{m_numSinksEnabledMutex};
--m_numSinksEnabled;
}
void WaitForEnabledSink() {
std::unique_lock<std::mutex> lock{m_numSinksEnabledMutex};
m_numSinksEnabledCv.wait(lock, [this] { return m_numSinksEnabled > 0; });
}
// Gets the current frame (without waiting for a new one).
Frame GetCurFrame();
@@ -72,6 +104,7 @@ class SourceImpl {
std::atomic_int m_numChannels{0};
std::atomic_bool m_connected{false};
std::atomic_int m_numSinks{0};
private:
void ReleaseFrame(Frame::Data* data);
@@ -79,8 +112,13 @@ class SourceImpl {
std::string m_name;
std::mutex m_mutex;
mutable std::mutex m_numSinksEnabledMutex;
std::condition_variable m_numSinksEnabledCv;
int m_numSinksEnabled;
std::mutex m_frameMutex;
std::condition_variable m_cv;
std::condition_variable m_frameCv;
// Most recent complete frame (returned to callers of GetNextFrame)
// Access protected by m_frameMutex.