From dd0e3e4abbd33ef72321dd69c8fd73c181ad4e0c Mon Sep 17 00:00:00 2001 From: Peter Johnson Date: Fri, 9 Oct 2015 23:50:01 -0700 Subject: [PATCH] Implement join with timeout (and detach). Under certain situations (notably JNI shutdown), it's possible to get deadlock when using thread join(). To avoid this, implement a timeout. Normally we try to simply join the thread, but if it times out, we detach the thread instead. --- src/Dispatcher.cpp | 58 ++++++++++++++++++++++++++++++++++++--- src/Dispatcher.h | 7 +++++ src/NetworkConnection.cpp | 47 ++++++++++++++++++++++++++++--- src/NetworkConnection.h | 7 +++++ src/Notifier.cpp | 28 +++++++++++++++++-- src/Notifier.h | 3 ++ src/RpcServer.cpp | 25 +++++++++++++++-- src/RpcServer.h | 3 ++ 8 files changed, 166 insertions(+), 12 deletions(-) diff --git a/src/Dispatcher.cpp b/src/Dispatcher.cpp index 409b857b17..7700204cb0 100644 --- a/src/Dispatcher.cpp +++ b/src/Dispatcher.cpp @@ -54,6 +54,11 @@ void DispatcherBase::StartServer(StringRef persist_filename, if (m_active) return; m_active = true; } + { + std::lock_guard lock(m_shutdown_mutex); + m_dispatch_shutdown = false; + m_clientserver_shutdown = false; + } m_server = true; m_persist_filename = persist_filename; m_server_acceptor = std::move(acceptor); @@ -87,6 +92,11 @@ void DispatcherBase::StartClient( if (m_active) return; m_active = true; } + { + std::lock_guard lock(m_shutdown_mutex); + m_dispatch_shutdown = false; + m_clientserver_shutdown = false; + } m_server = false; using namespace std::placeholders; @@ -110,9 +120,27 @@ void DispatcherBase::Stop() { // wake up server thread by shutting down the socket if (m_server_acceptor) m_server_acceptor->shutdown(); - // join threads - if (m_dispatch_thread.joinable()) m_dispatch_thread.join(); - if (m_clientserver_thread.joinable()) m_clientserver_thread.join(); + // join threads, with timeout + if (m_dispatch_thread.joinable()) { + std::unique_lock lock(m_shutdown_mutex); + auto timeout_time = + std::chrono::steady_clock::now() + std::chrono::seconds(1); + if (m_dispatch_shutdown_cv.wait_until(lock, timeout_time, + [&] { return m_dispatch_shutdown; })) + m_dispatch_thread.join(); + else + m_dispatch_thread.detach(); // timed out, detach it + } + if (m_clientserver_thread.joinable()) { + std::unique_lock lock(m_shutdown_mutex); + auto timeout_time = + std::chrono::steady_clock::now() + std::chrono::seconds(1); + if (m_clientserver_shutdown_cv.wait_until( + lock, timeout_time, [&] { return m_clientserver_shutdown; })) + m_clientserver_thread.join(); + else + m_clientserver_thread.detach(); // timed out, detach it + } std::vector> conns; { @@ -229,6 +257,13 @@ void DispatcherBase::DispatchThreadMain() { } } } + + // use condition variable to signal thread shutdown + { + std::lock_guard lock(m_shutdown_mutex); + m_dispatch_shutdown = true; + m_dispatch_shutdown_cv.notify_one(); + } } void DispatcherBase::QueueOutgoing(std::shared_ptr msg, @@ -248,7 +283,7 @@ void DispatcherBase::QueueOutgoing(std::shared_ptr msg, void DispatcherBase::ServerThreadMain() { if (m_server_acceptor->start() != 0) { m_active = false; - return; + goto done; } while (m_active) { auto stream = m_server_acceptor->accept(); @@ -274,6 +309,14 @@ void DispatcherBase::ServerThreadMain() { conn->Start(); } } + +done: + // use condition variable to signal thread shutdown + { + std::lock_guard lock(m_shutdown_mutex); + m_clientserver_shutdown = true; + m_clientserver_shutdown_cv.notify_one(); + } } void DispatcherBase::ClientThreadMain( @@ -306,6 +349,13 @@ void DispatcherBase::ClientThreadMain( m_do_reconnect = false; m_reconnect_cv.wait(lock, [&] { return !m_active || m_do_reconnect; }); } + + // use condition variable to signal thread shutdown + { + std::lock_guard lock(m_shutdown_mutex); + m_clientserver_shutdown = true; + m_clientserver_shutdown_cv.notify_one(); + } } bool DispatcherBase::ClientHandshake( diff --git a/src/Dispatcher.h b/src/Dispatcher.h index 26f5e76328..3a199c2dc2 100644 --- a/src/Dispatcher.h +++ b/src/Dispatcher.h @@ -99,6 +99,13 @@ class DispatcherBase { std::condition_variable m_reconnect_cv; unsigned int m_reconnect_proto_rev = 0x0300; bool m_do_reconnect = true; + + // Condition variables for shutdown + std::mutex m_shutdown_mutex; + std::condition_variable m_dispatch_shutdown_cv; + std::condition_variable m_clientserver_shutdown_cv; + bool m_dispatch_shutdown = false; + bool m_clientserver_shutdown = false; }; class Dispatcher : public DispatcherBase { diff --git a/src/NetworkConnection.cpp b/src/NetworkConnection.cpp index 350a1342ee..96286a572b 100644 --- a/src/NetworkConnection.cpp +++ b/src/NetworkConnection.cpp @@ -42,6 +42,12 @@ void NetworkConnection::Start() { m_state = static_cast(kInit); // clear queue while (!m_outgoing.empty()) m_outgoing.pop(); + // reset shutdown flags + { + std::lock_guard lock(m_shutdown_mutex); + m_read_shutdown = false; + m_write_shutdown = false; + } // start threads m_write_thread = std::thread(&NetworkConnection::WriteThreadMain, this); m_read_thread = std::thread(&NetworkConnection::ReadThreadMain, this); @@ -54,9 +60,27 @@ void NetworkConnection::Stop() { if (m_stream) m_stream->close(); // send an empty outgoing message set so the write thread terminates m_outgoing.push(Outgoing()); - // wait for threads to terminate - if (m_write_thread.joinable()) m_write_thread.join(); - if (m_read_thread.joinable()) m_read_thread.join(); + // wait for threads to terminate, with timeout + if (m_write_thread.joinable()) { + std::unique_lock lock(m_shutdown_mutex); + auto timeout_time = + std::chrono::steady_clock::now() + std::chrono::milliseconds(200); + if (m_write_shutdown_cv.wait_until(lock, timeout_time, + [&] { return m_write_shutdown; })) + m_write_thread.join(); + else + m_write_thread.detach(); // timed out, detach it + } + if (m_read_thread.joinable()) { + std::unique_lock lock(m_shutdown_mutex); + auto timeout_time = + std::chrono::steady_clock::now() + std::chrono::milliseconds(200); + if (m_read_shutdown_cv.wait_until(lock, timeout_time, + [&] { return m_read_shutdown; })) + m_read_thread.join(); + else + m_read_thread.detach(); // timed out, detach it + } // clear queue while (!m_outgoing.empty()) m_outgoing.pop(); } @@ -95,7 +119,7 @@ void NetworkConnection::ReadThreadMain() { })) { m_state = static_cast(kDead); m_active = false; - return; + goto done; } m_state = static_cast(kActive); @@ -122,6 +146,14 @@ void NetworkConnection::ReadThreadMain() { m_state = static_cast(kDead); m_active = false; m_outgoing.push(Outgoing()); // also kill write thread + +done: + // use condition variable to signal thread shutdown + { + std::lock_guard lock(m_shutdown_mutex); + m_read_shutdown = true; + m_read_shutdown_cv.notify_one(); + } } void NetworkConnection::WriteThreadMain() { @@ -153,6 +185,13 @@ void NetworkConnection::WriteThreadMain() { m_state = static_cast(kDead); m_active = false; if (m_stream) m_stream->close(); // also kill read thread + + // use condition variable to signal thread shutdown + { + std::lock_guard lock(m_shutdown_mutex); + m_write_shutdown = true; + m_write_shutdown_cv.notify_one(); + } } void NetworkConnection::QueueOutgoing(std::shared_ptr msg) { diff --git a/src/NetworkConnection.h b/src/NetworkConnection.h index bbfb083e43..2f1073cabb 100644 --- a/src/NetworkConnection.h +++ b/src/NetworkConnection.h @@ -101,6 +101,13 @@ class NetworkConnection { std::mutex m_pending_mutex; Outgoing m_pending_outgoing; std::vector> m_pending_update; + + // Condition variables for shutdown + std::mutex m_shutdown_mutex; + std::condition_variable m_read_shutdown_cv; + std::condition_variable m_write_shutdown_cv; + bool m_read_shutdown = false; + bool m_write_shutdown = false; }; } // namespace nt diff --git a/src/Notifier.cpp b/src/Notifier.cpp index 34d311c067..f1db1b659d 100644 --- a/src/Notifier.cpp +++ b/src/Notifier.cpp @@ -29,6 +29,10 @@ void Notifier::Start() { if (m_active) return; m_active = true; } + { + std::lock_guard lock(m_shutdown_mutex); + m_shutdown = false; + } m_thread = std::thread(&Notifier::ThreadMain, this); } @@ -36,7 +40,17 @@ void Notifier::Stop() { m_active = false; // send notification so the thread terminates m_cond.notify_one(); - if (m_thread.joinable()) m_thread.join(); + if (m_thread.joinable()) { + // join with timeout + std::unique_lock lock(m_shutdown_mutex); + auto timeout_time = + std::chrono::steady_clock::now() + std::chrono::seconds(1); + if (m_shutdown_cv.wait_until(lock, timeout_time, + [&] { return m_shutdown; })) + m_thread.join(); + else + m_thread.detach(); // timed out, detach it + } } void Notifier::ThreadMain() { @@ -44,11 +58,12 @@ void Notifier::ThreadMain() { while (m_active) { while (m_entry_notifications.empty() && m_conn_notifications.empty()) { m_cond.wait(lock); - if (!m_active) return; + if (!m_active) goto done; } // Entry notifications while (!m_entry_notifications.empty()) { + if (!m_active) goto done; auto item = std::move(m_entry_notifications.front()); m_entry_notifications.pop(); @@ -95,6 +110,7 @@ void Notifier::ThreadMain() { // Connection notifications while (!m_conn_notifications.empty()) { + if (!m_active) goto done; auto item = std::move(m_conn_notifications.front()); m_conn_notifications.pop(); @@ -117,6 +133,14 @@ void Notifier::ThreadMain() { } } } + +done: + // use condition variable to signal thread shutdown + { + std::lock_guard lock(m_shutdown_mutex); + m_shutdown = true; + m_shutdown_cv.notify_one(); + } } unsigned int Notifier::AddEntryListener(StringRef prefix, diff --git a/src/Notifier.h b/src/Notifier.h index 952259752e..487680cf58 100644 --- a/src/Notifier.h +++ b/src/Notifier.h @@ -101,6 +101,9 @@ class Notifier { std::queue m_conn_notifications; std::thread m_thread; + std::mutex m_shutdown_mutex; + std::condition_variable m_shutdown_cv; + bool m_shutdown = false; ATOMIC_STATIC_DECL(Notifier) static bool s_destroyed; diff --git a/src/RpcServer.cpp b/src/RpcServer.cpp index 654cb61765..43d37de1ae 100644 --- a/src/RpcServer.cpp +++ b/src/RpcServer.cpp @@ -31,6 +31,10 @@ void RpcServer::Start() { if (m_active) return; m_active = true; } + { + std::lock_guard lock(m_shutdown_mutex); + m_shutdown = false; + } m_thread = std::thread(&RpcServer::ThreadMain, this); } @@ -39,7 +43,15 @@ void RpcServer::Stop() { if (m_thread.joinable()) { // send notification so the thread terminates m_call_cond.notify_one(); - m_thread.join(); + // join with timeout + std::unique_lock lock(m_shutdown_mutex); + auto timeout_time = + std::chrono::steady_clock::now() + std::chrono::seconds(1); + if (m_shutdown_cv.wait_until(lock, timeout_time, + [&] { return m_shutdown; })) + m_thread.join(); + else + m_thread.detach(); // timed out, detach it } } @@ -97,10 +109,11 @@ void RpcServer::ThreadMain() { while (m_active) { while (m_call_queue.empty()) { m_call_cond.wait(lock); - if (!m_active) return; + if (!m_active) goto done; } while (!m_call_queue.empty()) { + if (!m_active) goto done; auto item = std::move(m_call_queue.front()); m_call_queue.pop(); @@ -117,4 +130,12 @@ void RpcServer::ThreadMain() { lock.lock(); } } + +done: + // use condition variable to signal thread shutdown + { + std::lock_guard lock(m_shutdown_mutex); + m_shutdown = true; + m_shutdown_cv.notify_one(); + } } diff --git a/src/RpcServer.h b/src/RpcServer.h index 73cc799693..726034d9b0 100644 --- a/src/RpcServer.h +++ b/src/RpcServer.h @@ -79,6 +79,9 @@ class RpcServer { m_response_map; std::thread m_thread; + std::mutex m_shutdown_mutex; + std::condition_variable m_shutdown_cv; + bool m_shutdown = false; ATOMIC_STATIC_DECL(RpcServer) };