Automatically reconnect client.

Also clean up some condition variable handling to make it more robust.
This commit is contained in:
Peter Johnson
2015-07-31 23:14:26 -07:00
parent a9af4589ac
commit 3a71acec52
3 changed files with 19 additions and 17 deletions

View File

@@ -22,7 +22,7 @@ Dispatcher::Dispatcher()
: m_server(false),
m_do_flush(false),
m_reconnect_proto_rev(0x0300),
m_do_reconnect(false) {
m_do_reconnect(true) {
m_active = false;
m_update_rate = 100;
}
@@ -123,6 +123,7 @@ void Dispatcher::DispatchThreadMain() {
auto timeout_time = std::chrono::steady_clock::now();
int count = 0;
std::unique_lock<std::mutex> flush_lock(m_flush_mutex);
while (m_active) {
// handle loop taking too long
auto start = std::chrono::steady_clock::now();
@@ -131,11 +132,9 @@ void Dispatcher::DispatchThreadMain() {
// wait for periodic or when flushed
timeout_time += std::chrono::milliseconds(m_update_rate);
std::unique_lock<std::mutex> flush_lock(m_flush_mutex);
m_reconnect_cv.wait_until(flush_lock, timeout_time,
[&] { return !m_active || m_do_flush; });
m_do_flush = false;
flush_lock.unlock();
if (!m_active) break; // in case we were woken up to terminate
if (++count > 10) {
@@ -147,12 +146,20 @@ void Dispatcher::DispatchThreadMain() {
connections.resize(0);
{
std::lock_guard<std::mutex> user_lock(m_user_mutex);
bool reconnect = false;
for (auto& conn : m_connections) {
if (conn.net->state() == NetworkConnection::kActive) {
connections.push_back(ConnectionRef());
connections.back().net = conn.net.get();
connections.back().outgoing.swap(conn.outgoing);
}
if (!m_server && conn.net->state() == NetworkConnection::kDead)
reconnect = true;
}
// reconnect if we disconnected (and a reconnect is not in progress)
if (reconnect && !m_do_reconnect) {
m_do_reconnect = true;
m_reconnect_cv.notify_one();
}
}
@@ -212,7 +219,6 @@ void Dispatcher::ServerThreadMain(const char* listen_address,
}
void Dispatcher::ClientThreadMain(const char* server_name, unsigned int port) {
unsigned int proto_rev = 0x0300;
while (m_active) {
// sleep between retries
std::this_thread::sleep_for(std::chrono::milliseconds(500));
@@ -223,6 +229,7 @@ void Dispatcher::ClientThreadMain(const char* server_name, unsigned int port) {
if (!stream) continue; // keep retrying
DEBUG("client connected");
std::unique_lock<std::mutex> lock(m_user_mutex);
using namespace std::placeholders;
Storage& storage = Storage::GetInstance();
std::unique_ptr<NetworkConnection> conn_unique(new NetworkConnection(
@@ -231,20 +238,14 @@ void Dispatcher::ClientThreadMain(const char* server_name, unsigned int port) {
std::bind(&Storage::GetEntryType, &storage, _1),
std::bind(&Storage::ProcessIncoming, &storage, _1, _2, _3)));
auto conn = conn_unique.get();
{
std::lock_guard<std::mutex> lock(m_user_mutex);
m_connections.resize(0); // disconnect any current
m_connections.emplace_back(std::move(conn_unique));
}
conn->set_proto_rev(proto_rev);
m_connections.resize(0); // disconnect any current
m_connections.emplace_back(std::move(conn_unique));
conn->set_proto_rev(m_reconnect_proto_rev);
conn->Start();
// block until told to reconnect
std::unique_lock<std::mutex> lock(m_reconnect_mutex);
m_reconnect_cv.wait(lock, [&] { return m_do_reconnect; });
proto_rev = m_reconnect_proto_rev;
m_do_reconnect = false;
lock.unlock();
m_reconnect_cv.wait(lock, [&] { return m_do_reconnect; });
}
}
@@ -400,7 +401,7 @@ bool Dispatcher::ServerHandshake(
void Dispatcher::ClientReconnect(unsigned int proto_rev) {
if (m_server) return;
{
std::lock_guard<std::mutex> lock(m_reconnect_mutex);
std::lock_guard<std::mutex> lock(m_user_mutex);
m_reconnect_proto_rev = proto_rev;
m_do_reconnect = true;
}