Change how Dispatcher is shut down.

The previous use of a timeout resulting in thread detach instead of thread
join resulted in a race condition on Mac between destruction and thread
closeout.  This commit removes the detach functionality and uses dup2() to
on Linux/Mac and connecting to itself on Windows to try to ensure accept()
exits.
This commit is contained in:
Peter Johnson
2015-12-22 08:23:44 -08:00
parent 2540f102b0
commit 44821c3e3c
4 changed files with 57 additions and 65 deletions

View File

@@ -54,11 +54,6 @@ void DispatcherBase::StartServer(StringRef persist_filename,
if (m_active) return;
m_active = true;
}
{
std::lock_guard<std::mutex> lock(m_shutdown_mutex);
m_dispatch_shutdown = false;
m_clientserver_shutdown = false;
}
m_server = true;
m_persist_filename = persist_filename;
m_server_acceptor = std::move(acceptor);
@@ -92,11 +87,6 @@ void DispatcherBase::StartClient(
if (m_active) return;
m_active = true;
}
{
std::lock_guard<std::mutex> lock(m_shutdown_mutex);
m_dispatch_shutdown = false;
m_clientserver_shutdown = false;
}
m_server = false;
using namespace std::placeholders;
@@ -121,26 +111,8 @@ void DispatcherBase::Stop() {
if (m_server_acceptor) m_server_acceptor->shutdown();
// join threads, with timeout
if (m_dispatch_thread.joinable()) {
std::unique_lock<std::mutex> lock(m_shutdown_mutex);
auto timeout_time =
std::chrono::steady_clock::now() + std::chrono::seconds(1);
if (m_dispatch_shutdown_cv.wait_until(lock, timeout_time,
[&] { return m_dispatch_shutdown; }))
m_dispatch_thread.join();
else
m_dispatch_thread.detach(); // timed out, detach it
}
if (m_clientserver_thread.joinable()) {
std::unique_lock<std::mutex> lock(m_shutdown_mutex);
auto timeout_time =
std::chrono::steady_clock::now() + std::chrono::seconds(1);
if (m_clientserver_shutdown_cv.wait_until(
lock, timeout_time, [&] { return m_clientserver_shutdown; }))
m_clientserver_thread.join();
else
m_clientserver_thread.detach(); // timed out, detach it
}
if (m_dispatch_thread.joinable()) m_dispatch_thread.join();
if (m_clientserver_thread.joinable()) m_clientserver_thread.join();
std::vector<std::shared_ptr<NetworkConnection>> conns;
{
@@ -258,13 +230,6 @@ void DispatcherBase::DispatchThreadMain() {
}
}
}
// use condition variable to signal thread shutdown
{
std::lock_guard<std::mutex> lock(m_shutdown_mutex);
m_dispatch_shutdown = true;
m_dispatch_shutdown_cv.notify_one();
}
}
void DispatcherBase::QueueOutgoing(std::shared_ptr<Message> msg,
@@ -284,14 +249,15 @@ void DispatcherBase::QueueOutgoing(std::shared_ptr<Message> msg,
void DispatcherBase::ServerThreadMain() {
if (m_server_acceptor->start() != 0) {
m_active = false;
goto done;
return;
}
while (m_active) {
auto stream = m_server_acceptor->accept();
if (!stream) {
m_active = false;
break;
return;
}
if (!m_active) return;
DEBUG("server: client connection from " << stream->getPeerIP() << " port "
<< stream->getPeerPort());
@@ -319,14 +285,6 @@ void DispatcherBase::ServerThreadMain() {
conn->Start();
}
}
done:
// use condition variable to signal thread shutdown
{
std::lock_guard<std::mutex> lock(m_shutdown_mutex);
m_clientserver_shutdown = true;
m_clientserver_shutdown_cv.notify_one();
}
}
void DispatcherBase::ClientThreadMain(
@@ -359,13 +317,6 @@ void DispatcherBase::ClientThreadMain(
m_do_reconnect = false;
m_reconnect_cv.wait(lock, [&] { return !m_active || m_do_reconnect; });
}
// use condition variable to signal thread shutdown
{
std::lock_guard<std::mutex> lock(m_shutdown_mutex);
m_clientserver_shutdown = true;
m_clientserver_shutdown_cv.notify_one();
}
}
bool DispatcherBase::ClientHandshake(

View File

@@ -99,13 +99,6 @@ class DispatcherBase {
std::condition_variable m_reconnect_cv;
unsigned int m_reconnect_proto_rev = 0x0300;
bool m_do_reconnect = true;
// Condition variables for shutdown
std::mutex m_shutdown_mutex;
std::condition_variable m_dispatch_shutdown_cv;
std::condition_variable m_clientserver_shutdown_cv;
bool m_dispatch_shutdown = false;
bool m_clientserver_shutdown = false;
};
class Dispatcher : public DispatcherBase {

View File

@@ -32,6 +32,7 @@
#include <arpa/inet.h>
#include <netinet/in.h>
#include <unistd.h>
#include <fcntl.h>
#endif
#include "llvm/SmallString.h"
@@ -44,8 +45,8 @@ TCPAcceptor::TCPAcceptor(int port, const char* address)
: m_lsd(0),
m_port(port),
m_address(address),
m_listening(false),
m_shutdown(false) {
m_listening(false) {
m_shutdown = false;
#ifdef _WIN32
WSAData wsaData;
WORD wVersionRequested = MAKEWORD(2, 2);
@@ -111,13 +112,51 @@ void TCPAcceptor::shutdown() {
m_shutdown = true;
#ifdef _WIN32
::shutdown(m_lsd, SD_BOTH);
// this is ugly, but the easiest way to do this
// force wakeup of accept() with a non-blocking connect to ourselves
struct sockaddr_in address;
std::memset(&address, 0, sizeof(address));
address.sin_family = PF_INET;
llvm::SmallString<128> addr_copy;
if (m_address.size() > 0)
addr_copy = m_address;
else
addr_copy = "127.0.0.1";
addr_copy.push_back('\0');
int size = sizeof(address);
if (WSAStringToAddress(addr_copy.data(), PF_INET, nullptr,
(struct sockaddr*)&address, &size) != 0)
return;
address.sin_port = htons(m_port);
fd_set sdset;
struct timeval tv;
int result = -1, valopt, sd = socket(AF_INET, SOCK_STREAM, 0);
// Set socket to non-blocking
u_long mode = 1;
ioctlsocket(sd, FIONBIO, &mode);
// Try to connect
::connect(sd, (struct sockaddr*)&address, sizeof(address));
// Close
::closesocket(sd);
#else
::shutdown(m_lsd, SHUT_RDWR);
int nullfd = ::open("/dev/null", O_RDONLY);
if (nullfd >= 0) {
::dup2(nullfd, m_lsd);
::close(nullfd);
}
#endif
}
std::unique_ptr<NetworkStream> TCPAcceptor::accept() {
if (!m_listening) return nullptr;
if (!m_listening || m_shutdown) return nullptr;
struct sockaddr_in address;
#ifdef _WIN32
@@ -131,5 +170,13 @@ std::unique_ptr<NetworkStream> TCPAcceptor::accept() {
if (!m_shutdown) ERROR("accept() failed: " << SocketStrerror());
return nullptr;
}
if (m_shutdown) {
#ifdef _WIN32
closesocket(sd);
#else
close(sd);
#endif
return nullptr;
}
return std::unique_ptr<NetworkStream>(new TCPStream(sd, &address));
}

View File

@@ -24,6 +24,7 @@
#ifndef TCPSOCKETS_TCPACCEPTOR_H_
#define TCPSOCKETS_TCPACCEPTOR_H_
#include <atomic>
#include <memory>
#include <string>
@@ -35,7 +36,7 @@ class TCPAcceptor : public NetworkAcceptor {
int m_port;
std::string m_address;
bool m_listening;
bool m_shutdown;
std::atomic_bool m_shutdown;
public:
TCPAcceptor(int port, const char* address);