2015-07-13 22:46:41 -07:00
|
|
|
/*----------------------------------------------------------------------------*/
|
|
|
|
|
/* Copyright (c) FIRST 2015. All Rights Reserved. */
|
|
|
|
|
/* Open Source Software - may be modified and shared by FRC teams. The code */
|
|
|
|
|
/* must be accompanied by the FIRST BSD license file in the root directory of */
|
|
|
|
|
/* the project. */
|
|
|
|
|
/*----------------------------------------------------------------------------*/
|
|
|
|
|
|
|
|
|
|
#include "NetworkConnection.h"
|
|
|
|
|
|
2015-08-02 10:47:05 -07:00
|
|
|
#include "support/timestamp.h"
|
2015-08-02 00:06:27 -07:00
|
|
|
#include "tcpsockets/NetworkStream.h"
|
2015-07-31 22:41:26 -07:00
|
|
|
#include "Log.h"
|
2015-08-02 21:47:01 -07:00
|
|
|
#include "Notifier.h"
|
2015-07-13 22:46:41 -07:00
|
|
|
#include "raw_socket_istream.h"
|
|
|
|
|
#include "WireDecoder.h"
|
|
|
|
|
#include "WireEncoder.h"
|
|
|
|
|
|
2015-07-17 07:21:07 -07:00
|
|
|
using namespace nt;
|
2015-07-13 22:46:41 -07:00
|
|
|
|
2015-08-13 13:12:15 -07:00
|
|
|
// Shared counter backing the per-connection m_uid: the constructor takes the
// next value with fetch_add(1).
std::atomic_uint NetworkConnection::s_uid;
|
|
|
|
|
|
2015-08-02 00:06:27 -07:00
|
|
|
// Builds a connection around an existing stream.  The connection starts out
// inactive in the kCreated state; no threads run until Start() is called.
//
// stream          transport; ownership is moved into the connection
// notifier        used by the worker threads to report connect/disconnect
// handshake       callable invoked at the beginning of the read thread
// get_entry_type  callable forwarded to Message::Read when decoding
NetworkConnection::NetworkConnection(std::unique_ptr<NetworkStream> stream,
                                     Notifier& notifier,
                                     HandshakeFunc handshake,
                                     Message::GetEntryTypeFunc get_entry_type)
    : m_uid(s_uid.fetch_add(1)),
      m_stream(std::move(stream)),
      m_notifier(notifier),
      // Both callables are taken by value, so move them into place rather
      // than paying for a second copy.
      m_handshake(std::move(handshake)),
      m_get_entry_type(std::move(get_entry_type)) {
  m_active = false;
  m_proto_rev = 0x0300;
  m_state = static_cast<int>(kCreated);
  m_last_update = 0;
}
|
2015-07-13 22:46:41 -07:00
|
|
|
|
|
|
|
|
// Destruction simply delegates to Stop(), which shuts the worker threads down
// and joins them.
NetworkConnection::~NetworkConnection() {
  Stop();
}
|
|
|
|
|
|
|
|
|
|
void NetworkConnection::Start() {
|
2015-07-14 23:15:30 -07:00
|
|
|
if (m_active) return;
|
2015-07-13 22:46:41 -07:00
|
|
|
m_active = true;
|
2015-07-29 20:31:59 -07:00
|
|
|
m_state = static_cast<int>(kInit);
|
2015-07-29 23:45:04 -07:00
|
|
|
// clear queue
|
2015-07-29 20:31:59 -07:00
|
|
|
while (!m_outgoing.empty()) m_outgoing.pop();
|
|
|
|
|
// start threads
|
2015-07-13 22:46:41 -07:00
|
|
|
m_write_thread = std::thread(&NetworkConnection::WriteThreadMain, this);
|
|
|
|
|
m_read_thread = std::thread(&NetworkConnection::ReadThreadMain, this);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void NetworkConnection::Stop() {
|
2015-07-29 20:31:59 -07:00
|
|
|
m_state = static_cast<int>(kDead);
|
2015-07-13 22:46:41 -07:00
|
|
|
m_active = false;
|
2015-07-17 22:28:47 -07:00
|
|
|
// closing the stream so the read thread terminates
|
|
|
|
|
if (m_stream) m_stream->close();
|
2015-07-29 20:31:59 -07:00
|
|
|
// send an empty outgoing message set so the write thread terminates
|
|
|
|
|
m_outgoing.push(Outgoing());
|
|
|
|
|
// wait for threads to terminate
|
2015-07-13 22:46:41 -07:00
|
|
|
if (m_write_thread.joinable()) m_write_thread.join();
|
|
|
|
|
if (m_read_thread.joinable()) m_read_thread.join();
|
2015-07-29 23:45:04 -07:00
|
|
|
// clear queue
|
2015-07-29 20:31:59 -07:00
|
|
|
while (!m_outgoing.empty()) m_outgoing.pop();
|
|
|
|
|
}
|
|
|
|
|
|
2015-08-02 21:47:01 -07:00
|
|
|
// Snapshot of this connection: remote id, peer IP, peer port, time of the
// last received message and the protocol revision in use.  The stream is
// dereferenced without a null check.
ConnectionInfo NetworkConnection::info() const {
  NetworkStream& peer = *m_stream;
  ConnectionInfo snapshot{remote_id(), peer.getPeerIP(),
                          static_cast<unsigned int>(peer.getPeerPort()),
                          m_last_update, m_proto_rev};
  return snapshot;
}
|
|
|
|
|
|
2015-07-29 20:31:59 -07:00
|
|
|
std::string NetworkConnection::remote_id() const {
|
|
|
|
|
std::lock_guard<std::mutex> lock(m_remote_id_mutex);
|
|
|
|
|
return m_remote_id;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Replaces the stored remote identifier; the assignment is made while holding
// m_remote_id_mutex.
void NetworkConnection::set_remote_id(StringRef remote_id) {
  std::lock_guard<std::mutex> guard(m_remote_id_mutex);
  m_remote_id = remote_id;
}
|
|
|
|
|
|
|
|
|
|
void NetworkConnection::ReadThreadMain() {
|
|
|
|
|
raw_socket_istream is(*m_stream);
|
|
|
|
|
WireDecoder decoder(is, m_proto_rev);
|
|
|
|
|
|
2015-07-31 20:32:52 -07:00
|
|
|
m_state = static_cast<int>(kHandshake);
|
|
|
|
|
if (!m_handshake(*this,
|
|
|
|
|
[&] {
|
|
|
|
|
decoder.set_proto_rev(m_proto_rev);
|
|
|
|
|
auto msg = Message::Read(decoder, m_get_entry_type);
|
2015-07-31 22:41:26 -07:00
|
|
|
if (!msg && decoder.error())
|
|
|
|
|
DEBUG("error reading in handshake: " << decoder.error());
|
2015-07-31 20:32:52 -07:00
|
|
|
return msg;
|
|
|
|
|
},
|
|
|
|
|
[&](llvm::ArrayRef<std::shared_ptr<Message>> msgs) {
|
|
|
|
|
m_outgoing.emplace(msgs);
|
|
|
|
|
})) {
|
|
|
|
|
m_state = static_cast<int>(kDead);
|
|
|
|
|
m_active = false;
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
m_state = static_cast<int>(kActive);
|
2015-08-27 23:48:40 -07:00
|
|
|
m_notifier.NotifyConnection(true, info());
|
2015-07-13 22:46:41 -07:00
|
|
|
while (m_active) {
|
|
|
|
|
if (!m_stream)
|
|
|
|
|
break;
|
|
|
|
|
decoder.set_proto_rev(m_proto_rev);
|
|
|
|
|
decoder.Reset();
|
2015-07-15 21:20:18 -07:00
|
|
|
auto msg = Message::Read(decoder, m_get_entry_type);
|
|
|
|
|
if (!msg) {
|
|
|
|
|
// terminate connection on bad message
|
2015-07-17 22:28:47 -07:00
|
|
|
if (m_stream) m_stream->close();
|
2015-07-15 21:20:18 -07:00
|
|
|
break;
|
|
|
|
|
}
|
2015-08-02 22:20:07 -07:00
|
|
|
DEBUG4("received type=" << msg->type() << " with str=" << msg->str()
|
|
|
|
|
<< " id=" << msg->id()
|
|
|
|
|
<< " seq_num=" << msg->seq_num_uid());
|
2015-08-02 10:47:05 -07:00
|
|
|
m_last_update = Now();
|
2015-07-31 23:56:06 -07:00
|
|
|
m_process_incoming(std::move(msg), this);
|
2015-07-13 22:46:41 -07:00
|
|
|
}
|
2015-07-31 23:14:26 -07:00
|
|
|
DEBUG3("read thread died");
|
2015-08-27 23:48:40 -07:00
|
|
|
if (m_state != kDead) m_notifier.NotifyConnection(false, info());
|
2015-07-29 20:31:59 -07:00
|
|
|
m_state = static_cast<int>(kDead);
|
2015-07-13 22:46:41 -07:00
|
|
|
m_active = false;
|
2015-07-31 23:40:16 -07:00
|
|
|
m_outgoing.push(Outgoing()); // also kill write thread
|
2015-07-13 22:46:41 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void NetworkConnection::WriteThreadMain() {
|
|
|
|
|
WireEncoder encoder(m_proto_rev);
|
|
|
|
|
|
|
|
|
|
while (m_active) {
|
|
|
|
|
auto msgs = m_outgoing.pop();
|
2015-07-31 22:41:26 -07:00
|
|
|
DEBUG4("write thread woke up");
|
2015-07-31 23:40:16 -07:00
|
|
|
if (msgs.empty()) continue;
|
2015-07-13 22:46:41 -07:00
|
|
|
encoder.set_proto_rev(m_proto_rev);
|
|
|
|
|
encoder.Reset();
|
2015-08-02 22:20:07 -07:00
|
|
|
DEBUG3("sending " << msgs.size() << " messages");
|
2015-07-29 23:45:04 -07:00
|
|
|
for (auto& msg : msgs) {
|
2015-08-02 22:20:07 -07:00
|
|
|
if (msg) {
|
|
|
|
|
DEBUG4("sending type=" << msg->type() << " with str=" << msg->str()
|
|
|
|
|
<< " id=" << msg->id()
|
|
|
|
|
<< " seq_num=" << msg->seq_num_uid());
|
|
|
|
|
msg->Write(encoder);
|
|
|
|
|
}
|
2015-07-29 23:45:04 -07:00
|
|
|
}
|
2015-08-02 00:06:27 -07:00
|
|
|
NetworkStream::Error err;
|
2015-07-13 22:46:41 -07:00
|
|
|
if (!m_stream) break;
|
2015-07-31 20:32:52 -07:00
|
|
|
if (encoder.size() == 0) continue;
|
2015-07-13 22:46:41 -07:00
|
|
|
if (m_stream->send(encoder.data(), encoder.size(), &err) == 0) break;
|
2015-07-31 22:41:26 -07:00
|
|
|
DEBUG4("sent " << encoder.size() << " bytes");
|
2015-07-13 22:46:41 -07:00
|
|
|
}
|
2015-07-31 23:14:26 -07:00
|
|
|
DEBUG3("write thread died");
|
2015-08-27 23:48:40 -07:00
|
|
|
if (m_state != kDead) m_notifier.NotifyConnection(false, info());
|
2015-07-29 20:31:59 -07:00
|
|
|
m_state = static_cast<int>(kDead);
|
2015-07-13 22:46:41 -07:00
|
|
|
m_active = false;
|
2015-07-31 23:40:16 -07:00
|
|
|
if (m_stream) m_stream->close(); // also kill read thread
|
2015-07-13 22:46:41 -07:00
|
|
|
}
|
2015-08-13 13:12:15 -07:00
|
|
|
|
|
|
|
|
void NetworkConnection::QueueOutgoing(std::shared_ptr<Message> msg) {
|
|
|
|
|
std::lock_guard<std::mutex> lock(m_pending_mutex);
|
|
|
|
|
|
|
|
|
|
// Merge with previous. One case we don't combine: delete/assign loop.
|
|
|
|
|
switch (msg->type()) {
|
|
|
|
|
case Message::kEntryAssign:
|
|
|
|
|
case Message::kEntryUpdate: {
|
|
|
|
|
// don't do this for unassigned id's
|
|
|
|
|
unsigned int id = msg->id();
|
|
|
|
|
if (id == 0xffff) {
|
|
|
|
|
m_pending_outgoing.push_back(msg);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
if (id < m_pending_update.size() && m_pending_update[id].first != 0) {
|
|
|
|
|
// overwrite the previous one for this id
|
|
|
|
|
auto& oldmsg = m_pending_outgoing[m_pending_update[id].first - 1];
|
|
|
|
|
if (oldmsg && oldmsg->Is(Message::kEntryAssign) &&
|
|
|
|
|
msg->Is(Message::kEntryUpdate)) {
|
|
|
|
|
// need to update assignment with new seq_num and value
|
|
|
|
|
oldmsg = Message::EntryAssign(oldmsg->str(), id, msg->seq_num_uid(),
|
|
|
|
|
msg->value(), oldmsg->flags());
|
|
|
|
|
} else
|
|
|
|
|
oldmsg = msg; // easy update
|
|
|
|
|
} else {
|
|
|
|
|
// new, but remember it
|
|
|
|
|
std::size_t pos = m_pending_outgoing.size();
|
|
|
|
|
m_pending_outgoing.push_back(msg);
|
|
|
|
|
if (id >= m_pending_update.size()) m_pending_update.resize(id + 1);
|
|
|
|
|
m_pending_update[id].first = pos + 1;
|
|
|
|
|
}
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
case Message::kEntryDelete: {
|
|
|
|
|
// don't do this for unassigned id's
|
|
|
|
|
unsigned int id = msg->id();
|
|
|
|
|
if (id == 0xffff) {
|
|
|
|
|
m_pending_outgoing.push_back(msg);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// clear previous updates
|
|
|
|
|
if (id < m_pending_update.size()) {
|
|
|
|
|
if (m_pending_update[id].first != 0) {
|
|
|
|
|
m_pending_outgoing[m_pending_update[id].first - 1].reset();
|
|
|
|
|
m_pending_update[id].first = 0;
|
|
|
|
|
}
|
|
|
|
|
if (m_pending_update[id].second != 0) {
|
|
|
|
|
m_pending_outgoing[m_pending_update[id].second - 1].reset();
|
|
|
|
|
m_pending_update[id].second = 0;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// add deletion
|
|
|
|
|
m_pending_outgoing.push_back(msg);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
case Message::kFlagsUpdate: {
|
|
|
|
|
// don't do this for unassigned id's
|
|
|
|
|
unsigned int id = msg->id();
|
|
|
|
|
if (id == 0xffff) {
|
|
|
|
|
m_pending_outgoing.push_back(msg);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
if (id < m_pending_update.size() && m_pending_update[id].second != 0) {
|
|
|
|
|
// overwrite the previous one for this id
|
|
|
|
|
m_pending_outgoing[m_pending_update[id].second - 1] = msg;
|
|
|
|
|
} else {
|
|
|
|
|
// new, but remember it
|
|
|
|
|
std::size_t pos = m_pending_outgoing.size();
|
|
|
|
|
m_pending_outgoing.push_back(msg);
|
|
|
|
|
if (id >= m_pending_update.size()) m_pending_update.resize(id + 1);
|
|
|
|
|
m_pending_update[id].second = pos + 1;
|
|
|
|
|
}
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
case Message::kClearEntries: {
|
|
|
|
|
// knock out all previous assigns/updates!
|
|
|
|
|
for (auto& i : m_pending_outgoing) {
|
|
|
|
|
if (!i) continue;
|
|
|
|
|
auto t = i->type();
|
|
|
|
|
if (t == Message::kEntryAssign || t == Message::kEntryUpdate ||
|
|
|
|
|
t == Message::kFlagsUpdate || t == Message::kEntryDelete ||
|
|
|
|
|
t == Message::kClearEntries)
|
|
|
|
|
i.reset();
|
|
|
|
|
}
|
|
|
|
|
m_pending_update.resize(0);
|
|
|
|
|
m_pending_outgoing.push_back(msg);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
default:
|
|
|
|
|
m_pending_outgoing.push_back(msg);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void NetworkConnection::PostOutgoing() {
|
|
|
|
|
std::lock_guard<std::mutex> lock(m_pending_mutex);
|
|
|
|
|
if (m_pending_outgoing.empty()) return;
|
|
|
|
|
m_outgoing.emplace(std::move(m_pending_outgoing));
|
|
|
|
|
m_pending_outgoing.resize(0);
|
|
|
|
|
m_pending_update.resize(0);
|
|
|
|
|
}
|