2015-07-16 22:55:50 -07:00
|
|
|
/*----------------------------------------------------------------------------*/
|
|
|
|
|
/* Copyright (c) FIRST 2015. All Rights Reserved. */
|
|
|
|
|
/* Open Source Software - may be modified and shared by FRC teams. The code */
|
|
|
|
|
/* must be accompanied by the FIRST BSD license file in the root directory of */
|
|
|
|
|
/* the project. */
|
|
|
|
|
/*----------------------------------------------------------------------------*/
|
|
|
|
|
|
|
|
|
|
#include "Dispatcher.h"
|
|
|
|
|
|
2015-07-29 20:33:26 -07:00
|
|
|
#include <algorithm>
|
|
|
|
|
#include <iterator>
|
|
|
|
|
|
2015-07-16 22:55:50 -07:00
|
|
|
#include "tcpsockets/TCPAcceptor.h"
|
|
|
|
|
#include "tcpsockets/TCPConnector.h"
|
2015-07-31 22:41:26 -07:00
|
|
|
#include "Log.h"
|
2015-07-16 22:55:50 -07:00
|
|
|
|
2015-07-17 07:21:07 -07:00
|
|
|
using namespace nt;
|
2015-07-16 22:55:50 -07:00
|
|
|
|
2015-07-20 23:36:22 -07:00
|
|
|
ATOMIC_STATIC_INIT(Dispatcher)
|
2015-07-16 22:55:50 -07:00
|
|
|
|
2015-08-02 00:33:41 -07:00
|
|
|
void Dispatcher::StartServer(const char* listen_address, unsigned int port) {
|
|
|
|
|
DispatcherBase::StartServer(std::unique_ptr<NetworkAcceptor>(
|
|
|
|
|
new TCPAcceptor(static_cast<int>(port), listen_address)));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void Dispatcher::StartClient(const char* server_name, unsigned int port) {
|
|
|
|
|
DispatcherBase::StartClient(std::bind(&TCPConnector::connect, server_name,
|
|
|
|
|
static_cast<int>(port), 1));
|
|
|
|
|
}
|
|
|
|
|
|
2015-08-02 21:47:01 -07:00
|
|
|
DispatcherBase::DispatcherBase(Storage& storage, Notifier& notifier)
|
2015-08-01 11:15:04 -07:00
|
|
|
: m_storage(storage),
|
2015-08-02 21:47:01 -07:00
|
|
|
m_notifier(notifier),
|
2015-08-01 11:15:04 -07:00
|
|
|
m_server(false),
|
2015-07-17 22:39:36 -07:00
|
|
|
m_do_flush(false),
|
2015-07-31 20:32:52 -07:00
|
|
|
m_reconnect_proto_rev(0x0300),
|
2015-07-31 23:14:26 -07:00
|
|
|
m_do_reconnect(true) {
|
2015-07-20 20:21:37 -07:00
|
|
|
m_active = false;
|
|
|
|
|
m_update_rate = 100;
|
|
|
|
|
}
|
2015-07-16 22:55:50 -07:00
|
|
|
|
2015-08-02 00:33:41 -07:00
|
|
|
DispatcherBase::~DispatcherBase() {
|
2015-07-29 20:33:26 -07:00
|
|
|
Stop();
|
|
|
|
|
}
|
2015-07-16 22:55:50 -07:00
|
|
|
|
2015-08-02 00:33:41 -07:00
|
|
|
void DispatcherBase::StartServer(std::unique_ptr<NetworkAcceptor> acceptor) {
|
2015-07-17 22:39:36 -07:00
|
|
|
{
|
|
|
|
|
std::lock_guard<std::mutex> lock(m_user_mutex);
|
|
|
|
|
if (m_active) return;
|
|
|
|
|
m_active = true;
|
|
|
|
|
}
|
|
|
|
|
m_server = true;
|
2015-08-02 00:33:41 -07:00
|
|
|
m_server_acceptor = std::move(acceptor);
|
2015-07-29 23:45:04 -07:00
|
|
|
|
|
|
|
|
using namespace std::placeholders;
|
2015-08-01 11:15:04 -07:00
|
|
|
m_storage.SetOutgoing(std::bind(&Dispatcher::QueueOutgoing, this, _1, _2, _3),
|
|
|
|
|
m_server);
|
2015-07-29 23:45:04 -07:00
|
|
|
|
2015-07-16 22:55:50 -07:00
|
|
|
m_dispatch_thread = std::thread(&Dispatcher::DispatchThreadMain, this);
|
2015-08-02 00:33:41 -07:00
|
|
|
m_clientserver_thread = std::thread(&Dispatcher::ServerThreadMain, this);
|
2015-07-16 22:55:50 -07:00
|
|
|
}
|
|
|
|
|
|
2015-08-02 00:33:41 -07:00
|
|
|
void DispatcherBase::StartClient(
|
|
|
|
|
std::function<std::unique_ptr<NetworkStream>()> connect) {
|
2015-07-17 22:39:36 -07:00
|
|
|
{
|
|
|
|
|
std::lock_guard<std::mutex> lock(m_user_mutex);
|
|
|
|
|
if (m_active) return;
|
|
|
|
|
m_active = true;
|
|
|
|
|
}
|
|
|
|
|
m_server = false;
|
2015-07-29 23:45:04 -07:00
|
|
|
|
|
|
|
|
using namespace std::placeholders;
|
2015-08-01 11:15:04 -07:00
|
|
|
m_storage.SetOutgoing(std::bind(&Dispatcher::QueueOutgoing, this, _1, _2, _3),
|
|
|
|
|
m_server);
|
2015-07-29 23:45:04 -07:00
|
|
|
|
2015-07-16 22:55:50 -07:00
|
|
|
m_dispatch_thread = std::thread(&Dispatcher::DispatchThreadMain, this);
|
|
|
|
|
m_clientserver_thread =
|
2015-08-02 00:33:41 -07:00
|
|
|
std::thread(&Dispatcher::ClientThreadMain, this, connect);
|
2015-07-16 22:55:50 -07:00
|
|
|
}
|
|
|
|
|
|
2015-08-02 00:33:41 -07:00
|
|
|
void DispatcherBase::Stop() {
|
2015-08-01 23:36:28 -07:00
|
|
|
m_active = false;
|
2015-07-17 22:39:36 -07:00
|
|
|
|
|
|
|
|
// wake up dispatch thread with a flush
|
|
|
|
|
m_flush_cv.notify_one();
|
|
|
|
|
|
|
|
|
|
// wake up client thread with a reconnect
|
|
|
|
|
ClientReconnect();
|
|
|
|
|
|
|
|
|
|
// wake up server thread by shutting down the socket
|
|
|
|
|
if (m_server_acceptor) m_server_acceptor->shutdown();
|
|
|
|
|
|
|
|
|
|
// join threads
|
2015-07-16 22:55:50 -07:00
|
|
|
if (m_dispatch_thread.joinable()) m_dispatch_thread.join();
|
|
|
|
|
if (m_clientserver_thread.joinable()) m_clientserver_thread.join();
|
2015-08-01 23:36:28 -07:00
|
|
|
|
|
|
|
|
std::vector<Connection> conns;
|
|
|
|
|
{
|
|
|
|
|
std::lock_guard<std::mutex> lock(m_user_mutex);
|
|
|
|
|
conns.swap(m_connections);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// close all connections
|
|
|
|
|
conns.resize(0);
|
2015-07-16 22:55:50 -07:00
|
|
|
}
|
|
|
|
|
|
2015-08-02 00:33:41 -07:00
|
|
|
void DispatcherBase::SetUpdateRate(double interval) {
|
2015-07-17 22:39:36 -07:00
|
|
|
// don't allow update rates faster than 100 ms
|
|
|
|
|
if (interval < 0.1)
|
|
|
|
|
interval = 0.1;
|
2015-07-20 20:19:01 -07:00
|
|
|
m_update_rate = static_cast<unsigned int>(interval * 1000);
|
2015-07-16 22:55:50 -07:00
|
|
|
}
|
|
|
|
|
|
2015-08-02 00:33:41 -07:00
|
|
|
void DispatcherBase::SetIdentity(llvm::StringRef name) {
|
2015-07-16 22:55:50 -07:00
|
|
|
std::lock_guard<std::mutex> lock(m_user_mutex);
|
|
|
|
|
m_identity = name;
|
|
|
|
|
}
|
|
|
|
|
|
2015-08-02 00:33:41 -07:00
|
|
|
void DispatcherBase::Flush() {
|
2015-07-17 22:39:36 -07:00
|
|
|
auto now = std::chrono::steady_clock::now();
|
|
|
|
|
{
|
|
|
|
|
std::lock_guard<std::mutex> lock(m_flush_mutex);
|
|
|
|
|
// don't allow flushes more often than every 100 ms
|
|
|
|
|
if ((now - m_last_flush) < std::chrono::milliseconds(100))
|
|
|
|
|
return;
|
|
|
|
|
m_last_flush = now;
|
|
|
|
|
m_do_flush = true;
|
|
|
|
|
}
|
|
|
|
|
m_flush_cv.notify_one();
|
|
|
|
|
}
|
|
|
|
|
|
2015-08-02 10:47:05 -07:00
|
|
|
std::vector<ConnectionInfo> DispatcherBase::GetConnections() const {
|
|
|
|
|
std::vector<ConnectionInfo> conns;
|
|
|
|
|
if (!m_active) return conns;
|
|
|
|
|
|
|
|
|
|
std::lock_guard<std::mutex> lock(m_user_mutex);
|
|
|
|
|
for (auto& conn : m_connections) {
|
|
|
|
|
if (conn.net->state() != NetworkConnection::kActive) continue;
|
2015-08-02 21:47:01 -07:00
|
|
|
conns.emplace_back(conn.net->info());
|
2015-08-02 10:47:05 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return conns;
|
|
|
|
|
}
|
|
|
|
|
|
2015-08-02 00:33:41 -07:00
|
|
|
void DispatcherBase::DispatchThreadMain() {
|
2015-07-29 20:33:26 -07:00
|
|
|
// local copy of active m_connections
|
2015-07-29 23:45:04 -07:00
|
|
|
struct ConnectionRef {
|
|
|
|
|
NetworkConnection* net;
|
|
|
|
|
NetworkConnection::Outgoing outgoing;
|
|
|
|
|
};
|
|
|
|
|
std::vector<ConnectionRef> connections;
|
2015-07-29 20:33:26 -07:00
|
|
|
|
2015-07-17 22:39:36 -07:00
|
|
|
auto timeout_time = std::chrono::steady_clock::now();
|
|
|
|
|
int count = 0;
|
2015-07-31 23:14:26 -07:00
|
|
|
std::unique_lock<std::mutex> flush_lock(m_flush_mutex);
|
2015-07-17 22:39:36 -07:00
|
|
|
while (m_active) {
|
|
|
|
|
// handle loop taking too long
|
|
|
|
|
auto start = std::chrono::steady_clock::now();
|
|
|
|
|
if (start > timeout_time)
|
|
|
|
|
timeout_time = start;
|
|
|
|
|
|
|
|
|
|
// wait for periodic or when flushed
|
|
|
|
|
timeout_time += std::chrono::milliseconds(m_update_rate);
|
2015-07-29 20:33:26 -07:00
|
|
|
m_reconnect_cv.wait_until(flush_lock, timeout_time,
|
2015-07-17 22:39:36 -07:00
|
|
|
[&] { return !m_active || m_do_flush; });
|
|
|
|
|
m_do_flush = false;
|
|
|
|
|
if (!m_active) break; // in case we were woken up to terminate
|
|
|
|
|
|
|
|
|
|
if (++count > 10) {
|
|
|
|
|
DEBUG("dispatch running");
|
|
|
|
|
count = 0;
|
|
|
|
|
}
|
2015-07-29 20:33:26 -07:00
|
|
|
|
|
|
|
|
// make a local copy of the connections list (so we don't hold the lock)
|
|
|
|
|
connections.resize(0);
|
|
|
|
|
{
|
|
|
|
|
std::lock_guard<std::mutex> user_lock(m_user_mutex);
|
2015-07-31 23:14:26 -07:00
|
|
|
bool reconnect = false;
|
2015-07-29 20:33:26 -07:00
|
|
|
for (auto& conn : m_connections) {
|
2015-07-29 23:45:04 -07:00
|
|
|
if (conn.net->state() == NetworkConnection::kActive) {
|
|
|
|
|
connections.push_back(ConnectionRef());
|
|
|
|
|
connections.back().net = conn.net.get();
|
|
|
|
|
connections.back().outgoing.swap(conn.outgoing);
|
2015-08-02 23:21:23 -07:00
|
|
|
conn.last_update.resize(0); // clear "previous" updates
|
2015-07-29 23:45:04 -07:00
|
|
|
}
|
2015-07-31 23:14:26 -07:00
|
|
|
if (!m_server && conn.net->state() == NetworkConnection::kDead)
|
|
|
|
|
reconnect = true;
|
|
|
|
|
}
|
|
|
|
|
// reconnect if we disconnected (and a reconnect is not in progress)
|
|
|
|
|
if (reconnect && !m_do_reconnect) {
|
|
|
|
|
m_do_reconnect = true;
|
|
|
|
|
m_reconnect_cv.notify_one();
|
2015-07-29 20:33:26 -07:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// send outgoing messages
|
2015-07-31 22:49:43 -07:00
|
|
|
for (auto& conn : connections) {
|
|
|
|
|
if (!conn.outgoing.empty())
|
|
|
|
|
conn.net->outgoing().emplace(std::move(conn.outgoing));
|
|
|
|
|
}
|
2015-07-29 23:45:04 -07:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2015-08-02 23:21:23 -07:00
|
|
|
void DispatcherBase::Connection::QueueOutgoing(std::shared_ptr<Message> msg) {
|
|
|
|
|
// Merge with previous. One case we don't combine: delete/assign loop.
|
|
|
|
|
switch (msg->type()) {
|
|
|
|
|
case Message::kEntryAssign:
|
|
|
|
|
case Message::kEntryUpdate: {
|
|
|
|
|
// don't do this for unassigned id's
|
|
|
|
|
unsigned int id = msg->id();
|
|
|
|
|
if (id == 0xffff) {
|
|
|
|
|
outgoing.push_back(msg);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
if (id < last_update.size() && last_update[id].first != 0) {
|
|
|
|
|
// overwrite the previous one for this id
|
|
|
|
|
auto& oldmsg = outgoing[last_update[id].first - 1];
|
|
|
|
|
if (oldmsg && oldmsg->Is(Message::kEntryAssign) &&
|
|
|
|
|
msg->Is(Message::kEntryUpdate)) {
|
|
|
|
|
// need to update assignment with new seq_num and value
|
|
|
|
|
oldmsg = Message::EntryAssign(oldmsg->str(), id, msg->seq_num_uid(),
|
|
|
|
|
msg->value(), oldmsg->flags());
|
|
|
|
|
} else
|
|
|
|
|
oldmsg = msg; // easy update
|
|
|
|
|
} else {
|
|
|
|
|
// new, but remember it
|
|
|
|
|
std::size_t pos = outgoing.size();
|
|
|
|
|
outgoing.push_back(msg);
|
|
|
|
|
if (id >= last_update.size()) last_update.resize(id + 1);
|
|
|
|
|
last_update[id].first = pos + 1;
|
|
|
|
|
}
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
case Message::kEntryDelete: {
|
|
|
|
|
// don't do this for unassigned id's
|
|
|
|
|
unsigned int id = msg->id();
|
|
|
|
|
if (id == 0xffff) {
|
|
|
|
|
outgoing.push_back(msg);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// clear previous updates
|
|
|
|
|
if (id < last_update.size()) {
|
|
|
|
|
if (last_update[id].first != 0) {
|
|
|
|
|
outgoing[last_update[id].first - 1].reset();
|
|
|
|
|
last_update[id].first = 0;
|
|
|
|
|
}
|
|
|
|
|
if (last_update[id].second != 0) {
|
|
|
|
|
outgoing[last_update[id].second - 1].reset();
|
|
|
|
|
last_update[id].second = 0;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// add deletion
|
|
|
|
|
outgoing.push_back(msg);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
case Message::kFlagsUpdate: {
|
|
|
|
|
// don't do this for unassigned id's
|
|
|
|
|
unsigned int id = msg->id();
|
|
|
|
|
if (id == 0xffff) {
|
|
|
|
|
outgoing.push_back(msg);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
if (id < last_update.size() && last_update[id].second != 0) {
|
|
|
|
|
// overwrite the previous one for this id
|
|
|
|
|
outgoing[last_update[id].second - 1] = msg;
|
|
|
|
|
} else {
|
|
|
|
|
// new, but remember it
|
|
|
|
|
std::size_t pos = outgoing.size();
|
|
|
|
|
outgoing.push_back(msg);
|
|
|
|
|
if (id >= last_update.size()) last_update.resize(id + 1);
|
|
|
|
|
last_update[id].second = pos + 1;
|
|
|
|
|
}
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
case Message::kClearEntries: {
|
|
|
|
|
// knock out all previous assigns/updates!
|
|
|
|
|
for (auto& i : outgoing) {
|
|
|
|
|
if (!i) continue;
|
|
|
|
|
auto t = i->type();
|
|
|
|
|
if (t == Message::kEntryAssign || t == Message::kEntryUpdate ||
|
|
|
|
|
t == Message::kFlagsUpdate || t == Message::kEntryDelete ||
|
|
|
|
|
t == Message::kClearEntries)
|
|
|
|
|
i.reset();
|
|
|
|
|
}
|
|
|
|
|
last_update.resize(0);
|
|
|
|
|
outgoing.push_back(msg);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
default:
|
|
|
|
|
outgoing.push_back(msg);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2015-08-02 00:33:41 -07:00
|
|
|
void DispatcherBase::QueueOutgoing(std::shared_ptr<Message> msg,
|
|
|
|
|
NetworkConnection* only,
|
|
|
|
|
NetworkConnection* except) {
|
2015-07-29 23:45:04 -07:00
|
|
|
std::lock_guard<std::mutex> user_lock(m_user_mutex);
|
|
|
|
|
for (auto& conn : m_connections) {
|
|
|
|
|
if (conn.net.get() == except) continue;
|
|
|
|
|
if (only && conn.net.get() != only) continue;
|
2015-07-31 23:56:06 -07:00
|
|
|
auto state = conn.net->state();
|
|
|
|
|
if (state != NetworkConnection::kSynchronized &&
|
|
|
|
|
state != NetworkConnection::kActive) continue;
|
2015-08-02 23:21:23 -07:00
|
|
|
conn.QueueOutgoing(msg);
|
2015-07-17 22:39:36 -07:00
|
|
|
}
|
2015-07-16 22:55:50 -07:00
|
|
|
}
|
|
|
|
|
|
2015-08-02 00:33:41 -07:00
|
|
|
void DispatcherBase::ServerThreadMain() {
|
2015-07-17 22:39:36 -07:00
|
|
|
if (m_server_acceptor->start() != 0) {
|
2015-07-16 22:55:50 -07:00
|
|
|
m_active = false;
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
while (m_active) {
|
2015-07-17 22:39:36 -07:00
|
|
|
auto stream = m_server_acceptor->accept();
|
2015-07-16 22:55:50 -07:00
|
|
|
if (!stream) {
|
|
|
|
|
m_active = false;
|
|
|
|
|
break;
|
|
|
|
|
}
|
2015-07-31 23:26:38 -07:00
|
|
|
DEBUG("server: client connection from " << stream->getPeerIP() << " port "
|
|
|
|
|
<< stream->getPeerPort());
|
2015-07-17 22:39:36 -07:00
|
|
|
|
2015-07-16 22:55:50 -07:00
|
|
|
// add to connections list
|
2015-07-29 23:45:04 -07:00
|
|
|
using namespace std::placeholders;
|
|
|
|
|
std::unique_ptr<NetworkConnection> conn_unique(new NetworkConnection(
|
2015-07-19 23:17:14 -07:00
|
|
|
std::move(stream),
|
2015-08-02 21:47:01 -07:00
|
|
|
m_notifier,
|
2015-07-31 20:32:52 -07:00
|
|
|
std::bind(&Dispatcher::ServerHandshake, this, _1, _2, _3),
|
2015-08-01 11:15:04 -07:00
|
|
|
std::bind(&Storage::GetEntryType, &m_storage, _1),
|
|
|
|
|
std::bind(&Storage::ProcessIncoming, &m_storage, _1, _2)));
|
2015-07-29 23:45:04 -07:00
|
|
|
auto conn = conn_unique.get();
|
|
|
|
|
{
|
|
|
|
|
std::lock_guard<std::mutex> lock(m_user_mutex);
|
|
|
|
|
m_connections.emplace_back(std::move(conn_unique));
|
|
|
|
|
}
|
2015-07-29 20:33:26 -07:00
|
|
|
conn->Start();
|
2015-07-16 22:55:50 -07:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2015-08-02 00:33:41 -07:00
|
|
|
void DispatcherBase::ClientThreadMain(
|
|
|
|
|
std::function<std::unique_ptr<NetworkStream>()> connect) {
|
2015-07-16 22:55:50 -07:00
|
|
|
while (m_active) {
|
2015-07-17 22:39:36 -07:00
|
|
|
// sleep between retries
|
|
|
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(500));
|
|
|
|
|
|
2015-07-16 22:55:50 -07:00
|
|
|
// try to connect (with timeout)
|
2015-07-17 22:39:36 -07:00
|
|
|
DEBUG("client trying to connect");
|
2015-08-02 00:33:41 -07:00
|
|
|
auto stream = connect();
|
2015-07-16 22:55:50 -07:00
|
|
|
if (!stream) continue; // keep retrying
|
2015-07-17 22:39:36 -07:00
|
|
|
DEBUG("client connected");
|
2015-07-16 22:55:50 -07:00
|
|
|
|
2015-07-31 23:14:26 -07:00
|
|
|
std::unique_lock<std::mutex> lock(m_user_mutex);
|
2015-07-31 20:32:52 -07:00
|
|
|
using namespace std::placeholders;
|
|
|
|
|
std::unique_ptr<NetworkConnection> conn_unique(new NetworkConnection(
|
2015-07-19 23:17:14 -07:00
|
|
|
std::move(stream),
|
2015-08-02 21:47:01 -07:00
|
|
|
m_notifier,
|
2015-07-31 20:32:52 -07:00
|
|
|
std::bind(&Dispatcher::ClientHandshake, this, _1, _2, _3),
|
2015-08-01 11:15:04 -07:00
|
|
|
std::bind(&Storage::GetEntryType, &m_storage, _1),
|
|
|
|
|
std::bind(&Storage::ProcessIncoming, &m_storage, _1, _2)));
|
2015-07-31 20:32:52 -07:00
|
|
|
auto conn = conn_unique.get();
|
2015-07-31 23:14:26 -07:00
|
|
|
m_connections.resize(0); // disconnect any current
|
|
|
|
|
m_connections.emplace_back(std::move(conn_unique));
|
|
|
|
|
conn->set_proto_rev(m_reconnect_proto_rev);
|
2015-07-29 20:33:26 -07:00
|
|
|
conn->Start();
|
2015-07-16 22:55:50 -07:00
|
|
|
|
2015-07-31 20:32:52 -07:00
|
|
|
// block until told to reconnect
|
|
|
|
|
m_do_reconnect = false;
|
2015-08-01 23:36:28 -07:00
|
|
|
m_reconnect_cv.wait(lock, [&] { return !m_active || m_do_reconnect; });
|
2015-07-31 20:32:52 -07:00
|
|
|
}
|
|
|
|
|
}
|
2015-07-16 22:55:50 -07:00
|
|
|
|
2015-08-02 00:33:41 -07:00
|
|
|
bool DispatcherBase::ClientHandshake(
|
2015-07-31 20:32:52 -07:00
|
|
|
NetworkConnection& conn,
|
|
|
|
|
std::function<std::shared_ptr<Message>()> get_msg,
|
|
|
|
|
std::function<void(llvm::ArrayRef<std::shared_ptr<Message>>)> send_msgs) {
|
|
|
|
|
// get identity
|
|
|
|
|
std::string self_id;
|
|
|
|
|
{
|
|
|
|
|
std::lock_guard<std::mutex> lock(m_user_mutex);
|
|
|
|
|
self_id = m_identity;
|
|
|
|
|
}
|
2015-07-16 22:55:50 -07:00
|
|
|
|
2015-07-31 20:32:52 -07:00
|
|
|
// send client hello
|
|
|
|
|
DEBUG("client: sending hello");
|
|
|
|
|
send_msgs(Message::ClientHello(self_id));
|
2015-07-16 22:55:50 -07:00
|
|
|
|
2015-07-31 20:32:52 -07:00
|
|
|
// wait for response
|
|
|
|
|
auto msg = get_msg();
|
|
|
|
|
if (!msg) {
|
|
|
|
|
// disconnected, retry
|
|
|
|
|
DEBUG("client: server disconnected before first response");
|
|
|
|
|
return false;
|
|
|
|
|
}
|
2015-07-17 22:39:36 -07:00
|
|
|
|
2015-07-31 20:32:52 -07:00
|
|
|
if (msg->Is(Message::kProtoUnsup)) {
|
|
|
|
|
if (msg->id() == 0x0200) ClientReconnect(0x0200);
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
bool new_server = true;
|
|
|
|
|
if (conn.proto_rev() >= 0x0300) {
|
|
|
|
|
// should be server hello; if not, disconnect.
|
|
|
|
|
if (!msg->Is(Message::kServerHello)) return false;
|
|
|
|
|
conn.set_remote_id(msg->str());
|
|
|
|
|
if ((msg->flags() & 1) != 0) new_server = false;
|
|
|
|
|
// get the next message
|
|
|
|
|
msg = get_msg();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// receive initial assignments
|
|
|
|
|
std::vector<std::shared_ptr<Message>> incoming;
|
|
|
|
|
for (;;) {
|
|
|
|
|
if (!msg) {
|
|
|
|
|
// disconnected, retry
|
|
|
|
|
DEBUG("client: server disconnected during initial entries");
|
|
|
|
|
return false;
|
2015-07-16 22:55:50 -07:00
|
|
|
}
|
2015-07-31 20:32:52 -07:00
|
|
|
if (msg->Is(Message::kServerHelloDone)) break;
|
|
|
|
|
if (!msg->Is(Message::kEntryAssign)) {
|
|
|
|
|
// unexpected message
|
2015-07-31 22:41:26 -07:00
|
|
|
DEBUG("client: received message (" << msg->type() << ") other than entry assignment during initial handshake");
|
2015-07-31 20:32:52 -07:00
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
incoming.emplace_back(std::move(msg));
|
|
|
|
|
// get the next message
|
|
|
|
|
msg = get_msg();
|
|
|
|
|
}
|
2015-07-16 22:55:50 -07:00
|
|
|
|
2015-07-31 20:32:52 -07:00
|
|
|
// generate outgoing assignments
|
|
|
|
|
NetworkConnection::Outgoing outgoing;
|
2015-07-18 01:29:24 -07:00
|
|
|
|
2015-08-01 11:15:04 -07:00
|
|
|
m_storage.ApplyInitialAssignments(conn, incoming, new_server, &outgoing);
|
2015-07-18 01:29:24 -07:00
|
|
|
|
2015-07-31 20:32:52 -07:00
|
|
|
if (conn.proto_rev() >= 0x0300)
|
|
|
|
|
outgoing.emplace_back(Message::ClientHelloDone());
|
2015-07-18 01:29:24 -07:00
|
|
|
|
2015-07-31 20:32:52 -07:00
|
|
|
if (!outgoing.empty()) send_msgs(outgoing);
|
2015-07-16 22:55:50 -07:00
|
|
|
|
2015-07-31 23:26:38 -07:00
|
|
|
INFO("client: CONNECTED to server " << conn.stream().getPeerIP() << " port "
|
|
|
|
|
<< conn.stream().getPeerPort());
|
2015-07-31 20:32:52 -07:00
|
|
|
return true;
|
2015-07-17 22:39:36 -07:00
|
|
|
}
|
|
|
|
|
|
2015-08-02 00:33:41 -07:00
|
|
|
bool DispatcherBase::ServerHandshake(
|
2015-07-29 20:33:26 -07:00
|
|
|
NetworkConnection& conn,
|
2015-07-31 20:32:52 -07:00
|
|
|
std::function<std::shared_ptr<Message>()> get_msg,
|
|
|
|
|
std::function<void(llvm::ArrayRef<std::shared_ptr<Message>>)> send_msgs) {
|
2015-07-29 20:33:26 -07:00
|
|
|
// Wait for the client to send us a hello.
|
|
|
|
|
auto msg = get_msg();
|
|
|
|
|
if (!msg) {
|
|
|
|
|
DEBUG("server: client disconnected before sending hello");
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
if (!msg->Is(Message::kClientHello)) {
|
|
|
|
|
DEBUG("server: client initial message was not client hello");
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Check that the client requested version is not too high.
|
|
|
|
|
unsigned int proto_rev = msg->id();
|
|
|
|
|
if (proto_rev > 0x0300) {
|
|
|
|
|
DEBUG("server: client requested proto > 0x0300");
|
2015-07-31 20:32:52 -07:00
|
|
|
send_msgs(Message::ProtoUnsup());
|
2015-07-29 20:33:26 -07:00
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (proto_rev >= 0x0300) conn.set_remote_id(msg->str());
|
|
|
|
|
|
2015-07-31 20:32:52 -07:00
|
|
|
// Set the proto version to the client requested version
|
2015-07-29 20:33:26 -07:00
|
|
|
conn.set_proto_rev(proto_rev);
|
2015-07-31 20:32:52 -07:00
|
|
|
|
|
|
|
|
// Send initial set of assignments
|
2015-07-29 20:33:26 -07:00
|
|
|
NetworkConnection::Outgoing outgoing;
|
|
|
|
|
|
2015-07-31 20:32:52 -07:00
|
|
|
// Start with server hello. TODO: initial connection flag
|
2015-07-29 20:33:26 -07:00
|
|
|
if (proto_rev >= 0x0300) {
|
|
|
|
|
std::lock_guard<std::mutex> lock(m_user_mutex);
|
2015-07-31 20:32:52 -07:00
|
|
|
outgoing.emplace_back(Message::ServerHello(0u, m_identity));
|
2015-07-29 20:33:26 -07:00
|
|
|
}
|
|
|
|
|
|
2015-07-31 20:32:52 -07:00
|
|
|
// Get snapshot of initial assignments
|
2015-08-01 11:15:04 -07:00
|
|
|
m_storage.GetInitialAssignments(conn, &outgoing);
|
2015-07-31 20:32:52 -07:00
|
|
|
|
|
|
|
|
// Finish with server hello done
|
|
|
|
|
outgoing.emplace_back(Message::ServerHelloDone());
|
|
|
|
|
|
|
|
|
|
// Batch transmit
|
|
|
|
|
DEBUG("server: sending initial assignments");
|
|
|
|
|
send_msgs(outgoing);
|
|
|
|
|
|
2015-07-29 20:33:26 -07:00
|
|
|
// In proto rev 3.0 and later, the handshake concludes with a client hello
|
|
|
|
|
// done message, so we can batch the assigns before marking the connection
|
|
|
|
|
// active. In pre-3.0, we need to just immediately mark it active and hand
|
|
|
|
|
// off control to the dispatcher to assign them as they arrive.
|
|
|
|
|
if (proto_rev >= 0x0300) {
|
|
|
|
|
// receive client initial assignments
|
|
|
|
|
std::vector<std::shared_ptr<Message>> incoming;
|
2015-07-31 20:32:52 -07:00
|
|
|
msg = get_msg();
|
2015-07-29 20:33:26 -07:00
|
|
|
for (;;) {
|
|
|
|
|
if (!msg) {
|
|
|
|
|
// disconnected, retry
|
2015-07-31 20:32:52 -07:00
|
|
|
DEBUG("server: disconnected waiting for initial entries");
|
2015-07-29 20:33:26 -07:00
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
if (msg->Is(Message::kClientHelloDone)) break;
|
|
|
|
|
if (!msg->Is(Message::kEntryAssign)) {
|
|
|
|
|
// unexpected message
|
2015-07-31 22:41:26 -07:00
|
|
|
DEBUG("server: received message (" << msg->type() << ") other than entry assignment during initial handshake");
|
2015-07-29 20:33:26 -07:00
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
incoming.push_back(msg);
|
|
|
|
|
// get the next message (blocks)
|
|
|
|
|
msg = get_msg();
|
|
|
|
|
}
|
2015-08-01 11:15:04 -07:00
|
|
|
for (auto& msg : incoming) m_storage.ProcessIncoming(msg, &conn);
|
2015-07-29 20:33:26 -07:00
|
|
|
}
|
2015-07-31 20:32:52 -07:00
|
|
|
|
2015-07-31 23:26:38 -07:00
|
|
|
INFO("server: client CONNECTED: " << conn.stream().getPeerIP() << " port "
|
|
|
|
|
<< conn.stream().getPeerPort());
|
2015-07-29 20:33:26 -07:00
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
|
2015-08-02 00:33:41 -07:00
|
|
|
void DispatcherBase::ClientReconnect(unsigned int proto_rev) {
|
2015-07-17 22:39:36 -07:00
|
|
|
if (m_server) return;
|
|
|
|
|
{
|
2015-07-31 23:14:26 -07:00
|
|
|
std::lock_guard<std::mutex> lock(m_user_mutex);
|
2015-07-31 20:32:52 -07:00
|
|
|
m_reconnect_proto_rev = proto_rev;
|
2015-07-17 22:39:36 -07:00
|
|
|
m_do_reconnect = true;
|
2015-07-16 22:55:50 -07:00
|
|
|
}
|
2015-07-17 22:39:36 -07:00
|
|
|
m_reconnect_cv.notify_one();
|
|
|
|
|
}
|