diff --git a/src/Dispatcher.cpp b/src/Dispatcher.cpp index ec74d4527f..26d6d2bdcb 100644 --- a/src/Dispatcher.cpp +++ b/src/Dispatcher.cpp @@ -22,7 +22,7 @@ Dispatcher::Dispatcher() : m_server(false), m_do_flush(false), m_reconnect_proto_rev(0x0300), - m_do_reconnect(false) { + m_do_reconnect(true) { m_active = false; m_update_rate = 100; } @@ -123,6 +123,7 @@ void Dispatcher::DispatchThreadMain() { auto timeout_time = std::chrono::steady_clock::now(); int count = 0; + std::unique_lock flush_lock(m_flush_mutex); while (m_active) { // handle loop taking too long auto start = std::chrono::steady_clock::now(); @@ -131,11 +132,9 @@ void Dispatcher::DispatchThreadMain() { // wait for periodic or when flushed timeout_time += std::chrono::milliseconds(m_update_rate); - std::unique_lock flush_lock(m_flush_mutex); m_reconnect_cv.wait_until(flush_lock, timeout_time, [&] { return !m_active || m_do_flush; }); m_do_flush = false; - flush_lock.unlock(); if (!m_active) break; // in case we were woken up to terminate if (++count > 10) { @@ -147,12 +146,20 @@ void Dispatcher::DispatchThreadMain() { connections.resize(0); { std::lock_guard user_lock(m_user_mutex); + bool reconnect = false; for (auto& conn : m_connections) { if (conn.net->state() == NetworkConnection::kActive) { connections.push_back(ConnectionRef()); connections.back().net = conn.net.get(); connections.back().outgoing.swap(conn.outgoing); } + if (!m_server && conn.net->state() == NetworkConnection::kDead) + reconnect = true; + } + // reconnect if we disconnected (and a reconnect is not in progress) + if (reconnect && !m_do_reconnect) { + m_do_reconnect = true; + m_reconnect_cv.notify_one(); } } @@ -212,7 +219,6 @@ void Dispatcher::ServerThreadMain(const char* listen_address, } void Dispatcher::ClientThreadMain(const char* server_name, unsigned int port) { - unsigned int proto_rev = 0x0300; while (m_active) { // sleep between retries std::this_thread::sleep_for(std::chrono::milliseconds(500)); @@ -223,6 +229,7 @@ void Dispatcher::ClientThreadMain(const char* server_name, unsigned int port) { if (!stream) continue; // keep retrying DEBUG("client connected"); + std::unique_lock lock(m_user_mutex); using namespace std::placeholders; Storage& storage = Storage::GetInstance(); std::unique_ptr conn_unique(new NetworkConnection( @@ -231,20 +238,14 @@ void Dispatcher::ClientThreadMain(const char* server_name, unsigned int port) { std::bind(&Storage::GetEntryType, &storage, _1), std::bind(&Storage::ProcessIncoming, &storage, _1, _2, _3))); auto conn = conn_unique.get(); - { - std::lock_guard lock(m_user_mutex); - m_connections.resize(0); // disconnect any current - m_connections.emplace_back(std::move(conn_unique)); - } - conn->set_proto_rev(proto_rev); + m_connections.resize(0); // disconnect any current + m_connections.emplace_back(std::move(conn_unique)); + conn->set_proto_rev(m_reconnect_proto_rev); conn->Start(); // block until told to reconnect - std::unique_lock lock(m_reconnect_mutex); - m_reconnect_cv.wait(lock, [&] { return m_do_reconnect; }); - proto_rev = m_reconnect_proto_rev; m_do_reconnect = false; - lock.unlock(); + m_reconnect_cv.wait(lock, [&] { return m_do_reconnect; }); } } @@ -400,7 +401,7 @@ bool Dispatcher::ServerHandshake( void Dispatcher::ClientReconnect(unsigned int proto_rev) { if (m_server) return; { - std::lock_guard lock(m_reconnect_mutex); + std::lock_guard lock(m_user_mutex); m_reconnect_proto_rev = proto_rev; m_do_reconnect = true; } diff --git a/src/Dispatcher.h b/src/Dispatcher.h index c9b027ee1d..ff3dabba59 100644 --- a/src/Dispatcher.h +++ b/src/Dispatcher.h @@ -96,8 +96,7 @@ class Dispatcher { std::chrono::steady_clock::time_point m_last_flush; bool m_do_flush; - // Condition variable for client reconnect - std::mutex m_reconnect_mutex; + // Condition variable for client reconnect (uses user mutex) std::condition_variable m_reconnect_cv; unsigned int m_reconnect_proto_rev; bool m_do_reconnect; diff --git a/src/NetworkConnection.cpp b/src/NetworkConnection.cpp index cab818952a..dd725d366d 100644 --- a/src/NetworkConnection.cpp +++ b/src/NetworkConnection.cpp @@ -100,6 +100,7 @@ void NetworkConnection::ReadThreadMain() { } m_process_incoming(std::move(msg), this, m_proto_rev); } + DEBUG3("read thread died"); m_state = static_cast(kDead); m_active = false; } @@ -123,6 +124,7 @@ void NetworkConnection::WriteThreadMain() { if (m_stream->send(encoder.data(), encoder.size(), &err) == 0) break; DEBUG4("sent " << encoder.size() << " bytes"); } + DEBUG3("write thread died"); m_state = static_cast(kDead); m_active = false; }