diff --git a/ntcore/src/main/native/cpp/NetworkClient.cpp b/ntcore/src/main/native/cpp/NetworkClient.cpp index a95066db48..6b74a8bd4b 100644 --- a/ntcore/src/main/native/cpp/NetworkClient.cpp +++ b/ntcore/src/main/native/cpp/NetworkClient.cpp @@ -155,7 +155,9 @@ void NCImpl::SetServers( [this, servers = std::move(serversCopy)](uv::Loop&) mutable { m_servers = std::move(servers); if (m_dsClientServer.first.empty()) { - m_parallelConnect->SetServers(m_servers); + if (m_parallelConnect) { + m_parallelConnect->SetServers(m_servers); + } } }); } @@ -167,14 +169,20 @@ void NCImpl::StartDSClient(unsigned int port) { } m_dsClientServer.second = port == 0 ? NT_DEFAULT_PORT4 : port; m_dsClient = wpi::DsClient::Create(m_loop, m_logger); - m_dsClient->setIp.connect([this](std::string_view ip) { - m_dsClientServer.first = ip; - m_parallelConnect->SetServers({{m_dsClientServer}}); - }); - m_dsClient->clearIp.connect([this] { - m_dsClientServer.first.clear(); - m_parallelConnect->SetServers(m_servers); - }); + if (m_dsClient) { + m_dsClient->setIp.connect([this](std::string_view ip) { + m_dsClientServer.first = ip; + if (m_parallelConnect) { + m_parallelConnect->SetServers({{m_dsClientServer}}); + } + }); + m_dsClient->clearIp.connect([this] { + m_dsClientServer.first.clear(); + if (m_parallelConnect) { + m_parallelConnect->SetServers(m_servers); + } + }); + } }); } @@ -191,15 +199,20 @@ void NCImpl::Disconnect(std::string_view reason) { if (m_readLocalTimer) { m_readLocalTimer->Stop(); } - m_sendValuesTimer->Stop(); + if (m_sendValuesTimer) { + m_sendValuesTimer->Stop(); + } m_localStorage.ClearNetwork(); m_localQueue.ClearQueue(); m_connList.RemoveConnection(m_connHandle); m_connHandle = 0; // start trying to connect again - uv::Timer::SingleShot(m_loop, kReconnectRate, - [this] { m_parallelConnect->Disconnected(); }); + uv::Timer::SingleShot(m_loop, kReconnectRate, [this] { + if (m_parallelConnect) { + m_parallelConnect->Disconnected(); + } + }); } NCImpl3::NCImpl3(int inst, std::string_view id, @@ -212,25 +225,31 @@ NCImpl3::NCImpl3(int inst, std::string_view id, [this](uv::Tcp& tcp) { TcpConnected(tcp); }); m_sendValuesTimer = uv::Timer::Create(loop); - m_sendValuesTimer->timeout.connect([this] { - if (m_clientImpl) { - HandleLocal(); - m_clientImpl->SendPeriodic(m_loop.Now().count(), false); - } - }); + if (m_sendValuesTimer) { + m_sendValuesTimer->timeout.connect([this] { + if (m_clientImpl) { + HandleLocal(); + m_clientImpl->SendPeriodic(m_loop.Now().count(), false); + } + }); + } // set up flush async m_flush = uv::Async<>::Create(m_loop); - m_flush->wakeup.connect([this] { - if (m_clientImpl) { - HandleLocal(); - m_clientImpl->SendPeriodic(m_loop.Now().count(), true); - } - }); + if (m_flush) { + m_flush->wakeup.connect([this] { + if (m_clientImpl) { + HandleLocal(); + m_clientImpl->SendPeriodic(m_loop.Now().count(), true); + } + }); + } m_flushAtomic = m_flush.get(); m_flushLocal = uv::Async<>::Create(m_loop); - m_flushLocal->wakeup.connect([this] { HandleLocal(); }); + if (m_flushLocal) { + m_flushLocal->wakeup.connect([this] { HandleLocal(); }); + } m_flushLocalAtomic = m_flushLocal.get(); }); } @@ -261,8 +280,10 @@ void NCImpl3::TcpConnected(uv::Tcp& tcp) { auto clientImpl = std::make_shared( m_loop.Now().count(), m_inst, *wire, m_logger, [this](uint32_t repeatMs) { DEBUG4("Setting periodic timer to {}", repeatMs); - m_sendValuesTimer->Start(uv::Timer::Time{repeatMs}, - uv::Timer::Time{repeatMs}); + if (m_sendValuesTimer) { + m_sendValuesTimer->Start(uv::Timer::Time{repeatMs}, + uv::Timer::Time{repeatMs}); + } }); clientImpl->Start( m_id, [this, wire, @@ -276,7 +297,9 @@ void NCImpl3::TcpConnected(uv::Tcp& tcp) { return; } - m_parallelConnect->Succeeded(tcp); + if (m_parallelConnect) { + m_parallelConnect->Succeeded(tcp); + } m_wire = std::move(wire); m_clientImpl = std::move(clientImpl); @@ -343,34 +366,42 @@ NCImpl4::NCImpl4( [this](uv::Tcp& tcp) { TcpConnected(tcp); }); m_readLocalTimer = uv::Timer::Create(loop); - m_readLocalTimer->timeout.connect([this] { - if (m_clientImpl) { - HandleLocal(); - m_clientImpl->SendControl(m_loop.Now().count()); - } - }); - m_readLocalTimer->Start(uv::Timer::Time{100}, uv::Timer::Time{100}); + if (m_readLocalTimer) { + m_readLocalTimer->timeout.connect([this] { + if (m_clientImpl) { + HandleLocal(); + m_clientImpl->SendControl(m_loop.Now().count()); + } + }); + m_readLocalTimer->Start(uv::Timer::Time{100}, uv::Timer::Time{100}); + } m_sendValuesTimer = uv::Timer::Create(loop); - m_sendValuesTimer->timeout.connect([this] { - if (m_clientImpl) { - HandleLocal(); - m_clientImpl->SendValues(m_loop.Now().count(), false); - } - }); + if (m_sendValuesTimer) { + m_sendValuesTimer->timeout.connect([this] { + if (m_clientImpl) { + HandleLocal(); + m_clientImpl->SendValues(m_loop.Now().count(), false); + } + }); + } // set up flush async m_flush = uv::Async<>::Create(m_loop); - m_flush->wakeup.connect([this] { - if (m_clientImpl) { - HandleLocal(); - m_clientImpl->SendValues(m_loop.Now().count(), true); - } - }); + if (m_flush) { + m_flush->wakeup.connect([this] { + if (m_clientImpl) { + HandleLocal(); + m_clientImpl->SendValues(m_loop.Now().count(), true); + } + }); + } m_flushAtomic = m_flush.get(); m_flushLocal = uv::Async<>::Create(m_loop); - m_flushLocal->wakeup.connect([this] { HandleLocal(); }); + if (m_flushLocal) { + m_flushLocal->wakeup.connect([this] { HandleLocal(); }); + } m_flushLocalAtomic = m_flushLocal.get(); }); } @@ -418,7 +449,9 @@ void NCImpl4::TcpConnected(uv::Tcp& tcp) { } void NCImpl4::WsConnected(wpi::WebSocket& ws, uv::Tcp& tcp) { - m_parallelConnect->Succeeded(tcp); + if (m_parallelConnect) { + m_parallelConnect->Succeeded(tcp); + } ConnectionInfo connInfo; uv::AddrToName(tcp.GetPeer(), &connInfo.remote_ip, &connInfo.remote_port); @@ -432,8 +465,10 @@ void NCImpl4::WsConnected(wpi::WebSocket& ws, uv::Tcp& tcp) { m_loop.Now().count(), m_inst, *m_wire, m_logger, m_timeSyncUpdated, [this](uint32_t repeatMs) { DEBUG4("Setting periodic timer to {}", repeatMs); - m_sendValuesTimer->Start(uv::Timer::Time{repeatMs}, - uv::Timer::Time{repeatMs}); + if (m_sendValuesTimer) { + m_sendValuesTimer->Start(uv::Timer::Time{repeatMs}, + uv::Timer::Time{repeatMs}); + } }); m_clientImpl->SetLocal(&m_localStorage); m_localStorage.StartNetwork(&m_localQueue); diff --git a/ntcore/src/main/native/cpp/NetworkServer.cpp b/ntcore/src/main/native/cpp/NetworkServer.cpp index c60796a7cc..484bf9f76e 100644 --- a/ntcore/src/main/native/cpp/NetworkServer.cpp +++ b/ntcore/src/main/native/cpp/NetworkServer.cpp @@ -420,36 +420,46 @@ void NSImpl::Init() { // set up timers m_readLocalTimer = uv::Timer::Create(m_loop); - m_readLocalTimer->timeout.connect([this] { - HandleLocal(); - m_serverImpl.SendControl(m_loop.Now().count()); - }); - m_readLocalTimer->Start(uv::Timer::Time{100}, uv::Timer::Time{100}); + if (m_readLocalTimer) { + m_readLocalTimer->timeout.connect([this] { + HandleLocal(); + m_serverImpl.SendControl(m_loop.Now().count()); + }); + m_readLocalTimer->Start(uv::Timer::Time{100}, uv::Timer::Time{100}); + } m_savePersistentTimer = uv::Timer::Create(m_loop); - m_savePersistentTimer->timeout.connect([this] { - if (m_serverImpl.PersistentChanged()) { - uv::QueueWork( - m_loop, - [this, fn = m_persistentFilename, - data = m_serverImpl.DumpPersistent()] { SavePersistent(fn, data); }, - nullptr); - } - }); - m_savePersistentTimer->Start(uv::Timer::Time{1000}, uv::Timer::Time{1000}); + if (m_savePersistentTimer) { + m_savePersistentTimer->timeout.connect([this] { + if (m_serverImpl.PersistentChanged()) { + uv::QueueWork( + m_loop, + [this, fn = m_persistentFilename, + data = m_serverImpl.DumpPersistent()] { + SavePersistent(fn, data); + }, + nullptr); + } + }); + m_savePersistentTimer->Start(uv::Timer::Time{1000}, uv::Timer::Time{1000}); + } // set up flush async m_flush = uv::Async<>::Create(m_loop); - m_flush->wakeup.connect([this] { - HandleLocal(); - for (auto&& conn : m_connections) { - m_serverImpl.SendValues(conn.conn->GetClientId(), m_loop.Now().count()); - } - }); + if (m_flush) { + m_flush->wakeup.connect([this] { + HandleLocal(); + for (auto&& conn : m_connections) { + m_serverImpl.SendValues(conn.conn->GetClientId(), m_loop.Now().count()); + } + }); + } m_flushAtomic = m_flush.get(); m_flushLocal = uv::Async<>::Create(m_loop); - m_flushLocal->wakeup.connect([this] { HandleLocal(); }); + if (m_flushLocal) { + m_flushLocal->wakeup.connect([this] { HandleLocal(); }); + } m_flushLocalAtomic = m_flushLocal.get(); INFO("Listening on NT3 port {}, NT4 port {}", m_port3, m_port4); diff --git a/wpinet/src/main/native/cpp/DsClient.cpp b/wpinet/src/main/native/cpp/DsClient.cpp index 86f8f000c7..97509cb323 100644 --- a/wpinet/src/main/native/cpp/DsClient.cpp +++ b/wpinet/src/main/native/cpp/DsClient.cpp @@ -21,6 +21,9 @@ DsClient::DsClient(wpi::uv::Loop& loop, wpi::Logger& logger, : m_logger{logger}, m_tcp{uv::Tcp::Create(loop)}, m_timer{uv::Timer::Create(loop)} { + if (!m_tcp || !m_timer) { + return; + } m_tcp->end.connect([this] { WPI_DEBUG4(m_logger, "DS connection closed"); clearIp(); diff --git a/wpinet/src/main/native/cpp/EventLoopRunner.cpp b/wpinet/src/main/native/cpp/EventLoopRunner.cpp index 7c7e79c116..6c143ac674 100644 --- a/wpinet/src/main/native/cpp/EventLoopRunner.cpp +++ b/wpinet/src/main/native/cpp/EventLoopRunner.cpp @@ -59,6 +59,7 @@ void EventLoopRunner::Stop() { h.SetLoopClosing(true); h.Close(); }); + loop.SetClosing(); }); m_owner.Join(); } diff --git a/wpinet/src/main/native/cpp/ParallelTcpConnector.cpp b/wpinet/src/main/native/cpp/ParallelTcpConnector.cpp index 317f0a2b11..a16a4e9915 100644 --- a/wpinet/src/main/native/cpp/ParallelTcpConnector.cpp +++ b/wpinet/src/main/native/cpp/ParallelTcpConnector.cpp @@ -24,6 +24,9 @@ ParallelTcpConnector::ParallelTcpConnector( m_reconnectRate{reconnectRate}, m_connected{std::move(connected)}, m_reconnectTimer{uv::Timer::Create(loop)} { + if (!m_reconnectTimer) { + return; + } m_reconnectTimer->timeout.connect([this] { if (!IsConnected()) { WPI_DEBUG1(m_logger, "timed out, reconnecting"); @@ -86,6 +89,9 @@ void ParallelTcpConnector::Connect() { // kick off parallel connection attempts for (auto ai = &addrinfo; ai; ai = ai->ai_next) { auto tcp = uv::Tcp::Create(m_loop); + if (!tcp) { + continue; + } m_attempts.emplace_back(tcp); auto connreq = std::make_shared(); diff --git a/wpinet/src/main/native/cpp/PortForwarder.cpp b/wpinet/src/main/native/cpp/PortForwarder.cpp index 257b6209b6..67cd806e0a 100644 --- a/wpinet/src/main/native/cpp/PortForwarder.cpp +++ b/wpinet/src/main/native/cpp/PortForwarder.cpp @@ -49,6 +49,9 @@ void PortForwarder::Add(unsigned int port, std::string_view remoteHost, unsigned int remotePort) { m_impl->runner.ExecSync([&](uv::Loop& loop) { auto server = uv::Tcp::Create(loop); + if (!server) { + return; + } // bind to local port server->Bind("", port); @@ -71,6 +74,10 @@ void PortForwarder::Add(unsigned int port, std::string_view remoteHost, client->SetData(connected); auto remote = uv::Tcp::Create(loop); + if (!remote) { + client->Close(); + return; + } remote->error.connect( [remotePtr = remote.get(), clientWeak = std::weak_ptr(client)](uv::Error err) { diff --git a/wpinet/src/main/native/cpp/WebSocket.cpp b/wpinet/src/main/native/cpp/WebSocket.cpp index 576870aadc..dc2cd29837 100644 --- a/wpinet/src/main/native/cpp/WebSocket.cpp +++ b/wpinet/src/main/native/cpp/WebSocket.cpp @@ -261,11 +261,12 @@ void WebSocket::StartClient(std::string_view uri, std::string_view host, // Start handshake timer if a timeout was specified if (options.handshakeTimeout != (uv::Timer::Time::max)()) { - auto timer = uv::Timer::Create(m_stream.GetLoopRef()); - timer->timeout.connect( - [this]() { Terminate(1006, "connection timed out"); }); - timer->Start(options.handshakeTimeout); - m_clientHandshake->timer = timer; + if (auto timer = uv::Timer::Create(m_stream.GetLoopRef())) { + timer->timeout.connect( + [this]() { Terminate(1006, "connection timed out"); }); + timer->Start(options.handshakeTimeout); + m_clientHandshake->timer = timer; + } } } diff --git a/wpinet/src/main/native/cpp/uv/Async.cpp b/wpinet/src/main/native/cpp/uv/Async.cpp index f84bb9b4c7..58ef5f392b 100644 --- a/wpinet/src/main/native/cpp/uv/Async.cpp +++ b/wpinet/src/main/native/cpp/uv/Async.cpp @@ -17,6 +17,9 @@ Async<>::~Async() noexcept { } std::shared_ptr> Async<>::Create(const std::shared_ptr& loop) { + if (loop->IsClosing()) { + return nullptr; + } auto h = std::make_shared(loop, private_init{}); int err = uv_async_init(loop->GetRaw(), h->GetRaw(), [](uv_async_t* handle) { Async& h = *static_cast(handle->data); diff --git a/wpinet/src/main/native/cpp/uv/Check.cpp b/wpinet/src/main/native/cpp/uv/Check.cpp index 13c2229a5f..75ff47c7a4 100644 --- a/wpinet/src/main/native/cpp/uv/Check.cpp +++ b/wpinet/src/main/native/cpp/uv/Check.cpp @@ -9,6 +9,9 @@ namespace wpi::uv { std::shared_ptr Check::Create(Loop& loop) { + if (loop.IsClosing()) { + return nullptr; + } auto h = std::make_shared(private_init{}); int err = uv_check_init(loop.GetRaw(), h->GetRaw()); if (err < 0) { @@ -20,6 +23,9 @@ std::shared_ptr Check::Create(Loop& loop) { } void Check::Start() { + if (IsLoopClosing()) { + return; + } Invoke(&uv_check_start, GetRaw(), [](uv_check_t* handle) { Check& h = *static_cast(handle->data); h.check(); diff --git a/wpinet/src/main/native/cpp/uv/FsEvent.cpp b/wpinet/src/main/native/cpp/uv/FsEvent.cpp index 044390edcc..d77bf37c4b 100644 --- a/wpinet/src/main/native/cpp/uv/FsEvent.cpp +++ b/wpinet/src/main/native/cpp/uv/FsEvent.cpp @@ -13,6 +13,9 @@ namespace wpi::uv { std::shared_ptr FsEvent::Create(Loop& loop) { + if (loop.IsClosing()) { + return nullptr; + } auto h = std::make_shared(private_init{}); int err = uv_fs_event_init(loop.GetRaw(), h->GetRaw()); if (err < 0) { diff --git a/wpinet/src/main/native/cpp/uv/GetAddrInfo.cpp b/wpinet/src/main/native/cpp/uv/GetAddrInfo.cpp index 14721f2052..c3ec00010e 100644 --- a/wpinet/src/main/native/cpp/uv/GetAddrInfo.cpp +++ b/wpinet/src/main/native/cpp/uv/GetAddrInfo.cpp @@ -18,6 +18,9 @@ GetAddrInfoReq::GetAddrInfoReq() { void GetAddrInfo(Loop& loop, const std::shared_ptr& req, std::string_view node, std::string_view service, const addrinfo* hints) { + if (loop.IsClosing()) { + return; + } SmallString<128> nodeStr{node}; SmallString<128> serviceStr{service}; int err = uv_getaddrinfo( diff --git a/wpinet/src/main/native/cpp/uv/GetNameInfo.cpp b/wpinet/src/main/native/cpp/uv/GetNameInfo.cpp index a6ad36d7ab..9720cc3be7 100644 --- a/wpinet/src/main/native/cpp/uv/GetNameInfo.cpp +++ b/wpinet/src/main/native/cpp/uv/GetNameInfo.cpp @@ -15,6 +15,9 @@ GetNameInfoReq::GetNameInfoReq() { void GetNameInfo(Loop& loop, const std::shared_ptr& req, const sockaddr& addr, int flags) { + if (loop.IsClosing()) { + return; + } int err = uv_getnameinfo( loop.GetRaw(), req->GetRaw(), [](uv_getnameinfo_t* req, int status, const char* hostname, diff --git a/wpinet/src/main/native/cpp/uv/Idle.cpp b/wpinet/src/main/native/cpp/uv/Idle.cpp index 452bc7ea34..7b94b3f4a8 100644 --- a/wpinet/src/main/native/cpp/uv/Idle.cpp +++ b/wpinet/src/main/native/cpp/uv/Idle.cpp @@ -9,6 +9,9 @@ namespace wpi::uv { std::shared_ptr Idle::Create(Loop& loop) { + if (loop.IsClosing()) { + return nullptr; + } auto h = std::make_shared(private_init{}); int err = uv_idle_init(loop.GetRaw(), h->GetRaw()); if (err < 0) { @@ -20,6 +23,9 @@ std::shared_ptr Idle::Create(Loop& loop) { } void Idle::Start() { + if (IsLoopClosing()) { + return; + } Invoke(&uv_idle_start, GetRaw(), [](uv_idle_t* handle) { Idle& h = *static_cast(handle->data); h.idle(); diff --git a/wpinet/src/main/native/cpp/uv/NetworkStream.cpp b/wpinet/src/main/native/cpp/uv/NetworkStream.cpp index 3538596b5d..12750b2b76 100644 --- a/wpinet/src/main/native/cpp/uv/NetworkStream.cpp +++ b/wpinet/src/main/native/cpp/uv/NetworkStream.cpp @@ -11,6 +11,9 @@ ConnectReq::ConnectReq() { } void NetworkStream::Listen(int backlog) { + if (IsLoopClosing()) { + return; + } Invoke(&uv_listen, GetRawStream(), backlog, [](uv_stream_t* handle, int status) { auto& h = *static_cast(handle->data); diff --git a/wpinet/src/main/native/cpp/uv/Pipe.cpp b/wpinet/src/main/native/cpp/uv/Pipe.cpp index 9548874bdf..799360409e 100644 --- a/wpinet/src/main/native/cpp/uv/Pipe.cpp +++ b/wpinet/src/main/native/cpp/uv/Pipe.cpp @@ -11,6 +11,9 @@ namespace wpi::uv { std::shared_ptr Pipe::Create(Loop& loop, bool ipc) { + if (loop.IsClosing()) { + return nullptr; + } auto h = std::make_shared(private_init{}); int err = uv_pipe_init(loop.GetRaw(), h->GetRaw(), ipc ? 1 : 0); if (err < 0) { @@ -22,7 +25,7 @@ std::shared_ptr Pipe::Create(Loop& loop, bool ipc) { } void Pipe::Reuse(std::function callback, bool ipc) { - if (IsClosing()) { + if (IsLoopClosing() || IsClosing()) { return; } if (!m_reuseData) { @@ -69,6 +72,9 @@ void Pipe::Bind(std::string_view name) { void Pipe::Connect(std::string_view name, const std::shared_ptr& req) { + if (IsLoopClosing()) { + return; + } SmallString<128> nameBuf{name}; uv_pipe_connect(req->GetRaw(), GetRaw(), nameBuf.c_str(), [](uv_connect_t* req, int status) { diff --git a/wpinet/src/main/native/cpp/uv/Poll.cpp b/wpinet/src/main/native/cpp/uv/Poll.cpp index 3713453189..7d35615118 100644 --- a/wpinet/src/main/native/cpp/uv/Poll.cpp +++ b/wpinet/src/main/native/cpp/uv/Poll.cpp @@ -9,6 +9,9 @@ namespace wpi::uv { std::shared_ptr Poll::Create(Loop& loop, int fd) { + if (loop.IsClosing()) { + return nullptr; + } auto h = std::make_shared(private_init{}); int err = uv_poll_init(loop.GetRaw(), h->GetRaw(), fd); if (err < 0) { @@ -20,6 +23,9 @@ std::shared_ptr Poll::Create(Loop& loop, int fd) { } std::shared_ptr Poll::CreateSocket(Loop& loop, uv_os_sock_t sock) { + if (loop.IsClosing()) { + return nullptr; + } auto h = std::make_shared(private_init{}); int err = uv_poll_init_socket(loop.GetRaw(), h->GetRaw(), sock); if (err < 0) { @@ -31,7 +37,7 @@ std::shared_ptr Poll::CreateSocket(Loop& loop, uv_os_sock_t sock) { } void Poll::Reuse(int fd, std::function callback) { - if (IsClosing()) { + if (IsLoopClosing() || IsClosing()) { return; } if (!m_reuseData) { @@ -56,7 +62,7 @@ void Poll::Reuse(int fd, std::function callback) { } void Poll::ReuseSocket(uv_os_sock_t sock, std::function callback) { - if (IsClosing()) { + if (IsLoopClosing() || IsClosing()) { return; } if (!m_reuseData) { @@ -81,6 +87,9 @@ void Poll::ReuseSocket(uv_os_sock_t sock, std::function callback) { } void Poll::Start(int events) { + if (IsLoopClosing()) { + return; + } Invoke(&uv_poll_start, GetRaw(), events, [](uv_poll_t* handle, int status, int events) { Poll& h = *static_cast(handle->data); diff --git a/wpinet/src/main/native/cpp/uv/Prepare.cpp b/wpinet/src/main/native/cpp/uv/Prepare.cpp index e4ca160859..aa1a89d6e1 100644 --- a/wpinet/src/main/native/cpp/uv/Prepare.cpp +++ b/wpinet/src/main/native/cpp/uv/Prepare.cpp @@ -9,6 +9,9 @@ namespace wpi::uv { std::shared_ptr Prepare::Create(Loop& loop) { + if (loop.IsClosing()) { + return nullptr; + } auto h = std::make_shared(private_init{}); int err = uv_prepare_init(loop.GetRaw(), h->GetRaw()); if (err < 0) { @@ -20,6 +23,9 @@ std::shared_ptr Prepare::Create(Loop& loop) { } void Prepare::Start() { + if (IsLoopClosing()) { + return; + } Invoke(&uv_prepare_start, GetRaw(), [](uv_prepare_t* handle) { Prepare& h = *static_cast(handle->data); h.prepare(); diff --git a/wpinet/src/main/native/cpp/uv/Process.cpp b/wpinet/src/main/native/cpp/uv/Process.cpp index 3c10db6032..c872ff9b85 100644 --- a/wpinet/src/main/native/cpp/uv/Process.cpp +++ b/wpinet/src/main/native/cpp/uv/Process.cpp @@ -13,6 +13,10 @@ namespace wpi::uv { std::shared_ptr Process::SpawnArray(Loop& loop, std::string_view file, std::span options) { + if (loop.IsClosing()) { + return nullptr; + } + // convert Option array to libuv structure uv_process_options_t coptions; diff --git a/wpinet/src/main/native/cpp/uv/Signal.cpp b/wpinet/src/main/native/cpp/uv/Signal.cpp index 10dd7b4d72..8f998e2b48 100644 --- a/wpinet/src/main/native/cpp/uv/Signal.cpp +++ b/wpinet/src/main/native/cpp/uv/Signal.cpp @@ -9,6 +9,9 @@ namespace wpi::uv { std::shared_ptr Signal::Create(Loop& loop) { + if (loop.IsClosing()) { + return nullptr; + } auto h = std::make_shared(private_init{}); int err = uv_signal_init(loop.GetRaw(), h->GetRaw()); if (err < 0) { @@ -20,6 +23,9 @@ std::shared_ptr Signal::Create(Loop& loop) { } void Signal::Start(int signum) { + if (IsLoopClosing()) { + return; + } Invoke( &uv_signal_start, GetRaw(), [](uv_signal_t* handle, int signum) { diff --git a/wpinet/src/main/native/cpp/uv/Stream.cpp b/wpinet/src/main/native/cpp/uv/Stream.cpp index e7f6031a3d..2891aaeeb8 100644 --- a/wpinet/src/main/native/cpp/uv/Stream.cpp +++ b/wpinet/src/main/native/cpp/uv/Stream.cpp @@ -35,6 +35,9 @@ WriteReq::WriteReq() { } void Stream::Shutdown(const std::shared_ptr& req) { + if (IsLoopClosing()) { + return; + } if (Invoke(&uv_shutdown, req->GetRaw(), GetRawStream(), [](uv_shutdown_t* req, int status) { auto& h = *static_cast(req->data); @@ -50,6 +53,9 @@ void Stream::Shutdown(const std::shared_ptr& req) { } void Stream::Shutdown(std::function callback) { + if (IsLoopClosing()) { + return; + } auto req = std::make_shared(); if (callback) { req->complete.connect(std::move(callback)); @@ -58,6 +64,9 @@ void Stream::Shutdown(std::function callback) { } void Stream::StartRead() { + if (IsLoopClosing()) { + return; + } Invoke(&uv_read_start, GetRawStream(), &Handle::AllocBuf, [](uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) { auto& h = *static_cast(stream->data); @@ -79,6 +88,9 @@ void Stream::StartRead() { void Stream::Write(std::span bufs, const std::shared_ptr& req) { + if (IsLoopClosing()) { + return; + } if (Invoke(&uv_write, req->GetRaw(), GetRawStream(), bufs.data(), bufs.size(), [](uv_write_t* r, int status) { auto& h = *static_cast(r->data); @@ -98,6 +110,9 @@ void Stream::Write(std::span bufs, } int Stream::TryWrite(std::span bufs) { + if (IsLoopClosing()) { + return 0; + } int val = uv_try_write(GetRawStream(), bufs.data(), bufs.size()); if (val < 0) { this->ReportError(val); @@ -107,6 +122,9 @@ int Stream::TryWrite(std::span bufs) { } int Stream::TryWrite2(std::span bufs, Stream& send) { + if (IsLoopClosing()) { + return 0; + } int val = uv_try_write2(GetRawStream(), bufs.data(), bufs.size(), send.GetRawStream()); if (val < 0) { diff --git a/wpinet/src/main/native/cpp/uv/Tcp.cpp b/wpinet/src/main/native/cpp/uv/Tcp.cpp index ae01683467..b163a0efba 100644 --- a/wpinet/src/main/native/cpp/uv/Tcp.cpp +++ b/wpinet/src/main/native/cpp/uv/Tcp.cpp @@ -11,6 +11,9 @@ namespace wpi::uv { std::shared_ptr Tcp::Create(Loop& loop, unsigned int flags) { + if (loop.IsClosing()) { + return nullptr; + } auto h = std::make_shared(private_init{}); int err = uv_tcp_init_ex(loop.GetRaw(), h->GetRaw(), flags); if (err < 0) { @@ -22,7 +25,7 @@ std::shared_ptr Tcp::Create(Loop& loop, unsigned int flags) { } void Tcp::Reuse(std::function callback, unsigned int flags) { - if (IsClosing()) { + if (IsLoopClosing() || IsClosing()) { return; } if (!m_reuseData) { @@ -103,6 +106,9 @@ sockaddr_storage Tcp::GetPeer() { void Tcp::Connect(const sockaddr& addr, const std::shared_ptr& req) { + if (IsLoopClosing()) { + return; + } if (Invoke(&uv_tcp_connect, req->GetRaw(), GetRaw(), &addr, [](uv_connect_t* req, int status) { auto& h = *static_cast(req->data); @@ -118,6 +124,9 @@ void Tcp::Connect(const sockaddr& addr, } void Tcp::Connect(const sockaddr& addr, std::function callback) { + if (IsLoopClosing()) { + return; + } auto req = std::make_shared(); req->connected.connect(std::move(callback)); Connect(addr, req); diff --git a/wpinet/src/main/native/cpp/uv/Timer.cpp b/wpinet/src/main/native/cpp/uv/Timer.cpp index 9d5217367b..e9b33fc7c7 100644 --- a/wpinet/src/main/native/cpp/uv/Timer.cpp +++ b/wpinet/src/main/native/cpp/uv/Timer.cpp @@ -9,6 +9,9 @@ namespace wpi::uv { std::shared_ptr Timer::Create(Loop& loop) { + if (loop.IsClosing()) { + return nullptr; + } auto h = std::make_shared(private_init{}); int err = uv_timer_init(loop.GetRaw(), h->GetRaw()); if (err < 0) { @@ -32,6 +35,9 @@ void Timer::SingleShot(Loop& loop, Time timeout, std::function func) { } void Timer::Start(Time timeout, Time repeat) { + if (IsLoopClosing()) { + return; + } Invoke( &uv_timer_start, GetRaw(), [](uv_timer_t* handle) { diff --git a/wpinet/src/main/native/cpp/uv/Tty.cpp b/wpinet/src/main/native/cpp/uv/Tty.cpp index 6043a93ca4..5e5756cacc 100644 --- a/wpinet/src/main/native/cpp/uv/Tty.cpp +++ b/wpinet/src/main/native/cpp/uv/Tty.cpp @@ -9,6 +9,9 @@ namespace wpi::uv { std::shared_ptr Tty::Create(Loop& loop, uv_file fd, bool readable) { + if (loop.IsClosing()) { + return nullptr; + } auto h = std::make_shared(private_init{}); int err = uv_tty_init(loop.GetRaw(), h->GetRaw(), fd, readable ? 1 : 0); if (err < 0) { diff --git a/wpinet/src/main/native/cpp/uv/Udp.cpp b/wpinet/src/main/native/cpp/uv/Udp.cpp index 689d5a72f5..1922c57596 100644 --- a/wpinet/src/main/native/cpp/uv/Udp.cpp +++ b/wpinet/src/main/native/cpp/uv/Udp.cpp @@ -38,6 +38,9 @@ UdpSendReq::UdpSendReq() { } std::shared_ptr Udp::Create(Loop& loop, unsigned int flags) { + if (loop.IsClosing()) { + return nullptr; + } auto h = std::make_shared(private_init{}); int err = uv_udp_init_ex(loop.GetRaw(), h->GetRaw(), flags); if (err < 0) { @@ -135,6 +138,9 @@ void Udp::SetMulticastInterface(std::string_view interfaceAddr) { void Udp::Send(const sockaddr& addr, std::span bufs, const std::shared_ptr& req) { + if (IsLoopClosing()) { + return; + } if (Invoke(&uv_udp_send, req->GetRaw(), GetRaw(), bufs.data(), bufs.size(), &addr, [](uv_udp_send_t* r, int status) { auto& h = *static_cast(r->data); @@ -150,12 +156,18 @@ void Udp::Send(const sockaddr& addr, std::span bufs, void Udp::Send(const sockaddr& addr, std::span bufs, std::function, Error)> callback) { + if (IsLoopClosing()) { + return; + } Send(addr, bufs, std::make_shared(bufs, std::move(callback))); } void Udp::Send(std::span bufs, const std::shared_ptr& req) { + if (IsLoopClosing()) { + return; + } if (Invoke(&uv_udp_send, req->GetRaw(), GetRaw(), bufs.data(), bufs.size(), nullptr, [](uv_udp_send_t* r, int status) { auto& h = *static_cast(r->data); @@ -171,10 +183,16 @@ void Udp::Send(std::span bufs, void Udp::Send(std::span bufs, std::function, Error)> callback) { + if (IsLoopClosing()) { + return; + } Send(bufs, std::make_shared(bufs, std::move(callback))); } void Udp::StartRecv() { + if (IsLoopClosing()) { + return; + } Invoke(&uv_udp_recv_start, GetRaw(), &AllocBuf, [](uv_udp_t* handle, ssize_t nread, const uv_buf_t* buf, const sockaddr* addr, unsigned flags) { diff --git a/wpinet/src/main/native/cpp/uv/Work.cpp b/wpinet/src/main/native/cpp/uv/Work.cpp index 818a93b3db..d94619fd21 100644 --- a/wpinet/src/main/native/cpp/uv/Work.cpp +++ b/wpinet/src/main/native/cpp/uv/Work.cpp @@ -13,6 +13,9 @@ WorkReq::WorkReq() { } void QueueWork(Loop& loop, const std::shared_ptr& req) { + if (loop.IsClosing()) { + return; + } int err = uv_queue_work( loop.GetRaw(), req->GetRaw(), [](uv_work_t* req) { @@ -37,6 +40,9 @@ void QueueWork(Loop& loop, const std::shared_ptr& req) { void QueueWork(Loop& loop, std::function work, std::function afterWork) { + if (loop.IsClosing()) { + return; + } auto req = std::make_shared(); if (work) { req->work.connect(std::move(work)); diff --git a/wpinet/src/main/native/include/wpinet/ParallelTcpConnector.h b/wpinet/src/main/native/include/wpinet/ParallelTcpConnector.h index e7bc953d62..0811350d0e 100644 --- a/wpinet/src/main/native/include/wpinet/ParallelTcpConnector.h +++ b/wpinet/src/main/native/include/wpinet/ParallelTcpConnector.h @@ -59,6 +59,9 @@ class ParallelTcpConnector static std::shared_ptr Create( wpi::uv::Loop& loop, wpi::uv::Timer::Time reconnectRate, wpi::Logger& logger, std::function connected) { + if (loop.IsClosing()) { + return nullptr; + } return std::make_shared( loop, reconnectRate, logger, std::move(connected), private_init{}); } diff --git a/wpinet/src/main/native/include/wpinet/uv/Async.h b/wpinet/src/main/native/include/wpinet/uv/Async.h index eb3a0055e0..2eb13d7308 100644 --- a/wpinet/src/main/native/include/wpinet/uv/Async.h +++ b/wpinet/src/main/native/include/wpinet/uv/Async.h @@ -62,6 +62,9 @@ class Async final : public HandleImpl, uv_async_t> { * @param loop Loop object where this handle runs. */ static std::shared_ptr Create(const std::shared_ptr& loop) { + if (loop->IsClosing()) { + return nullptr; + } auto h = std::make_shared(loop, private_init{}); int err = uv_async_init(loop->GetRaw(), h->GetRaw(), [](uv_async_t* handle) { @@ -89,6 +92,9 @@ class Async final : public HandleImpl, uv_async_t> { template void Send(U&&... u) { auto loop = m_loop.lock(); + if (loop->IsClosing()) { + return; + } if (loop && loop->GetThreadId() == std::this_thread::get_id()) { // called from within the loop, just call the function directly wakeup(std::forward(u)...); @@ -161,6 +167,9 @@ class Async<> final : public HandleImpl, uv_async_t> { */ void Send() { if (auto loop = m_loop.lock()) { + if (loop->IsClosing()) { + return; + } if (loop->GetThreadId() == std::this_thread::get_id()) { // called from within the loop, just call the function directly wakeup(); diff --git a/wpinet/src/main/native/include/wpinet/uv/AsyncFunction.h b/wpinet/src/main/native/include/wpinet/uv/AsyncFunction.h index 82a5913035..915daa2070 100644 --- a/wpinet/src/main/native/include/wpinet/uv/AsyncFunction.h +++ b/wpinet/src/main/native/include/wpinet/uv/AsyncFunction.h @@ -74,6 +74,9 @@ class AsyncFunction final static std::shared_ptr Create( const std::shared_ptr& loop, std::function, T...)> func = nullptr) { + if (loop->IsClosing()) { + return nullptr; + } auto h = std::make_shared(loop, std::move(func), private_init{}); int err = @@ -123,6 +126,13 @@ class AsyncFunction final uint64_t req = m_promises.CreateRequest(); auto loop = m_loop.lock(); + if (loop->IsClosing()) { + if constexpr (std::is_same_v) { + return m_promises.MakeReadyFuture(); + } else { + return m_promises.MakeReadyFuture({}); + } + } if (loop && loop->GetThreadId() == std::this_thread::get_id()) { // called from within the loop, just call the function directly wakeup(m_promises.CreatePromise(req), std::forward(u)...); diff --git a/wpinet/src/main/native/include/wpinet/uv/Loop.h b/wpinet/src/main/native/include/wpinet/uv/Loop.h index 129faf5316..0897c87f2b 100644 --- a/wpinet/src/main/native/include/wpinet/uv/Loop.h +++ b/wpinet/src/main/native/include/wpinet/uv/Loop.h @@ -70,6 +70,20 @@ class Loop final : public std::enable_shared_from_this { */ static std::shared_ptr GetDefault(); + /** + * Set the loop closing flag. + * + * This will prevent new handles from being created on the loop. + */ + void SetClosing() { m_closing = true; } + + /** + * Return the loop closed flag. + * + * @return True if SetClosed() has been called. + */ + bool IsClosing() const { return m_closing; } + /** * Release all internal loop resources. * @@ -247,6 +261,7 @@ class Loop final : public std::enable_shared_from_this { uv_loop_t* m_loop; uv_loop_t m_loopStruct; std::atomic m_tid; + bool m_closing = false; }; } // namespace wpi::uv