mirror of
https://github.com/wpilibsuite/allwpilib
synced 2026-06-23 01:21:42 +00:00
Various NetworkConnection improvements.
- Keep connection state. - Store remote id - Getter for proto_rev - Cleanup on Stop() - Cleaner termination of write thread.
This commit is contained in:
@@ -20,6 +20,7 @@ NetworkConnection::NetworkConnection(std::unique_ptr<TCPStream> stream,
|
||||
m_get_entry_type(get_entry_type) {
|
||||
m_active = false;
|
||||
m_proto_rev = 0x0300;
|
||||
m_state = static_cast<int>(kCreated);
|
||||
}
|
||||
|
||||
NetworkConnection::~NetworkConnection() { Stop(); }
|
||||
@@ -27,18 +28,38 @@ NetworkConnection::~NetworkConnection() { Stop(); }
|
||||
void NetworkConnection::Start() {
|
||||
if (m_active) return;
|
||||
m_active = true;
|
||||
m_state = static_cast<int>(kInit);
|
||||
// clear queues
|
||||
while (!m_incoming.empty()) m_incoming.pop();
|
||||
while (!m_outgoing.empty()) m_outgoing.pop();
|
||||
// start threads
|
||||
m_write_thread = std::thread(&NetworkConnection::WriteThreadMain, this);
|
||||
m_read_thread = std::thread(&NetworkConnection::ReadThreadMain, this);
|
||||
}
|
||||
|
||||
void NetworkConnection::Stop() {
|
||||
m_state = static_cast<int>(kDead);
|
||||
m_active = false;
|
||||
// closing the stream so the read thread terminates
|
||||
if (m_stream) m_stream->close();
|
||||
// send a dummy outgoing message so the write thread terminates
|
||||
m_outgoing.push(Outgoing{Message::KeepAlive()});
|
||||
// send an empty outgoing message set so the write thread terminates
|
||||
m_outgoing.push(Outgoing());
|
||||
// wait for threads to terminate
|
||||
if (m_write_thread.joinable()) m_write_thread.join();
|
||||
if (m_read_thread.joinable()) m_read_thread.join();
|
||||
// clear queues
|
||||
while (!m_incoming.empty()) m_incoming.pop();
|
||||
while (!m_outgoing.empty()) m_outgoing.pop();
|
||||
}
|
||||
|
||||
std::string NetworkConnection::remote_id() const {
|
||||
std::lock_guard<std::mutex> lock(m_remote_id_mutex);
|
||||
return m_remote_id;
|
||||
}
|
||||
|
||||
void NetworkConnection::set_remote_id(StringRef remote_id) {
|
||||
std::lock_guard<std::mutex> lock(m_remote_id_mutex);
|
||||
m_remote_id = remote_id;
|
||||
}
|
||||
|
||||
void NetworkConnection::ReadThreadMain() {
|
||||
@@ -59,6 +80,7 @@ void NetworkConnection::ReadThreadMain() {
|
||||
m_incoming.emplace(std::move(msg));
|
||||
}
|
||||
m_incoming.emplace(nullptr); // notify anyone waiting that we disconnected
|
||||
m_state = static_cast<int>(kDead);
|
||||
m_active = false;
|
||||
}
|
||||
|
||||
@@ -67,7 +89,7 @@ void NetworkConnection::WriteThreadMain() {
|
||||
|
||||
while (m_active) {
|
||||
auto msgs = m_outgoing.pop();
|
||||
if (!m_active) break;
|
||||
if (msgs.empty()) break;
|
||||
encoder.set_proto_rev(m_proto_rev);
|
||||
encoder.Reset();
|
||||
for (auto& msg : msgs) msg->Write(encoder);
|
||||
@@ -75,5 +97,6 @@ void NetworkConnection::WriteThreadMain() {
|
||||
if (!m_stream) break;
|
||||
if (m_stream->send(encoder.data(), encoder.size(), &err) == 0) break;
|
||||
}
|
||||
m_state = static_cast<int>(kDead);
|
||||
m_active = false;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user