From 44821c3e3c95bc6422284e5cfec2a4fef8494c63 Mon Sep 17 00:00:00 2001 From: Peter Johnson Date: Tue, 22 Dec 2015 08:23:44 -0800 Subject: [PATCH] Change how Dispatcher is shut down. The previous use of a timeout resulting in thread detach instead of thread join resulted in a race condition on Mac between destruction and thread closeout. This commit removes the detach functionality and uses dup2() to on Linux/Mac and connecting to itself on Windows to try to ensure accept() exits. --- src/Dispatcher.cpp | 59 +++------------------------------- src/Dispatcher.h | 7 ---- src/tcpsockets/TCPAcceptor.cpp | 53 ++++++++++++++++++++++++++++-- src/tcpsockets/TCPAcceptor.h | 3 +- 4 files changed, 57 insertions(+), 65 deletions(-) diff --git a/src/Dispatcher.cpp b/src/Dispatcher.cpp index 88484d17c1..2b8fcb2aaa 100644 --- a/src/Dispatcher.cpp +++ b/src/Dispatcher.cpp @@ -54,11 +54,6 @@ void DispatcherBase::StartServer(StringRef persist_filename, if (m_active) return; m_active = true; } - { - std::lock_guard lock(m_shutdown_mutex); - m_dispatch_shutdown = false; - m_clientserver_shutdown = false; - } m_server = true; m_persist_filename = persist_filename; m_server_acceptor = std::move(acceptor); @@ -92,11 +87,6 @@ void DispatcherBase::StartClient( if (m_active) return; m_active = true; } - { - std::lock_guard lock(m_shutdown_mutex); - m_dispatch_shutdown = false; - m_clientserver_shutdown = false; - } m_server = false; using namespace std::placeholders; @@ -121,26 +111,8 @@ void DispatcherBase::Stop() { if (m_server_acceptor) m_server_acceptor->shutdown(); // join threads, with timeout - if (m_dispatch_thread.joinable()) { - std::unique_lock lock(m_shutdown_mutex); - auto timeout_time = - std::chrono::steady_clock::now() + std::chrono::seconds(1); - if (m_dispatch_shutdown_cv.wait_until(lock, timeout_time, - [&] { return m_dispatch_shutdown; })) - m_dispatch_thread.join(); - else - m_dispatch_thread.detach(); // timed out, detach it - } - if (m_clientserver_thread.joinable()) { - std::unique_lock lock(m_shutdown_mutex); - auto timeout_time = - std::chrono::steady_clock::now() + std::chrono::seconds(1); - if (m_clientserver_shutdown_cv.wait_until( - lock, timeout_time, [&] { return m_clientserver_shutdown; })) - m_clientserver_thread.join(); - else - m_clientserver_thread.detach(); // timed out, detach it - } + if (m_dispatch_thread.joinable()) m_dispatch_thread.join(); + if (m_clientserver_thread.joinable()) m_clientserver_thread.join(); std::vector> conns; { @@ -258,13 +230,6 @@ void DispatcherBase::DispatchThreadMain() { } } } - - // use condition variable to signal thread shutdown - { - std::lock_guard lock(m_shutdown_mutex); - m_dispatch_shutdown = true; - m_dispatch_shutdown_cv.notify_one(); - } } void DispatcherBase::QueueOutgoing(std::shared_ptr msg, @@ -284,14 +249,15 @@ void DispatcherBase::QueueOutgoing(std::shared_ptr msg, void DispatcherBase::ServerThreadMain() { if (m_server_acceptor->start() != 0) { m_active = false; - goto done; + return; } while (m_active) { auto stream = m_server_acceptor->accept(); if (!stream) { m_active = false; - break; + return; } + if (!m_active) return; DEBUG("server: client connection from " << stream->getPeerIP() << " port " << stream->getPeerPort()); @@ -319,14 +285,6 @@ void DispatcherBase::ServerThreadMain() { conn->Start(); } } - -done: - // use condition variable to signal thread shutdown - { - std::lock_guard lock(m_shutdown_mutex); - m_clientserver_shutdown = true; - m_clientserver_shutdown_cv.notify_one(); - } } void DispatcherBase::ClientThreadMain( @@ -359,13 +317,6 @@ void DispatcherBase::ClientThreadMain( m_do_reconnect = false; m_reconnect_cv.wait(lock, [&] { return !m_active || m_do_reconnect; }); } - - // use condition variable to signal thread shutdown - { - std::lock_guard lock(m_shutdown_mutex); - m_clientserver_shutdown = true; - m_clientserver_shutdown_cv.notify_one(); - } } bool DispatcherBase::ClientHandshake( diff --git a/src/Dispatcher.h b/src/Dispatcher.h index 3a199c2dc2..26f5e76328 100644 --- a/src/Dispatcher.h +++ b/src/Dispatcher.h @@ -99,13 +99,6 @@ class DispatcherBase { std::condition_variable m_reconnect_cv; unsigned int m_reconnect_proto_rev = 0x0300; bool m_do_reconnect = true; - - // Condition variables for shutdown - std::mutex m_shutdown_mutex; - std::condition_variable m_dispatch_shutdown_cv; - std::condition_variable m_clientserver_shutdown_cv; - bool m_dispatch_shutdown = false; - bool m_clientserver_shutdown = false; }; class Dispatcher : public DispatcherBase { diff --git a/src/tcpsockets/TCPAcceptor.cpp b/src/tcpsockets/TCPAcceptor.cpp index 04a995dca4..3b7e16e865 100644 --- a/src/tcpsockets/TCPAcceptor.cpp +++ b/src/tcpsockets/TCPAcceptor.cpp @@ -32,6 +32,7 @@ #include #include #include +#include #endif #include "llvm/SmallString.h" @@ -44,8 +45,8 @@ TCPAcceptor::TCPAcceptor(int port, const char* address) : m_lsd(0), m_port(port), m_address(address), - m_listening(false), - m_shutdown(false) { + m_listening(false) { + m_shutdown = false; #ifdef _WIN32 WSAData wsaData; WORD wVersionRequested = MAKEWORD(2, 2); @@ -111,13 +112,51 @@ void TCPAcceptor::shutdown() { m_shutdown = true; #ifdef _WIN32 ::shutdown(m_lsd, SD_BOTH); + + // this is ugly, but the easiest way to do this + // force wakeup of accept() with a non-blocking connect to ourselves + struct sockaddr_in address; + + std::memset(&address, 0, sizeof(address)); + address.sin_family = PF_INET; + llvm::SmallString<128> addr_copy; + if (m_address.size() > 0) + addr_copy = m_address; + else + addr_copy = "127.0.0.1"; + addr_copy.push_back('\0'); + int size = sizeof(address); + if (WSAStringToAddress(addr_copy.data(), PF_INET, nullptr, + (struct sockaddr*)&address, &size) != 0) + return; + address.sin_port = htons(m_port); + + fd_set sdset; + struct timeval tv; + int result = -1, valopt, sd = socket(AF_INET, SOCK_STREAM, 0); + + // Set socket to non-blocking + u_long mode = 1; + ioctlsocket(sd, FIONBIO, &mode); + + // Try to connect + ::connect(sd, (struct sockaddr*)&address, sizeof(address)); + + // Close + ::closesocket(sd); + #else ::shutdown(m_lsd, SHUT_RDWR); + int nullfd = ::open("/dev/null", O_RDONLY); + if (nullfd >= 0) { + ::dup2(nullfd, m_lsd); + ::close(nullfd); + } #endif } std::unique_ptr TCPAcceptor::accept() { - if (!m_listening) return nullptr; + if (!m_listening || m_shutdown) return nullptr; struct sockaddr_in address; #ifdef _WIN32 @@ -131,5 +170,13 @@ std::unique_ptr TCPAcceptor::accept() { if (!m_shutdown) ERROR("accept() failed: " << SocketStrerror()); return nullptr; } + if (m_shutdown) { +#ifdef _WIN32 + closesocket(sd); +#else + close(sd); +#endif + return nullptr; + } return std::unique_ptr(new TCPStream(sd, &address)); } diff --git a/src/tcpsockets/TCPAcceptor.h b/src/tcpsockets/TCPAcceptor.h index 98c1604fce..d6a6ccc366 100644 --- a/src/tcpsockets/TCPAcceptor.h +++ b/src/tcpsockets/TCPAcceptor.h @@ -24,6 +24,7 @@ #ifndef TCPSOCKETS_TCPACCEPTOR_H_ #define TCPSOCKETS_TCPACCEPTOR_H_ +#include #include #include @@ -35,7 +36,7 @@ class TCPAcceptor : public NetworkAcceptor { int m_port; std::string m_address; bool m_listening; - bool m_shutdown; + std::atomic_bool m_shutdown; public: TCPAcceptor(int port, const char* address);