From 56f1481c2434017c7b6f2eac792ef8ea20a6ce47 Mon Sep 17 00:00:00 2001 From: Peter Johnson Date: Thu, 16 Jul 2015 22:55:50 -0700 Subject: [PATCH] Add Dispatcher. --- src/Dispatcher.cpp | 122 +++++++++++++++++++++++++++++++++++++++++++++ src/Dispatcher.h | 69 +++++++++++++++++++++++++ 2 files changed, 191 insertions(+) create mode 100644 src/Dispatcher.cpp create mode 100644 src/Dispatcher.h diff --git a/src/Dispatcher.cpp b/src/Dispatcher.cpp new file mode 100644 index 0000000000..c0ebe6abea --- /dev/null +++ b/src/Dispatcher.cpp @@ -0,0 +1,122 @@ +/*----------------------------------------------------------------------------*/ +/* Copyright (c) FIRST 2015. All Rights Reserved. */ +/* Open Source Software - may be modified and shared by FRC teams. The code */ +/* must be accompanied by the FIRST BSD license file in the root directory of */ +/* the project. */ +/*----------------------------------------------------------------------------*/ + +#include "Dispatcher.h" + +#include "tcpsockets/TCPAcceptor.h" +#include "tcpsockets/TCPConnector.h" + +using namespace ntimpl; + +std::unique_ptr Dispatcher::m_instance; + +static NT_Type GetEntryType(unsigned int id) { + // TODO + return NT_UNASSIGNED; +} + +Dispatcher::Dispatcher() : m_active(false) {} + +Dispatcher::~Dispatcher() { Stop(); } + +void Dispatcher::StartServer(const char* listen_address, unsigned int port) { + if (m_active) return; + m_active = true; + m_dispatch_thread = std::thread(&Dispatcher::DispatchThreadMain, this); + m_clientserver_thread = + std::thread(&Dispatcher::ServerThreadMain, this, listen_address, port); +} + +void Dispatcher::StartClient(const char* server_name, unsigned int port) { + if (m_active) return; + m_active = true; + m_dispatch_thread = std::thread(&Dispatcher::DispatchThreadMain, this); + m_clientserver_thread = + std::thread(&Dispatcher::ClientThreadMain, this, server_name, port); +} + +void Dispatcher::Stop() { + m_active = false; + if (m_dispatch_thread.joinable()) m_dispatch_thread.join(); + if (m_clientserver_thread.joinable()) m_clientserver_thread.join(); +} + +void Dispatcher::SetUpdateRate(double interval) { + // TODO +} + +void Dispatcher::SetIdentity(llvm::StringRef name) { + std::lock_guard lock(m_user_mutex); + m_identity = name; +} + +void Dispatcher::DispatchThreadMain() { + // TODO +} + +void Dispatcher::ServerThreadMain(const char* listen_address, + unsigned int port) { + TCPAcceptor acceptor(static_cast(port), listen_address); + if (acceptor.start() != 0) { + m_active = false; + return; + } + while (m_active) { + auto stream = acceptor.accept(); + if (!stream) { + m_active = false; + break; + } + // add to connections list + } +} + +void Dispatcher::ClientThreadMain(const char* server_name, unsigned int port) { + std::string self_id; + { + std::lock_guard lock(m_user_mutex); + self_id = m_identity; + } + unsigned int proto_rev = 0x0300; + std::unique_ptr conn; + while (m_active) { + // try to connect (with timeout) + auto stream = TCPConnector::connect(server_name, static_cast(port), 1); + if (!stream) continue; // keep retrying + + conn.reset(new NetworkConnection(std::move(stream), GetEntryType)); + conn->set_proto_rev(proto_rev); + + // send client hello + conn->outgoing().push( + NetworkConnection::Outgoing{Message::ClientHello(self_id)}); + + // wait for response + auto resp = conn->incoming().pop(); + + if (resp->Is(Message::kProtoUnsup)) { + // reconnect with lower protocol (if possible) + if (proto_rev <= 0x0200) { + // no more options, abort (but keep trying to connect) + proto_rev = 0x0300; + continue; + } + proto_rev = 0x0200; + continue; + } + + if (!resp->Is(Message::kServerHello)) { + // shouldn't happen; disconnect but keep trying to connect + proto_rev = 0x0300; + continue; + } + + // add to connections list (the dispatcher thread will handle from here) + + // block until told to reconnect + } +} diff --git a/src/Dispatcher.h b/src/Dispatcher.h new file mode 100644 index 0000000000..fdbf02d0db --- /dev/null +++ b/src/Dispatcher.h @@ -0,0 +1,69 @@ +/*----------------------------------------------------------------------------*/ +/* Copyright (c) FIRST 2015. All Rights Reserved. */ +/* Open Source Software - may be modified and shared by FRC teams. The code */ +/* must be accompanied by the FIRST BSD license file in the root directory of */ +/* the project. */ +/*----------------------------------------------------------------------------*/ + +#ifndef NT_DISPATCHER_H_ +#define NT_DISPATCHER_H_ + +#include +#include +#include +#include +#include + +#include "llvm/StringRef.h" + +#include "NetworkConnection.h" + +namespace ntimpl { + +class Dispatcher { + public: + static Dispatcher& GetInstance() { + if (!m_instance) m_instance.reset(new Dispatcher); + return *m_instance; + } + ~Dispatcher(); + + void StartServer(const char* listen_address, unsigned int port); + void StartClient(const char* server_name, unsigned int port); + void Stop(); + void SetUpdateRate(double interval); + void SetIdentity(llvm::StringRef name); + + Dispatcher(const Dispatcher&) = delete; + Dispatcher& operator=(const Dispatcher&) = delete; + + private: + Dispatcher(); + + void DispatchThreadMain(); + void ServerThreadMain(const char* listen_address, unsigned int port); + void ClientThreadMain(const char* server_name, unsigned int port); + + struct Connection { + enum State { + }; + State state; + std::unique_ptr conn; + }; + std::thread m_dispatch_thread; + std::thread m_clientserver_thread; + + // Mutex protected + std::vector m_connections; + std::string m_identity; + std::mutex m_user_mutex; + + std::atomic_bool m_active; + std::atomic_uint m_interval; + + static std::unique_ptr m_instance; +}; + +} // namespace ntimpl + +#endif // NT_DISPATCHER_H_