[wpinet] uv: Stop creating handles when closing loop (#5102)

This prevents EventLoopRunner::Stop() from hanging in the case when
new handles are created after the async walk closes all the handles.
This commit is contained in:
Peter Johnson
2023-02-16 22:49:14 -08:00
committed by GitHub
parent 805c837a42
commit 8068369542
29 changed files with 300 additions and 82 deletions

View File

@@ -21,6 +21,9 @@ DsClient::DsClient(wpi::uv::Loop& loop, wpi::Logger& logger,
: m_logger{logger},
m_tcp{uv::Tcp::Create(loop)},
m_timer{uv::Timer::Create(loop)} {
if (!m_tcp || !m_timer) {
return;
}
m_tcp->end.connect([this] {
WPI_DEBUG4(m_logger, "DS connection closed");
clearIp();

View File

@@ -59,6 +59,7 @@ void EventLoopRunner::Stop() {
h.SetLoopClosing(true);
h.Close();
});
loop.SetClosing();
});
m_owner.Join();
}

View File

@@ -24,6 +24,9 @@ ParallelTcpConnector::ParallelTcpConnector(
m_reconnectRate{reconnectRate},
m_connected{std::move(connected)},
m_reconnectTimer{uv::Timer::Create(loop)} {
if (!m_reconnectTimer) {
return;
}
m_reconnectTimer->timeout.connect([this] {
if (!IsConnected()) {
WPI_DEBUG1(m_logger, "timed out, reconnecting");
@@ -86,6 +89,9 @@ void ParallelTcpConnector::Connect() {
// kick off parallel connection attempts
for (auto ai = &addrinfo; ai; ai = ai->ai_next) {
auto tcp = uv::Tcp::Create(m_loop);
if (!tcp) {
continue;
}
m_attempts.emplace_back(tcp);
auto connreq = std::make_shared<uv::TcpConnectReq>();

View File

@@ -49,6 +49,9 @@ void PortForwarder::Add(unsigned int port, std::string_view remoteHost,
unsigned int remotePort) {
m_impl->runner.ExecSync([&](uv::Loop& loop) {
auto server = uv::Tcp::Create(loop);
if (!server) {
return;
}
// bind to local port
server->Bind("", port);
@@ -71,6 +74,10 @@ void PortForwarder::Add(unsigned int port, std::string_view remoteHost,
client->SetData(connected);
auto remote = uv::Tcp::Create(loop);
if (!remote) {
client->Close();
return;
}
remote->error.connect(
[remotePtr = remote.get(),
clientWeak = std::weak_ptr<uv::Tcp>(client)](uv::Error err) {

View File

@@ -261,11 +261,12 @@ void WebSocket::StartClient(std::string_view uri, std::string_view host,
// Start handshake timer if a timeout was specified
if (options.handshakeTimeout != (uv::Timer::Time::max)()) {
auto timer = uv::Timer::Create(m_stream.GetLoopRef());
timer->timeout.connect(
[this]() { Terminate(1006, "connection timed out"); });
timer->Start(options.handshakeTimeout);
m_clientHandshake->timer = timer;
if (auto timer = uv::Timer::Create(m_stream.GetLoopRef())) {
timer->timeout.connect(
[this]() { Terminate(1006, "connection timed out"); });
timer->Start(options.handshakeTimeout);
m_clientHandshake->timer = timer;
}
}
}

View File

@@ -17,6 +17,9 @@ Async<>::~Async() noexcept {
}
std::shared_ptr<Async<>> Async<>::Create(const std::shared_ptr<Loop>& loop) {
if (loop->IsClosing()) {
return nullptr;
}
auto h = std::make_shared<Async>(loop, private_init{});
int err = uv_async_init(loop->GetRaw(), h->GetRaw(), [](uv_async_t* handle) {
Async& h = *static_cast<Async*>(handle->data);

View File

@@ -9,6 +9,9 @@
namespace wpi::uv {
std::shared_ptr<Check> Check::Create(Loop& loop) {
if (loop.IsClosing()) {
return nullptr;
}
auto h = std::make_shared<Check>(private_init{});
int err = uv_check_init(loop.GetRaw(), h->GetRaw());
if (err < 0) {
@@ -20,6 +23,9 @@ std::shared_ptr<Check> Check::Create(Loop& loop) {
}
void Check::Start() {
if (IsLoopClosing()) {
return;
}
Invoke(&uv_check_start, GetRaw(), [](uv_check_t* handle) {
Check& h = *static_cast<Check*>(handle->data);
h.check();

View File

@@ -13,6 +13,9 @@
namespace wpi::uv {
std::shared_ptr<FsEvent> FsEvent::Create(Loop& loop) {
if (loop.IsClosing()) {
return nullptr;
}
auto h = std::make_shared<FsEvent>(private_init{});
int err = uv_fs_event_init(loop.GetRaw(), h->GetRaw());
if (err < 0) {

View File

@@ -18,6 +18,9 @@ GetAddrInfoReq::GetAddrInfoReq() {
void GetAddrInfo(Loop& loop, const std::shared_ptr<GetAddrInfoReq>& req,
std::string_view node, std::string_view service,
const addrinfo* hints) {
if (loop.IsClosing()) {
return;
}
SmallString<128> nodeStr{node};
SmallString<128> serviceStr{service};
int err = uv_getaddrinfo(

View File

@@ -15,6 +15,9 @@ GetNameInfoReq::GetNameInfoReq() {
void GetNameInfo(Loop& loop, const std::shared_ptr<GetNameInfoReq>& req,
const sockaddr& addr, int flags) {
if (loop.IsClosing()) {
return;
}
int err = uv_getnameinfo(
loop.GetRaw(), req->GetRaw(),
[](uv_getnameinfo_t* req, int status, const char* hostname,

View File

@@ -9,6 +9,9 @@
namespace wpi::uv {
std::shared_ptr<Idle> Idle::Create(Loop& loop) {
if (loop.IsClosing()) {
return nullptr;
}
auto h = std::make_shared<Idle>(private_init{});
int err = uv_idle_init(loop.GetRaw(), h->GetRaw());
if (err < 0) {
@@ -20,6 +23,9 @@ std::shared_ptr<Idle> Idle::Create(Loop& loop) {
}
void Idle::Start() {
if (IsLoopClosing()) {
return;
}
Invoke(&uv_idle_start, GetRaw(), [](uv_idle_t* handle) {
Idle& h = *static_cast<Idle*>(handle->data);
h.idle();

View File

@@ -11,6 +11,9 @@ ConnectReq::ConnectReq() {
}
void NetworkStream::Listen(int backlog) {
if (IsLoopClosing()) {
return;
}
Invoke(&uv_listen, GetRawStream(), backlog,
[](uv_stream_t* handle, int status) {
auto& h = *static_cast<NetworkStream*>(handle->data);

View File

@@ -11,6 +11,9 @@
namespace wpi::uv {
std::shared_ptr<Pipe> Pipe::Create(Loop& loop, bool ipc) {
if (loop.IsClosing()) {
return nullptr;
}
auto h = std::make_shared<Pipe>(private_init{});
int err = uv_pipe_init(loop.GetRaw(), h->GetRaw(), ipc ? 1 : 0);
if (err < 0) {
@@ -22,7 +25,7 @@ std::shared_ptr<Pipe> Pipe::Create(Loop& loop, bool ipc) {
}
void Pipe::Reuse(std::function<void()> callback, bool ipc) {
if (IsClosing()) {
if (IsLoopClosing() || IsClosing()) {
return;
}
if (!m_reuseData) {
@@ -69,6 +72,9 @@ void Pipe::Bind(std::string_view name) {
void Pipe::Connect(std::string_view name,
const std::shared_ptr<PipeConnectReq>& req) {
if (IsLoopClosing()) {
return;
}
SmallString<128> nameBuf{name};
uv_pipe_connect(req->GetRaw(), GetRaw(), nameBuf.c_str(),
[](uv_connect_t* req, int status) {

View File

@@ -9,6 +9,9 @@
namespace wpi::uv {
std::shared_ptr<Poll> Poll::Create(Loop& loop, int fd) {
if (loop.IsClosing()) {
return nullptr;
}
auto h = std::make_shared<Poll>(private_init{});
int err = uv_poll_init(loop.GetRaw(), h->GetRaw(), fd);
if (err < 0) {
@@ -20,6 +23,9 @@ std::shared_ptr<Poll> Poll::Create(Loop& loop, int fd) {
}
std::shared_ptr<Poll> Poll::CreateSocket(Loop& loop, uv_os_sock_t sock) {
if (loop.IsClosing()) {
return nullptr;
}
auto h = std::make_shared<Poll>(private_init{});
int err = uv_poll_init_socket(loop.GetRaw(), h->GetRaw(), sock);
if (err < 0) {
@@ -31,7 +37,7 @@ std::shared_ptr<Poll> Poll::CreateSocket(Loop& loop, uv_os_sock_t sock) {
}
void Poll::Reuse(int fd, std::function<void()> callback) {
if (IsClosing()) {
if (IsLoopClosing() || IsClosing()) {
return;
}
if (!m_reuseData) {
@@ -56,7 +62,7 @@ void Poll::Reuse(int fd, std::function<void()> callback) {
}
void Poll::ReuseSocket(uv_os_sock_t sock, std::function<void()> callback) {
if (IsClosing()) {
if (IsLoopClosing() || IsClosing()) {
return;
}
if (!m_reuseData) {
@@ -81,6 +87,9 @@ void Poll::ReuseSocket(uv_os_sock_t sock, std::function<void()> callback) {
}
void Poll::Start(int events) {
if (IsLoopClosing()) {
return;
}
Invoke(&uv_poll_start, GetRaw(), events,
[](uv_poll_t* handle, int status, int events) {
Poll& h = *static_cast<Poll*>(handle->data);

View File

@@ -9,6 +9,9 @@
namespace wpi::uv {
std::shared_ptr<Prepare> Prepare::Create(Loop& loop) {
if (loop.IsClosing()) {
return nullptr;
}
auto h = std::make_shared<Prepare>(private_init{});
int err = uv_prepare_init(loop.GetRaw(), h->GetRaw());
if (err < 0) {
@@ -20,6 +23,9 @@ std::shared_ptr<Prepare> Prepare::Create(Loop& loop) {
}
void Prepare::Start() {
if (IsLoopClosing()) {
return;
}
Invoke(&uv_prepare_start, GetRaw(), [](uv_prepare_t* handle) {
Prepare& h = *static_cast<Prepare*>(handle->data);
h.prepare();

View File

@@ -13,6 +13,10 @@ namespace wpi::uv {
std::shared_ptr<Process> Process::SpawnArray(Loop& loop, std::string_view file,
std::span<const Option> options) {
if (loop.IsClosing()) {
return nullptr;
}
// convert Option array to libuv structure
uv_process_options_t coptions;

View File

@@ -9,6 +9,9 @@
namespace wpi::uv {
std::shared_ptr<Signal> Signal::Create(Loop& loop) {
if (loop.IsClosing()) {
return nullptr;
}
auto h = std::make_shared<Signal>(private_init{});
int err = uv_signal_init(loop.GetRaw(), h->GetRaw());
if (err < 0) {
@@ -20,6 +23,9 @@ std::shared_ptr<Signal> Signal::Create(Loop& loop) {
}
void Signal::Start(int signum) {
if (IsLoopClosing()) {
return;
}
Invoke(
&uv_signal_start, GetRaw(),
[](uv_signal_t* handle, int signum) {

View File

@@ -35,6 +35,9 @@ WriteReq::WriteReq() {
}
void Stream::Shutdown(const std::shared_ptr<ShutdownReq>& req) {
if (IsLoopClosing()) {
return;
}
if (Invoke(&uv_shutdown, req->GetRaw(), GetRawStream(),
[](uv_shutdown_t* req, int status) {
auto& h = *static_cast<ShutdownReq*>(req->data);
@@ -50,6 +53,9 @@ void Stream::Shutdown(const std::shared_ptr<ShutdownReq>& req) {
}
void Stream::Shutdown(std::function<void()> callback) {
if (IsLoopClosing()) {
return;
}
auto req = std::make_shared<ShutdownReq>();
if (callback) {
req->complete.connect(std::move(callback));
@@ -58,6 +64,9 @@ void Stream::Shutdown(std::function<void()> callback) {
}
void Stream::StartRead() {
if (IsLoopClosing()) {
return;
}
Invoke(&uv_read_start, GetRawStream(), &Handle::AllocBuf,
[](uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
auto& h = *static_cast<Stream*>(stream->data);
@@ -79,6 +88,9 @@ void Stream::StartRead() {
void Stream::Write(std::span<const Buffer> bufs,
const std::shared_ptr<WriteReq>& req) {
if (IsLoopClosing()) {
return;
}
if (Invoke(&uv_write, req->GetRaw(), GetRawStream(), bufs.data(), bufs.size(),
[](uv_write_t* r, int status) {
auto& h = *static_cast<WriteReq*>(r->data);
@@ -98,6 +110,9 @@ void Stream::Write(std::span<const Buffer> bufs,
}
int Stream::TryWrite(std::span<const Buffer> bufs) {
if (IsLoopClosing()) {
return 0;
}
int val = uv_try_write(GetRawStream(), bufs.data(), bufs.size());
if (val < 0) {
this->ReportError(val);
@@ -107,6 +122,9 @@ int Stream::TryWrite(std::span<const Buffer> bufs) {
}
int Stream::TryWrite2(std::span<const Buffer> bufs, Stream& send) {
if (IsLoopClosing()) {
return 0;
}
int val = uv_try_write2(GetRawStream(), bufs.data(), bufs.size(),
send.GetRawStream());
if (val < 0) {

View File

@@ -11,6 +11,9 @@
namespace wpi::uv {
std::shared_ptr<Tcp> Tcp::Create(Loop& loop, unsigned int flags) {
if (loop.IsClosing()) {
return nullptr;
}
auto h = std::make_shared<Tcp>(private_init{});
int err = uv_tcp_init_ex(loop.GetRaw(), h->GetRaw(), flags);
if (err < 0) {
@@ -22,7 +25,7 @@ std::shared_ptr<Tcp> Tcp::Create(Loop& loop, unsigned int flags) {
}
void Tcp::Reuse(std::function<void()> callback, unsigned int flags) {
if (IsClosing()) {
if (IsLoopClosing() || IsClosing()) {
return;
}
if (!m_reuseData) {
@@ -103,6 +106,9 @@ sockaddr_storage Tcp::GetPeer() {
void Tcp::Connect(const sockaddr& addr,
const std::shared_ptr<TcpConnectReq>& req) {
if (IsLoopClosing()) {
return;
}
if (Invoke(&uv_tcp_connect, req->GetRaw(), GetRaw(), &addr,
[](uv_connect_t* req, int status) {
auto& h = *static_cast<TcpConnectReq*>(req->data);
@@ -118,6 +124,9 @@ void Tcp::Connect(const sockaddr& addr,
}
void Tcp::Connect(const sockaddr& addr, std::function<void()> callback) {
if (IsLoopClosing()) {
return;
}
auto req = std::make_shared<TcpConnectReq>();
req->connected.connect(std::move(callback));
Connect(addr, req);

View File

@@ -9,6 +9,9 @@
namespace wpi::uv {
std::shared_ptr<Timer> Timer::Create(Loop& loop) {
if (loop.IsClosing()) {
return nullptr;
}
auto h = std::make_shared<Timer>(private_init{});
int err = uv_timer_init(loop.GetRaw(), h->GetRaw());
if (err < 0) {
@@ -32,6 +35,9 @@ void Timer::SingleShot(Loop& loop, Time timeout, std::function<void()> func) {
}
void Timer::Start(Time timeout, Time repeat) {
if (IsLoopClosing()) {
return;
}
Invoke(
&uv_timer_start, GetRaw(),
[](uv_timer_t* handle) {

View File

@@ -9,6 +9,9 @@
namespace wpi::uv {
std::shared_ptr<Tty> Tty::Create(Loop& loop, uv_file fd, bool readable) {
if (loop.IsClosing()) {
return nullptr;
}
auto h = std::make_shared<Tty>(private_init{});
int err = uv_tty_init(loop.GetRaw(), h->GetRaw(), fd, readable ? 1 : 0);
if (err < 0) {

View File

@@ -38,6 +38,9 @@ UdpSendReq::UdpSendReq() {
}
std::shared_ptr<Udp> Udp::Create(Loop& loop, unsigned int flags) {
if (loop.IsClosing()) {
return nullptr;
}
auto h = std::make_shared<Udp>(private_init{});
int err = uv_udp_init_ex(loop.GetRaw(), h->GetRaw(), flags);
if (err < 0) {
@@ -135,6 +138,9 @@ void Udp::SetMulticastInterface(std::string_view interfaceAddr) {
void Udp::Send(const sockaddr& addr, std::span<const Buffer> bufs,
const std::shared_ptr<UdpSendReq>& req) {
if (IsLoopClosing()) {
return;
}
if (Invoke(&uv_udp_send, req->GetRaw(), GetRaw(), bufs.data(), bufs.size(),
&addr, [](uv_udp_send_t* r, int status) {
auto& h = *static_cast<UdpSendReq*>(r->data);
@@ -150,12 +156,18 @@ void Udp::Send(const sockaddr& addr, std::span<const Buffer> bufs,
void Udp::Send(const sockaddr& addr, std::span<const Buffer> bufs,
std::function<void(std::span<Buffer>, Error)> callback) {
if (IsLoopClosing()) {
return;
}
Send(addr, bufs,
std::make_shared<CallbackUdpSendReq>(bufs, std::move(callback)));
}
void Udp::Send(std::span<const Buffer> bufs,
const std::shared_ptr<UdpSendReq>& req) {
if (IsLoopClosing()) {
return;
}
if (Invoke(&uv_udp_send, req->GetRaw(), GetRaw(), bufs.data(), bufs.size(),
nullptr, [](uv_udp_send_t* r, int status) {
auto& h = *static_cast<UdpSendReq*>(r->data);
@@ -171,10 +183,16 @@ void Udp::Send(std::span<const Buffer> bufs,
void Udp::Send(std::span<const Buffer> bufs,
std::function<void(std::span<Buffer>, Error)> callback) {
if (IsLoopClosing()) {
return;
}
Send(bufs, std::make_shared<CallbackUdpSendReq>(bufs, std::move(callback)));
}
void Udp::StartRecv() {
if (IsLoopClosing()) {
return;
}
Invoke(&uv_udp_recv_start, GetRaw(), &AllocBuf,
[](uv_udp_t* handle, ssize_t nread, const uv_buf_t* buf,
const sockaddr* addr, unsigned flags) {

View File

@@ -13,6 +13,9 @@ WorkReq::WorkReq() {
}
void QueueWork(Loop& loop, const std::shared_ptr<WorkReq>& req) {
if (loop.IsClosing()) {
return;
}
int err = uv_queue_work(
loop.GetRaw(), req->GetRaw(),
[](uv_work_t* req) {
@@ -37,6 +40,9 @@ void QueueWork(Loop& loop, const std::shared_ptr<WorkReq>& req) {
void QueueWork(Loop& loop, std::function<void()> work,
std::function<void()> afterWork) {
if (loop.IsClosing()) {
return;
}
auto req = std::make_shared<WorkReq>();
if (work) {
req->work.connect(std::move(work));