mirror of
https://github.com/wpilibsuite/allwpilib
synced 2026-06-23 01:21:42 +00:00
Implement join with timeout (and detach).
Under certain situations (notably JNI shutdown), it's possible to get deadlock when using thread join(). To avoid this, implement a timeout. Normally we try to simply join the thread, but if it times out, we detach the thread instead.
This commit is contained in:
@@ -54,6 +54,11 @@ void DispatcherBase::StartServer(StringRef persist_filename,
|
||||
if (m_active) return;
|
||||
m_active = true;
|
||||
}
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(m_shutdown_mutex);
|
||||
m_dispatch_shutdown = false;
|
||||
m_clientserver_shutdown = false;
|
||||
}
|
||||
m_server = true;
|
||||
m_persist_filename = persist_filename;
|
||||
m_server_acceptor = std::move(acceptor);
|
||||
@@ -87,6 +92,11 @@ void DispatcherBase::StartClient(
|
||||
if (m_active) return;
|
||||
m_active = true;
|
||||
}
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(m_shutdown_mutex);
|
||||
m_dispatch_shutdown = false;
|
||||
m_clientserver_shutdown = false;
|
||||
}
|
||||
m_server = false;
|
||||
|
||||
using namespace std::placeholders;
|
||||
@@ -110,9 +120,27 @@ void DispatcherBase::Stop() {
|
||||
// wake up server thread by shutting down the socket
|
||||
if (m_server_acceptor) m_server_acceptor->shutdown();
|
||||
|
||||
// join threads
|
||||
if (m_dispatch_thread.joinable()) m_dispatch_thread.join();
|
||||
if (m_clientserver_thread.joinable()) m_clientserver_thread.join();
|
||||
// join threads, with timeout
|
||||
if (m_dispatch_thread.joinable()) {
|
||||
std::unique_lock<std::mutex> lock(m_shutdown_mutex);
|
||||
auto timeout_time =
|
||||
std::chrono::steady_clock::now() + std::chrono::seconds(1);
|
||||
if (m_dispatch_shutdown_cv.wait_until(lock, timeout_time,
|
||||
[&] { return m_dispatch_shutdown; }))
|
||||
m_dispatch_thread.join();
|
||||
else
|
||||
m_dispatch_thread.detach(); // timed out, detach it
|
||||
}
|
||||
if (m_clientserver_thread.joinable()) {
|
||||
std::unique_lock<std::mutex> lock(m_shutdown_mutex);
|
||||
auto timeout_time =
|
||||
std::chrono::steady_clock::now() + std::chrono::seconds(1);
|
||||
if (m_clientserver_shutdown_cv.wait_until(
|
||||
lock, timeout_time, [&] { return m_clientserver_shutdown; }))
|
||||
m_clientserver_thread.join();
|
||||
else
|
||||
m_clientserver_thread.detach(); // timed out, detach it
|
||||
}
|
||||
|
||||
std::vector<std::shared_ptr<NetworkConnection>> conns;
|
||||
{
|
||||
@@ -229,6 +257,13 @@ void DispatcherBase::DispatchThreadMain() {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// use condition variable to signal thread shutdown
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(m_shutdown_mutex);
|
||||
m_dispatch_shutdown = true;
|
||||
m_dispatch_shutdown_cv.notify_one();
|
||||
}
|
||||
}
|
||||
|
||||
void DispatcherBase::QueueOutgoing(std::shared_ptr<Message> msg,
|
||||
@@ -248,7 +283,7 @@ void DispatcherBase::QueueOutgoing(std::shared_ptr<Message> msg,
|
||||
void DispatcherBase::ServerThreadMain() {
|
||||
if (m_server_acceptor->start() != 0) {
|
||||
m_active = false;
|
||||
return;
|
||||
goto done;
|
||||
}
|
||||
while (m_active) {
|
||||
auto stream = m_server_acceptor->accept();
|
||||
@@ -274,6 +309,14 @@ void DispatcherBase::ServerThreadMain() {
|
||||
conn->Start();
|
||||
}
|
||||
}
|
||||
|
||||
done:
|
||||
// use condition variable to signal thread shutdown
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(m_shutdown_mutex);
|
||||
m_clientserver_shutdown = true;
|
||||
m_clientserver_shutdown_cv.notify_one();
|
||||
}
|
||||
}
|
||||
|
||||
void DispatcherBase::ClientThreadMain(
|
||||
@@ -306,6 +349,13 @@ void DispatcherBase::ClientThreadMain(
|
||||
m_do_reconnect = false;
|
||||
m_reconnect_cv.wait(lock, [&] { return !m_active || m_do_reconnect; });
|
||||
}
|
||||
|
||||
// use condition variable to signal thread shutdown
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(m_shutdown_mutex);
|
||||
m_clientserver_shutdown = true;
|
||||
m_clientserver_shutdown_cv.notify_one();
|
||||
}
|
||||
}
|
||||
|
||||
bool DispatcherBase::ClientHandshake(
|
||||
|
||||
@@ -99,6 +99,13 @@ class DispatcherBase {
|
||||
std::condition_variable m_reconnect_cv;
|
||||
unsigned int m_reconnect_proto_rev = 0x0300;
|
||||
bool m_do_reconnect = true;
|
||||
|
||||
// Condition variables for shutdown
|
||||
std::mutex m_shutdown_mutex;
|
||||
std::condition_variable m_dispatch_shutdown_cv;
|
||||
std::condition_variable m_clientserver_shutdown_cv;
|
||||
bool m_dispatch_shutdown = false;
|
||||
bool m_clientserver_shutdown = false;
|
||||
};
|
||||
|
||||
class Dispatcher : public DispatcherBase {
|
||||
|
||||
@@ -42,6 +42,12 @@ void NetworkConnection::Start() {
|
||||
m_state = static_cast<int>(kInit);
|
||||
// clear queue
|
||||
while (!m_outgoing.empty()) m_outgoing.pop();
|
||||
// reset shutdown flags
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(m_shutdown_mutex);
|
||||
m_read_shutdown = false;
|
||||
m_write_shutdown = false;
|
||||
}
|
||||
// start threads
|
||||
m_write_thread = std::thread(&NetworkConnection::WriteThreadMain, this);
|
||||
m_read_thread = std::thread(&NetworkConnection::ReadThreadMain, this);
|
||||
@@ -54,9 +60,27 @@ void NetworkConnection::Stop() {
|
||||
if (m_stream) m_stream->close();
|
||||
// send an empty outgoing message set so the write thread terminates
|
||||
m_outgoing.push(Outgoing());
|
||||
// wait for threads to terminate
|
||||
if (m_write_thread.joinable()) m_write_thread.join();
|
||||
if (m_read_thread.joinable()) m_read_thread.join();
|
||||
// wait for threads to terminate, with timeout
|
||||
if (m_write_thread.joinable()) {
|
||||
std::unique_lock<std::mutex> lock(m_shutdown_mutex);
|
||||
auto timeout_time =
|
||||
std::chrono::steady_clock::now() + std::chrono::milliseconds(200);
|
||||
if (m_write_shutdown_cv.wait_until(lock, timeout_time,
|
||||
[&] { return m_write_shutdown; }))
|
||||
m_write_thread.join();
|
||||
else
|
||||
m_write_thread.detach(); // timed out, detach it
|
||||
}
|
||||
if (m_read_thread.joinable()) {
|
||||
std::unique_lock<std::mutex> lock(m_shutdown_mutex);
|
||||
auto timeout_time =
|
||||
std::chrono::steady_clock::now() + std::chrono::milliseconds(200);
|
||||
if (m_read_shutdown_cv.wait_until(lock, timeout_time,
|
||||
[&] { return m_read_shutdown; }))
|
||||
m_read_thread.join();
|
||||
else
|
||||
m_read_thread.detach(); // timed out, detach it
|
||||
}
|
||||
// clear queue
|
||||
while (!m_outgoing.empty()) m_outgoing.pop();
|
||||
}
|
||||
@@ -95,7 +119,7 @@ void NetworkConnection::ReadThreadMain() {
|
||||
})) {
|
||||
m_state = static_cast<int>(kDead);
|
||||
m_active = false;
|
||||
return;
|
||||
goto done;
|
||||
}
|
||||
|
||||
m_state = static_cast<int>(kActive);
|
||||
@@ -122,6 +146,14 @@ void NetworkConnection::ReadThreadMain() {
|
||||
m_state = static_cast<int>(kDead);
|
||||
m_active = false;
|
||||
m_outgoing.push(Outgoing()); // also kill write thread
|
||||
|
||||
done:
|
||||
// use condition variable to signal thread shutdown
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(m_shutdown_mutex);
|
||||
m_read_shutdown = true;
|
||||
m_read_shutdown_cv.notify_one();
|
||||
}
|
||||
}
|
||||
|
||||
void NetworkConnection::WriteThreadMain() {
|
||||
@@ -153,6 +185,13 @@ void NetworkConnection::WriteThreadMain() {
|
||||
m_state = static_cast<int>(kDead);
|
||||
m_active = false;
|
||||
if (m_stream) m_stream->close(); // also kill read thread
|
||||
|
||||
// use condition variable to signal thread shutdown
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(m_shutdown_mutex);
|
||||
m_write_shutdown = true;
|
||||
m_write_shutdown_cv.notify_one();
|
||||
}
|
||||
}
|
||||
|
||||
void NetworkConnection::QueueOutgoing(std::shared_ptr<Message> msg) {
|
||||
|
||||
@@ -101,6 +101,13 @@ class NetworkConnection {
|
||||
std::mutex m_pending_mutex;
|
||||
Outgoing m_pending_outgoing;
|
||||
std::vector<std::pair<std::size_t, std::size_t>> m_pending_update;
|
||||
|
||||
// Condition variables for shutdown
|
||||
std::mutex m_shutdown_mutex;
|
||||
std::condition_variable m_read_shutdown_cv;
|
||||
std::condition_variable m_write_shutdown_cv;
|
||||
bool m_read_shutdown = false;
|
||||
bool m_write_shutdown = false;
|
||||
};
|
||||
|
||||
} // namespace nt
|
||||
|
||||
@@ -29,6 +29,10 @@ void Notifier::Start() {
|
||||
if (m_active) return;
|
||||
m_active = true;
|
||||
}
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(m_shutdown_mutex);
|
||||
m_shutdown = false;
|
||||
}
|
||||
m_thread = std::thread(&Notifier::ThreadMain, this);
|
||||
}
|
||||
|
||||
@@ -36,7 +40,17 @@ void Notifier::Stop() {
|
||||
m_active = false;
|
||||
// send notification so the thread terminates
|
||||
m_cond.notify_one();
|
||||
if (m_thread.joinable()) m_thread.join();
|
||||
if (m_thread.joinable()) {
|
||||
// join with timeout
|
||||
std::unique_lock<std::mutex> lock(m_shutdown_mutex);
|
||||
auto timeout_time =
|
||||
std::chrono::steady_clock::now() + std::chrono::seconds(1);
|
||||
if (m_shutdown_cv.wait_until(lock, timeout_time,
|
||||
[&] { return m_shutdown; }))
|
||||
m_thread.join();
|
||||
else
|
||||
m_thread.detach(); // timed out, detach it
|
||||
}
|
||||
}
|
||||
|
||||
void Notifier::ThreadMain() {
|
||||
@@ -44,11 +58,12 @@ void Notifier::ThreadMain() {
|
||||
while (m_active) {
|
||||
while (m_entry_notifications.empty() && m_conn_notifications.empty()) {
|
||||
m_cond.wait(lock);
|
||||
if (!m_active) return;
|
||||
if (!m_active) goto done;
|
||||
}
|
||||
|
||||
// Entry notifications
|
||||
while (!m_entry_notifications.empty()) {
|
||||
if (!m_active) goto done;
|
||||
auto item = std::move(m_entry_notifications.front());
|
||||
m_entry_notifications.pop();
|
||||
|
||||
@@ -95,6 +110,7 @@ void Notifier::ThreadMain() {
|
||||
|
||||
// Connection notifications
|
||||
while (!m_conn_notifications.empty()) {
|
||||
if (!m_active) goto done;
|
||||
auto item = std::move(m_conn_notifications.front());
|
||||
m_conn_notifications.pop();
|
||||
|
||||
@@ -117,6 +133,14 @@ void Notifier::ThreadMain() {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
done:
|
||||
// use condition variable to signal thread shutdown
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(m_shutdown_mutex);
|
||||
m_shutdown = true;
|
||||
m_shutdown_cv.notify_one();
|
||||
}
|
||||
}
|
||||
|
||||
unsigned int Notifier::AddEntryListener(StringRef prefix,
|
||||
|
||||
@@ -101,6 +101,9 @@ class Notifier {
|
||||
std::queue<ConnectionNotification> m_conn_notifications;
|
||||
|
||||
std::thread m_thread;
|
||||
std::mutex m_shutdown_mutex;
|
||||
std::condition_variable m_shutdown_cv;
|
||||
bool m_shutdown = false;
|
||||
|
||||
ATOMIC_STATIC_DECL(Notifier)
|
||||
static bool s_destroyed;
|
||||
|
||||
@@ -31,6 +31,10 @@ void RpcServer::Start() {
|
||||
if (m_active) return;
|
||||
m_active = true;
|
||||
}
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(m_shutdown_mutex);
|
||||
m_shutdown = false;
|
||||
}
|
||||
m_thread = std::thread(&RpcServer::ThreadMain, this);
|
||||
}
|
||||
|
||||
@@ -39,7 +43,15 @@ void RpcServer::Stop() {
|
||||
if (m_thread.joinable()) {
|
||||
// send notification so the thread terminates
|
||||
m_call_cond.notify_one();
|
||||
m_thread.join();
|
||||
// join with timeout
|
||||
std::unique_lock<std::mutex> lock(m_shutdown_mutex);
|
||||
auto timeout_time =
|
||||
std::chrono::steady_clock::now() + std::chrono::seconds(1);
|
||||
if (m_shutdown_cv.wait_until(lock, timeout_time,
|
||||
[&] { return m_shutdown; }))
|
||||
m_thread.join();
|
||||
else
|
||||
m_thread.detach(); // timed out, detach it
|
||||
}
|
||||
}
|
||||
|
||||
@@ -97,10 +109,11 @@ void RpcServer::ThreadMain() {
|
||||
while (m_active) {
|
||||
while (m_call_queue.empty()) {
|
||||
m_call_cond.wait(lock);
|
||||
if (!m_active) return;
|
||||
if (!m_active) goto done;
|
||||
}
|
||||
|
||||
while (!m_call_queue.empty()) {
|
||||
if (!m_active) goto done;
|
||||
auto item = std::move(m_call_queue.front());
|
||||
m_call_queue.pop();
|
||||
|
||||
@@ -117,4 +130,12 @@ void RpcServer::ThreadMain() {
|
||||
lock.lock();
|
||||
}
|
||||
}
|
||||
|
||||
done:
|
||||
// use condition variable to signal thread shutdown
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(m_shutdown_mutex);
|
||||
m_shutdown = true;
|
||||
m_shutdown_cv.notify_one();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -79,6 +79,9 @@ class RpcServer {
|
||||
m_response_map;
|
||||
|
||||
std::thread m_thread;
|
||||
std::mutex m_shutdown_mutex;
|
||||
std::condition_variable m_shutdown_cv;
|
||||
bool m_shutdown = false;
|
||||
|
||||
ATOMIC_STATIC_DECL(RpcServer)
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user