From 5b5e3ae6aa6d5495f62913b4ddd9c2fb70352fff Mon Sep 17 00:00:00 2001 From: Peter Johnson Date: Wed, 29 Jul 2015 20:33:26 -0700 Subject: [PATCH] Dispatcher: Start implementing processing of incoming messages. --- src/Dispatcher.cpp | 395 ++++++++++++++++++++++++++++++++++++++++++--- src/Dispatcher.h | 19 +-- 2 files changed, 380 insertions(+), 34 deletions(-) diff --git a/src/Dispatcher.cpp b/src/Dispatcher.cpp index 149537c9e1..2b0c1be9cd 100644 --- a/src/Dispatcher.cpp +++ b/src/Dispatcher.cpp @@ -7,6 +7,9 @@ #include "Dispatcher.h" +#include +#include + #include "tcpsockets/TCPAcceptor.h" #include "tcpsockets/TCPConnector.h" @@ -24,7 +27,9 @@ Dispatcher::Dispatcher() m_update_rate = 100; } -Dispatcher::~Dispatcher() { Stop(); } +Dispatcher::~Dispatcher() { + Stop(); +} void Dispatcher::StartServer(const char* listen_address, unsigned int port) { { @@ -56,7 +61,7 @@ void Dispatcher::Stop() { m_active = false; // close all connections - for (auto& conn : m_connections) conn.net->Stop(); + for (auto& conn : m_connections) conn->Stop(); } // wake up dispatch thread with a flush @@ -99,6 +104,14 @@ void Dispatcher::Flush() { } void Dispatcher::DispatchThreadMain() { + Storage& storage = Storage::GetInstance(); + + // local copy of active m_connections + std::vector connections; + + // Outgoing messages for each remote (indexed the same as connections). + std::vector outgoing; + auto timeout_time = std::chrono::steady_clock::now(); int count = 0; while (m_active) { @@ -109,17 +122,265 @@ void Dispatcher::DispatchThreadMain() { // wait for periodic or when flushed timeout_time += std::chrono::milliseconds(m_update_rate); - std::unique_lock lock(m_flush_mutex); - m_reconnect_cv.wait_until(lock, timeout_time, + std::unique_lock flush_lock(m_flush_mutex); + m_reconnect_cv.wait_until(flush_lock, timeout_time, [&] { return !m_active || m_do_flush; }); m_do_flush = false; - lock.unlock(); + flush_lock.unlock(); if (!m_active) break; // in case we were woken up to terminate if (++count > 10) { DEBUG("dispatch running"); count = 0; } + + // clear outgoing + outgoing.resize(0); + + // Everything from this point forward needs to be treated as an atomic + // operation on idmap. The user code never needs access to this, so + // this is really a dispatcher-internal lock that only affects network + // side code. + std::unique_lock idmap_lock(m_idmap_mutex); + + // make a local copy of the connections list (so we don't hold the lock) + connections.resize(0); + { + std::lock_guard user_lock(m_user_mutex); + for (auto& conn : m_connections) { + if (conn->state() == NetworkConnection::kActive) + connections.push_back(conn.get()); + } + } + outgoing.resize(connections.size()); + + // grab local storage updates + Storage::UpdateMap updates; + bool delete_all; + storage.GetUpdates(&updates, &delete_all); + + // special handling of delete all operation: we ignore all in-flight + // messages + if (delete_all) { + // send it to all remotes + auto outmsg = Message::ClearEntries(); + for (auto& q : outgoing) q.push_back(outmsg); + + // empty all incoming messages + for (auto conn : connections) { + auto& incoming = conn->incoming(); + while (!incoming.empty()) incoming.pop(); + } + } + + // local entry updates + for (auto& update_entry : updates) { + auto update = update_entry.getValue(); + switch (update.kind) { + default: + break; + } + } + + // read all incoming messages + for (std::size_t i=0; iincoming(); + while (!incoming.empty()) { + auto msg = incoming.pop(); + if (!msg) continue; // should never happen, but just in case... + switch (msg->type()) { + case Message::kKeepAlive: + break; // ignore + case Message::kClientHello: + case Message::kProtoUnsup: + case Message::kServerHelloDone: + case Message::kServerHello: + case Message::kClientHelloDone: + // shouldn't get these, but ignore if we do + break; + case Message::kEntryAssign: { + unsigned int id = msg->id(); + std::shared_ptr entry; + if (m_server) { + // if we're a server, id=0xffff requests are requests for an id + // to be assigned, and we need to send the new assignment back to + // the sender as well as all other connections. + if (id == 0xffff) { + // see if it was already assigned; ignore if so. + if (!storage.FindEntry(msg->str())) continue; + + // create it locally + id = m_idmap.size(); + entry = storage.DispatchCreateEntry(msg->str(), msg->value(), + msg->flags()); + m_idmap.push_back(entry); + entry->set_id(id); + + // send the assignment to everyone (including the originator) + auto outmsg = Message::EntryAssign(msg->str(), id, + entry->seq_num().value(), + msg->value(), msg->flags()); + for (auto& q : outgoing) q.push_back(outmsg); + continue; + } + if (id >= m_idmap.size() || !m_idmap[id]) { + // ignore arbitrary entry assignments + // this can happen due to e.g. assignment to deleted entry + DEBUG("server: received assignment to unknown entry"); + continue; + } + entry = m_idmap[id]; + } else { + // clients simply accept new assignments + if (id == 0xffff) { + DEBUG("client: received entry assignment request?"); + continue; + } + if (id >= m_idmap.size()) m_idmap.resize(id+1); + entry = m_idmap[id]; + if (!entry) { + // create local + entry = storage.DispatchCreateEntry(msg->str(), msg->value(), + msg->flags()); + m_idmap[id] = entry; + entry->set_id(id); + continue; + } + } + + // common client and server handling + + // already exists; ignore if sequence number not higher than local + SequenceNumber seq_num(msg->seq_num_uid()); + if (seq_num <= entry->seq_num()) continue; + + // sanity check: name should match id + if (msg->str() != entry->name()) { + DEBUG("entry assignment for same id with different name?"); + continue; + } + + // update local + entry->set_value(msg->value()); + entry->set_seq_num(seq_num); + + // don't update flags from a <3.0 remote (not part of message) + if (conn->proto_rev() >= 0x0300) entry->set_flags(msg->flags()); + + // broadcast to all other connections (note for client there won't + // be any other connections, so don't bother) + if (m_server) { + auto outmsg = + Message::EntryAssign(entry->name(), id, msg->seq_num_uid(), + msg->value(), entry->flags()); + for (std::size_t j = 0; j < connections.size(); ++j) { + if (j != i) outgoing[j].push_back(outmsg); + } + } + break; + } + case Message::kEntryUpdate: { + unsigned int id = msg->id(); + if (id >= m_idmap.size() || !m_idmap[id]) { + // ignore arbitrary entry updates; + // this can happen due to deleted entries + DEBUG("received update to unknown entry"); + continue; + } + auto& entry = m_idmap[id]; + + // ignore if sequence number not higher than local + SequenceNumber seq_num(msg->seq_num_uid()); + if (seq_num <= entry->seq_num()) continue; + + // update local + entry->set_value(msg->value()); + entry->set_seq_num(seq_num); + + // broadcast to all other connections (note for client there won't + // be any other connections, so don't bother) + if (m_server) { + for (std::size_t j = 0; j < connections.size(); ++j) { + if (j != i) outgoing[j].push_back(msg); + } + } + break; + } + case Message::kFlagsUpdate: { + unsigned int id = msg->id(); + if (id >= m_idmap.size() || !m_idmap[id]) { + // ignore arbitrary entry updates; + // this can happen due to deleted entries + DEBUG("received flags update to unknown entry"); + continue; + } + auto& entry = m_idmap[id]; + + // update local + entry->set_flags(msg->flags()); + + // broadcast to all other connections (note for client there won't + // be any other connections, so don't bother) + if (m_server) { + for (std::size_t j = 0; j < connections.size(); ++j) { + if (j != i) outgoing[j].push_back(msg); + } + } + break; + } + case Message::kEntryDelete: { + unsigned int id = msg->id(); + if (id >= m_idmap.size() || !m_idmap[id]) { + // ignore arbitrary entry updates; + // this can happen due to deleted entries + DEBUG("received delete to unknown entry"); + continue; + } + auto& entry = m_idmap[id]; + + // update local + storage.DispatchDeleteEntry(entry->name()); + entry.reset(); // delete it from idmap too + + // broadcast to all other connections (note for client there won't + // be any other connections, so don't bother) + if (m_server) { + for (std::size_t j = 0; j < connections.size(); ++j) { + if (j != i) outgoing[j].push_back(msg); + } + } + break; + } + case Message::kClearEntries: { + // update local + storage.DispatchDeleteAllEntries(); + m_idmap.resize(0); + + // broadcast to all other connections (note for client there won't + // be any other connections, so don't bother) + if (m_server) { + for (std::size_t j = 0; j < connections.size(); ++j) { + if (j != i) outgoing[j].push_back(msg); + } + } + break; + } + case Message::kExecuteRpc: + case Message::kRpcResponse: + // TODO + break; + default: + continue; + } + } + } + + idmap_lock.unlock(); + + // send outgoing messages + for (std::size_t i = 0; i < connections.size(); ++i) + connections[i]->outgoing().emplace(std::move(outgoing[i])); } } @@ -140,11 +401,10 @@ void Dispatcher::ServerThreadMain(const char* listen_address, DEBUG("server got a connection"); // add to connections list - Connection conn; - conn.net.reset(new NetworkConnection( + std::unique_ptr conn(new NetworkConnection( std::move(stream), [this](unsigned int id) { return GetEntryType(id); })); - conn.net->Start(); + conn->Start(); AddConnection(std::move(conn)); } } @@ -168,23 +428,22 @@ void Dispatcher::ClientThreadMain(const char* server_name, unsigned int port) { if (!stream) continue; // keep retrying DEBUG("client connected"); - Connection conn; - conn.net.reset(new NetworkConnection( + std::unique_ptr conn(new NetworkConnection( std::move(stream), [this](unsigned int id) { return GetEntryType(id); })); - conn.net->set_proto_rev(proto_rev); - conn.net->Start(); + conn->set_proto_rev(proto_rev); + conn->Start(); // send client hello - DEBUG("client sending hello"); - conn.net->outgoing().push( + DEBUG("client: sending hello"); + conn->outgoing().push( NetworkConnection::Outgoing{Message::ClientHello(self_id)}); // wait for response - auto msg = conn.net->incoming().pop(); + auto msg = conn->incoming().pop(); if (!msg) { // disconnected, retry - DEBUG("client disconnected waiting for first response"); + DEBUG("client: server disconnected before first response"); proto_rev = 0x0300; continue; } @@ -202,10 +461,11 @@ void Dispatcher::ClientThreadMain(const char* server_name, unsigned int port) { if (proto_rev >= 0x0300) { // should be server hello; if not, disconnect, but keep trying to connect + // TODO: do something with initial connection flag if (!msg->Is(Message::kServerHello)) continue; - conn.remote_id = msg->str(); + conn->set_remote_id(msg->str()); // get the next message (blocks) - msg = conn.net->incoming().pop(); + msg = conn->incoming().pop(); } // receive initial assignments @@ -213,20 +473,20 @@ void Dispatcher::ClientThreadMain(const char* server_name, unsigned int port) { for (;;) { if (!msg) { // disconnected, retry - DEBUG("client disconnected waiting for initial entries"); + DEBUG("client: server disconnected during initial entries"); proto_rev = 0x0300; continue; } if (msg->Is(Message::kServerHelloDone)) break; if (!msg->Is(Message::kEntryAssign)) { // unexpected message - DEBUG("received message other than entry assignment during initial handshake"); + DEBUG("client: received message other than entry assignment during initial handshake"); proto_rev = 0x0300; continue; } incoming.push_back(msg); // get the next message (blocks) - msg = conn.net->incoming().pop(); + msg = conn->incoming().pop(); } // generate outgoing assignments @@ -236,7 +496,7 @@ void Dispatcher::ClientThreadMain(const char* server_name, unsigned int port) { outgoing.push_back(Message::ClientHelloDone()); if (!outgoing.empty()) - conn.net->outgoing().push(std::move(outgoing)); + conn->outgoing().push(std::move(outgoing)); // add to connections list (the dispatcher thread will handle from here) AddConnection(std::move(conn)); @@ -249,6 +509,95 @@ void Dispatcher::ClientThreadMain(const char* server_name, unsigned int port) { } } +bool Dispatcher::ServerHandshake( + NetworkConnection& conn, + std::function()> get_msg) { + // Wait for the client to send us a hello. + auto msg = get_msg(); + if (!msg) { + DEBUG("server: client disconnected before sending hello"); + return false; + } + if (!msg->Is(Message::kClientHello)) { + DEBUG("server: client initial message was not client hello"); + return false; + } + + // Check that the client requested version is not too high. + unsigned int proto_rev = msg->id(); + if (proto_rev > 0x0300) { + DEBUG("server: client requested proto > 0x0300"); + conn.outgoing().push(NetworkConnection::Outgoing{Message::ProtoUnsup()}); + return false; + } + + if (proto_rev >= 0x0300) conn.set_remote_id(msg->str()); + + // Set the proto version to the client requested version. + conn.set_proto_rev(proto_rev); +#if 0 + // We need to copy the ID map. This is inefficient, but is necessary + // because we need to get a "snapshot" of the current server state. The + // dispatch thread will create outgoing assignments as necessary as the idmap + // changes, but we don't want duplicate assignments or (worse) missing + // assignments by iterating one entry at a time. + IdMap id_map; + { + std::lock_guard lock(m_idmap_mutex); + id_map = m_idmap; + conn.set_state(NetworkConnection::kHandshake); + } +#endif + // send initial set of assignments + NetworkConnection::Outgoing outgoing; + + // Server hello. TODO: initial connection flag + if (proto_rev >= 0x0300) { + std::lock_guard lock(m_user_mutex); + outgoing.push_back(Message::ServerHello(0u, m_identity)); + } +#if 0 + Storage& storage = Storage::GetInstance(); + { + // take storage mutex as we must have a snapshot of the current values. + std::lock_guard lock(storage.mutex()); + std::lock_guard lock(m_idmap_mutex); + outgoing.push_back(Message::EntryAssign( + } +#endif + outgoing.push_back(Message::ServerHelloDone()); + + conn.outgoing().push(std::move(outgoing)); +#if 0 + // In proto rev 3.0 and later, the handshake concludes with a client hello + // done message, so we can batch the assigns before marking the connection + // active. In pre-3.0, we need to just immediately mark it active and hand + // off control to the dispatcher to assign them as they arrive. + if (proto_rev >= 0x0300) { + // receive client initial assignments + std::vector> incoming; + for (;;) { + if (!msg) { + // disconnected, retry + DEBUG("disconnected waiting for initial entries"); + return false; + } + if (msg->Is(Message::kClientHelloDone)) break; + if (!msg->Is(Message::kEntryAssign)) { + // unexpected message + DEBUG("received message other than entry assignment during initial handshake"); + return false; + } + incoming.push_back(msg); + // get the next message (blocks) + msg = get_msg(); + } + } +#endif + conn.set_state(NetworkConnection::kActive); + return true; +} + void Dispatcher::ClientReconnect() { if (m_server) return; { @@ -258,7 +607,7 @@ void Dispatcher::ClientReconnect() { m_reconnect_cv.notify_one(); } -void Dispatcher::AddConnection(Connection&& conn) { +void Dispatcher::AddConnection(std::unique_ptr conn) { std::lock_guard lock(m_user_mutex); m_connections.push_back(std::move(conn)); } diff --git a/src/Dispatcher.h b/src/Dispatcher.h index d25cf8c079..952e26c326 100644 --- a/src/Dispatcher.h +++ b/src/Dispatcher.h @@ -11,6 +11,7 @@ #include #include #include +#include #include #include #include @@ -53,19 +54,14 @@ class Dispatcher { void ServerThreadMain(const char* listen_address, unsigned int port); void ClientThreadMain(const char* server_name, unsigned int port); + bool ServerHandshake(NetworkConnection& conn, + std::function()> get_msg); + void ClientReconnect(); NT_Type GetEntryType(unsigned int id) const; - struct Connection { - enum State { - }; - State state; - std::string remote_id; - std::unique_ptr net; - }; - - void AddConnection(Connection&& conn); + void AddConnection(std::unique_ptr conn); bool m_server; std::thread m_dispatch_thread; @@ -76,7 +72,7 @@ class Dispatcher { // Mutex for user-accessible items std::mutex m_user_mutex; - std::vector m_connections; + std::vector> m_connections; std::string m_identity; std::atomic_bool m_active; // set to false to terminate threads @@ -94,8 +90,9 @@ class Dispatcher { bool m_do_reconnect; // Map from integer id to storage entry. Id is 16-bit, so just use a vector. + typedef std::vector> IdMap; mutable std::mutex m_idmap_mutex; - std::vector> m_idmap; + IdMap m_idmap; ATOMIC_STATIC_DECL(Dispatcher) };