diff --git a/src/Dispatcher.cpp b/src/Dispatcher.cpp index 8a9091934f..beea340e4e 100644 --- a/src/Dispatcher.cpp +++ b/src/Dispatcher.cpp @@ -18,8 +18,9 @@ using namespace nt; ATOMIC_STATIC_INIT(Dispatcher) -Dispatcher::Dispatcher() - : m_server(false), +Dispatcher::Dispatcher(Storage& storage) + : m_storage(storage), + m_server(false), m_do_flush(false), m_reconnect_proto_rev(0x0300), m_do_reconnect(true) { @@ -40,8 +41,8 @@ void Dispatcher::StartServer(const char* listen_address, unsigned int port) { m_server = true; using namespace std::placeholders; - Storage::GetInstance().SetOutgoing( - std::bind(&Dispatcher::QueueOutgoing, this, _1, _2, _3), m_server); + m_storage.SetOutgoing(std::bind(&Dispatcher::QueueOutgoing, this, _1, _2, _3), + m_server); m_dispatch_thread = std::thread(&Dispatcher::DispatchThreadMain, this); m_clientserver_thread = @@ -57,8 +58,8 @@ void Dispatcher::StartClient(const char* server_name, unsigned int port) { m_server = false; using namespace std::placeholders; - Storage::GetInstance().SetOutgoing( - std::bind(&Dispatcher::QueueOutgoing, this, _1, _2, _3), m_server); + m_storage.SetOutgoing(std::bind(&Dispatcher::QueueOutgoing, this, _1, _2, _3), + m_server); m_dispatch_thread = std::thread(&Dispatcher::DispatchThreadMain, this); m_clientserver_thread = @@ -206,12 +207,11 @@ void Dispatcher::ServerThreadMain(const char* listen_address, // add to connections list using namespace std::placeholders; - Storage& storage = Storage::GetInstance(); std::unique_ptr conn_unique(new NetworkConnection( std::move(stream), std::bind(&Dispatcher::ServerHandshake, this, _1, _2, _3), - std::bind(&Storage::GetEntryType, &storage, _1), - std::bind(&Storage::ProcessIncoming, &storage, _1, _2))); + std::bind(&Storage::GetEntryType, &m_storage, _1), + std::bind(&Storage::ProcessIncoming, &m_storage, _1, _2))); auto conn = conn_unique.get(); { std::lock_guard lock(m_user_mutex); @@ -234,12 +234,11 @@ void Dispatcher::ClientThreadMain(const char* server_name, unsigned int port) { std::unique_lock lock(m_user_mutex); using namespace std::placeholders; - Storage& storage = Storage::GetInstance(); std::unique_ptr conn_unique(new NetworkConnection( std::move(stream), std::bind(&Dispatcher::ClientHandshake, this, _1, _2, _3), - std::bind(&Storage::GetEntryType, &storage, _1), - std::bind(&Storage::ProcessIncoming, &storage, _1, _2))); + std::bind(&Storage::GetEntryType, &m_storage, _1), + std::bind(&Storage::ProcessIncoming, &m_storage, _1, _2))); auto conn = conn_unique.get(); m_connections.resize(0); // disconnect any current m_connections.emplace_back(std::move(conn_unique)); @@ -312,8 +311,7 @@ bool Dispatcher::ClientHandshake( // generate outgoing assignments NetworkConnection::Outgoing outgoing; - Storage::GetInstance().ApplyInitialAssignments(conn, incoming, new_server, - &outgoing); + m_storage.ApplyInitialAssignments(conn, incoming, new_server, &outgoing); if (conn.proto_rev() >= 0x0300) outgoing.emplace_back(Message::ClientHelloDone()); @@ -363,7 +361,7 @@ bool Dispatcher::ServerHandshake( } // Get snapshot of initial assignments - Storage::GetInstance().GetInitialAssignments(conn, &outgoing); + m_storage.GetInitialAssignments(conn, &outgoing); // Finish with server hello done outgoing.emplace_back(Message::ServerHelloDone()); @@ -396,8 +394,7 @@ bool Dispatcher::ServerHandshake( // get the next message (blocks) msg = get_msg(); } - Storage& storage = Storage::GetInstance(); - for (auto& msg : incoming) storage.ProcessIncoming(msg, &conn); + for (auto& msg : incoming) m_storage.ProcessIncoming(msg, &conn); } INFO("server: client CONNECTED: " << conn.stream().getPeerIP() << " port " diff --git a/src/Dispatcher.h b/src/Dispatcher.h index ff3dabba59..9548d06d03 100644 --- a/src/Dispatcher.h +++ b/src/Dispatcher.h @@ -28,6 +28,7 @@ class TCPAcceptor; namespace nt { class Dispatcher { + friend class DispatcherTest; public: static Dispatcher& GetInstance() { ATOMIC_STATIC(Dispatcher, instance); @@ -48,7 +49,8 @@ class Dispatcher { Dispatcher& operator=(const Dispatcher&) = delete; private: - Dispatcher(); + Dispatcher() : Dispatcher(Storage::GetInstance()) {} + Dispatcher(Storage& storage); void DispatchThreadMain(); void ServerThreadMain(const char* listen_address, unsigned int port); @@ -68,6 +70,7 @@ class Dispatcher { void QueueOutgoing(std::shared_ptr msg, NetworkConnection* only, NetworkConnection* except); + Storage& m_storage; bool m_server; std::thread m_dispatch_thread; std::thread m_clientserver_thread;