From 412e8034debc43ddc344d3b5d53afdc813bf5cb8 Mon Sep 17 00:00:00 2001 From: Peter Johnson Date: Fri, 17 Jul 2015 22:39:36 -0700 Subject: [PATCH] Start implementing client and server. --- src/Dispatcher.cpp | 174 ++++++++++++++++++++++++++++++++++++++------- src/Dispatcher.h | 38 ++++++++-- 2 files changed, 181 insertions(+), 31 deletions(-) diff --git a/src/Dispatcher.cpp b/src/Dispatcher.cpp index 901f0f9da9..187e2ff06b 100644 --- a/src/Dispatcher.cpp +++ b/src/Dispatcher.cpp @@ -12,6 +12,8 @@ using namespace nt; +#define DEBUG(str) puts(str) + std::unique_ptr Dispatcher::m_instance; static NT_Type GetEntryType(unsigned int id) { @@ -19,34 +21,67 @@ static NT_Type GetEntryType(unsigned int id) { return NT_UNASSIGNED; } -Dispatcher::Dispatcher() : m_active(false) {} +Dispatcher::Dispatcher() + : m_server(false), + m_active(false), + m_update_rate(100), + m_do_flush(false), + m_do_reconnect(false) {} Dispatcher::~Dispatcher() { Stop(); } void Dispatcher::StartServer(const char* listen_address, unsigned int port) { - if (m_active) return; - m_active = true; + { + std::lock_guard lock(m_user_mutex); + if (m_active) return; + m_active = true; + } + m_server = true; m_dispatch_thread = std::thread(&Dispatcher::DispatchThreadMain, this); m_clientserver_thread = std::thread(&Dispatcher::ServerThreadMain, this, listen_address, port); } void Dispatcher::StartClient(const char* server_name, unsigned int port) { - if (m_active) return; - m_active = true; + { + std::lock_guard lock(m_user_mutex); + if (m_active) return; + m_active = true; + } + m_server = false; m_dispatch_thread = std::thread(&Dispatcher::DispatchThreadMain, this); m_clientserver_thread = std::thread(&Dispatcher::ClientThreadMain, this, server_name, port); } void Dispatcher::Stop() { - m_active = false; + { + std::lock_guard lock(m_user_mutex); + m_active = false; + + // close all connections + for (auto& conn : m_connections) conn.net->Stop(); + } + + // wake up dispatch thread with a flush + m_flush_cv.notify_one(); + + // wake up client thread with a reconnect + ClientReconnect(); + + // wake up server thread by shutting down the socket + if (m_server_acceptor) m_server_acceptor->shutdown(); + + // join threads if (m_dispatch_thread.joinable()) m_dispatch_thread.join(); if (m_clientserver_thread.joinable()) m_clientserver_thread.join(); } void Dispatcher::SetUpdateRate(double interval) { - // TODO + // don't allow update rates faster than 100 ms + if (interval < 0.1) + interval = 0.1; + m_update_rate = interval * 1000; } void Dispatcher::SetIdentity(llvm::StringRef name) { @@ -54,51 +89,107 @@ void Dispatcher::SetIdentity(llvm::StringRef name) { m_identity = name; } +void Dispatcher::Flush() { + auto now = std::chrono::steady_clock::now(); + { + std::lock_guard lock(m_flush_mutex); + // don't allow flushes more often than every 100 ms + if ((now - m_last_flush) < std::chrono::milliseconds(100)) + return; + m_last_flush = now; + m_do_flush = true; + } + m_flush_cv.notify_one(); +} + void Dispatcher::DispatchThreadMain() { - // TODO + auto timeout_time = std::chrono::steady_clock::now(); + int count = 0; + while (m_active) { + // handle loop taking too long + auto start = std::chrono::steady_clock::now(); + if (start > timeout_time) + timeout_time = start; + + // wait for periodic or when flushed + timeout_time += std::chrono::milliseconds(m_update_rate); + std::unique_lock lock(m_flush_mutex); + m_reconnect_cv.wait_until(lock, timeout_time, + [&] { return !m_active || m_do_flush; }); + m_do_flush = false; + lock.unlock(); + if (!m_active) break; // in case we were woken up to terminate + + if (++count > 10) { + DEBUG("dispatch running"); + count = 0; + } + } } void Dispatcher::ServerThreadMain(const char* listen_address, unsigned int port) { - TCPAcceptor acceptor(static_cast(port), listen_address); - if (acceptor.start() != 0) { + m_server_acceptor.reset( + new TCPAcceptor(static_cast(port), listen_address)); + if (m_server_acceptor->start() != 0) { m_active = false; return; } while (m_active) { - auto stream = acceptor.accept(); + auto stream = m_server_acceptor->accept(); if (!stream) { m_active = false; break; } + DEBUG("server got a connection"); + // add to connections list + Connection conn; + conn.net.reset(new NetworkConnection(std::move(stream), GetEntryType)); + conn.net->Start(); + AddConnection(std::move(conn)); } } void Dispatcher::ClientThreadMain(const char* server_name, unsigned int port) { - std::string self_id; - { - std::lock_guard lock(m_user_mutex); - self_id = m_identity; - } unsigned int proto_rev = 0x0300; - std::unique_ptr conn; while (m_active) { + // get identity + std::string self_id; + { + std::lock_guard lock(m_user_mutex); + self_id = m_identity; + } + + // sleep between retries + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + // try to connect (with timeout) + DEBUG("client trying to connect"); auto stream = TCPConnector::connect(server_name, static_cast(port), 1); if (!stream) continue; // keep retrying + DEBUG("client connected"); - conn.reset(new NetworkConnection(std::move(stream), GetEntryType)); - conn->set_proto_rev(proto_rev); + Connection conn; + conn.net.reset(new NetworkConnection(std::move(stream), GetEntryType)); + conn.net->set_proto_rev(proto_rev); + conn.net->Start(); // send client hello - conn->outgoing().push( + DEBUG("client sending hello"); + conn.net->outgoing().push( NetworkConnection::Outgoing{Message::ClientHello(self_id)}); // wait for response - auto resp = conn->incoming().pop(); + auto msg = conn.net->incoming().pop(); + if (!msg) { + // disconnected, retry + DEBUG("client disconnected waiting for first response"); + proto_rev = 0x0300; + continue; + } - if (resp->Is(Message::kProtoUnsup)) { + if (msg->Is(Message::kProtoUnsup)) { // reconnect with lower protocol (if possible) if (proto_rev <= 0x0200) { // no more options, abort (but keep trying to connect) @@ -109,14 +200,45 @@ void Dispatcher::ClientThreadMain(const char* server_name, unsigned int port) { continue; } - if (!resp->Is(Message::kServerHello)) { - // shouldn't happen; disconnect but keep trying to connect - proto_rev = 0x0300; - continue; + if (proto_rev == 0x0300) { + // should be server hello; if not, disconnect, but keep trying to connect + if (!msg->Is(Message::kServerHello)) continue; + conn.remote_id = msg->str(); + // get the next message (blocks) + msg = conn.net->incoming().pop(); + } + + while (true) { + if (!msg) { + // disconnected, retry + DEBUG("client disconnected waiting for initial entries"); + proto_rev = 0x0300; + continue; + } + if (msg->Is(Message::kServerHelloDone)) break; } // add to connections list (the dispatcher thread will handle from here) + AddConnection(std::move(conn)); // block until told to reconnect + std::unique_lock lock(m_reconnect_mutex); + m_reconnect_cv.wait(lock, [&] { return m_do_reconnect; }); + m_do_reconnect = false; + lock.unlock(); } } + +void Dispatcher::ClientReconnect() { + if (m_server) return; + { + std::lock_guard lock(m_reconnect_mutex); + m_do_reconnect = true; + } + m_reconnect_cv.notify_one(); +} + +void Dispatcher::AddConnection(Connection&& conn) { + std::lock_guard lock(m_user_mutex); + m_connections.push_back(std::move(conn)); +} diff --git a/src/Dispatcher.h b/src/Dispatcher.h index bb2fe1f28e..c37fe00531 100644 --- a/src/Dispatcher.h +++ b/src/Dispatcher.h @@ -9,6 +9,8 @@ #define NT_DISPATCHER_H_ #include +#include +#include #include #include #include @@ -18,6 +20,8 @@ #include "NetworkConnection.h" +class TCPAcceptor; + namespace nt { class Dispatcher { @@ -33,6 +37,9 @@ class Dispatcher { void Stop(); void SetUpdateRate(double interval); void SetIdentity(llvm::StringRef name); + void Flush(); + + bool active() const { return m_active; } Dispatcher(const Dispatcher&) = delete; Dispatcher& operator=(const Dispatcher&) = delete; @@ -44,22 +51,43 @@ class Dispatcher { void ServerThreadMain(const char* listen_address, unsigned int port); void ClientThreadMain(const char* server_name, unsigned int port); + void ClientReconnect(); + struct Connection { enum State { }; State state; - std::unique_ptr conn; + std::string remote_id; + std::unique_ptr net; }; + + void AddConnection(Connection&& conn); + + bool m_server; std::thread m_dispatch_thread; std::thread m_clientserver_thread; + std::thread m_notifier_thread; - // Mutex protected + std::shared_ptr m_server_acceptor; + + // Mutex for user-accessible items + std::mutex m_user_mutex; std::vector m_connections; std::string m_identity; - std::mutex m_user_mutex; - std::atomic_bool m_active; - std::atomic_uint m_interval; + std::atomic_bool m_active; // set to false to terminate threads + std::atomic_uint m_update_rate; // periodic dispatch update rate, in ms + + // Condition variable for forced dispatch wakeup (flush) + std::mutex m_flush_mutex; + std::condition_variable m_flush_cv; + std::chrono::steady_clock::time_point m_last_flush; + bool m_do_flush; + + // Condition variable for client reconnect + std::mutex m_reconnect_mutex; + std::condition_variable m_reconnect_cv; + bool m_do_reconnect; static std::unique_ptr m_instance; };