From 49459d3e45eae3e6218205317e9aaa012b96b8bd Mon Sep 17 00:00:00 2001 From: Peter Johnson Date: Fri, 3 Feb 2023 22:05:41 -0800 Subject: [PATCH] [ntcore] Change wire timeout to fixed 1 second (#5048) Previously the timeout was 10 times the update rate, so with low update rates it could be as small as 50 ms, causing spurious disconnects when large or many topics were published. --- ntcore/src/main/native/cpp/net/ClientImpl.cpp | 19 ++++++++----------- ntcore/src/main/native/cpp/net/ServerImpl.cpp | 14 ++++---------- .../src/main/native/cpp/net3/ClientImpl3.cpp | 19 ++++++++----------- 3 files changed, 20 insertions(+), 32 deletions(-) diff --git a/ntcore/src/main/native/cpp/net/ClientImpl.cpp b/ntcore/src/main/native/cpp/net/ClientImpl.cpp index 73f740b47f..1852b6646e 100644 --- a/ntcore/src/main/native/cpp/net/ClientImpl.cpp +++ b/ntcore/src/main/native/cpp/net/ClientImpl.cpp @@ -30,9 +30,9 @@ using namespace nt::net; static constexpr uint32_t kMinPeriodMs = 5; -// maximum number of times the wire can be not ready to send another +// maximum amount of time the wire can be not ready to send another // transmission before we close the connection -static constexpr int kWireMaxNotReady = 10; +static constexpr uint32_t kWireMaxNotReadyMs = 1000; namespace { @@ -58,7 +58,7 @@ class CImpl : public ServerMessageHandler { bool SendControl(uint64_t curTimeMs); void SendValues(uint64_t curTimeMs, bool flush); void SendInitialValues(); - bool CheckNetworkReady(); + bool CheckNetworkReady(uint64_t curTimeMs); // ServerMessageHandler interface void ServerAnnounce(std::string_view name, int64_t id, @@ -98,7 +98,6 @@ class CImpl : public ServerMessageHandler { // periodic sweep handling uint32_t m_periodMs{kPingIntervalMs + 10}; uint64_t m_lastSendMs{0}; - int m_notReadyCount{0}; // outgoing queue std::vector m_outgoing; @@ -208,7 +207,7 @@ bool CImpl::SendControl(uint64_t curTimeMs) { // start a timestamp RTT ping if it's time to do one if (curTimeMs >= m_nextPingTimeMs) { - if (!CheckNetworkReady()) { + if (!CheckNetworkReady(curTimeMs)) { return false; } auto now = wpi::Now(); @@ -219,7 +218,7 @@ bool CImpl::SendControl(uint64_t curTimeMs) { } if (!m_outgoing.empty()) { - if (!CheckNetworkReady()) { + if (!CheckNetworkReady(curTimeMs)) { return false; } auto writer = m_wire.SendText(); @@ -258,7 +257,7 @@ void CImpl::SendValues(uint64_t curTimeMs, bool flush) { (flush || curTimeMs >= pub->nextSendMs)) { for (auto&& val : pub->outValues) { if (!checkedNetwork) { - if (!CheckNetworkReady()) { + if (!CheckNetworkReady(curTimeMs)) { return; } checkedNetwork = true; @@ -312,15 +311,13 @@ void CImpl::SendInitialValues() { } } -bool CImpl::CheckNetworkReady() { +bool CImpl::CheckNetworkReady(uint64_t curTimeMs) { if (!m_wire.Ready()) { - ++m_notReadyCount; - if (m_notReadyCount > kWireMaxNotReady) { + if (m_lastSendMs != 0 && curTimeMs > (m_lastSendMs + kWireMaxNotReadyMs)) { m_wire.Disconnect("transmit stalled"); } return false; } - m_notReadyCount = 0; return true; } diff --git a/ntcore/src/main/native/cpp/net/ServerImpl.cpp b/ntcore/src/main/native/cpp/net/ServerImpl.cpp index e33bfecb36..d9e5365534 100644 --- a/ntcore/src/main/native/cpp/net/ServerImpl.cpp +++ b/ntcore/src/main/native/cpp/net/ServerImpl.cpp @@ -48,9 +48,9 @@ using namespace mpack; static constexpr uint32_t kMinPeriodMs = 5; -// maximum number of times the wire can be not ready to send another +// maximum amount of time the wire can be not ready to send another // transmission before we close the connection -static constexpr int kWireMaxNotReady = 10; +static constexpr uint32_t kWireMaxNotReadyMs = 1000; namespace { @@ -214,7 +214,6 @@ class ClientData4 final : public ClientData4Base { private: std::vector m_outgoing; - int m_notReadyCount{0}; bool WriteBinary(int64_t id, int64_t time, const Value& value) { return WireEncodeBinary(SendBinary().Add(), id, time, value); @@ -293,7 +292,6 @@ class ClientData3 final : public ClientData, private net3::MessageHandler3 { std::vector m_outgoing; int64_t m_nextPubUid{1}; - int m_notReadyCount{0}; struct TopicData3 { explicit TopicData3(TopicData* topic) { UpdateFlags(topic); } @@ -941,13 +939,11 @@ void ClientData4::SendOutgoing(uint64_t curTimeMs) { } if (!m_wire.Ready()) { - ++m_notReadyCount; - if (m_notReadyCount > kWireMaxNotReady) { + if (m_lastSendMs != 0 && curTimeMs > (m_lastSendMs + kWireMaxNotReadyMs)) { m_wire.Disconnect("transmit stalled"); } return; } - m_notReadyCount = 0; for (auto&& msg : m_outgoing) { if (auto m = std::get_if(&msg.contents)) { @@ -1114,13 +1110,11 @@ void ClientData3::SendOutgoing(uint64_t curTimeMs) { } if (!m_wire.Ready()) { - ++m_notReadyCount; - if (m_notReadyCount > kWireMaxNotReady) { + if (m_lastSendMs != 0 && curTimeMs > (m_lastSendMs + kWireMaxNotReadyMs)) { m_wire.Disconnect("transmit stalled"); } return; } - m_notReadyCount = 0; auto out = m_wire.Send(); for (auto&& msg : m_outgoing) { diff --git a/ntcore/src/main/native/cpp/net3/ClientImpl3.cpp b/ntcore/src/main/native/cpp/net3/ClientImpl3.cpp index 39981d66a4..305fc9bab7 100644 --- a/ntcore/src/main/native/cpp/net3/ClientImpl3.cpp +++ b/ntcore/src/main/native/cpp/net3/ClientImpl3.cpp @@ -31,9 +31,9 @@ using namespace nt::net3; static constexpr uint32_t kMinPeriodMs = 5; -// maximum number of times the wire can be not ready to send another +// maximum amount of time the wire can be not ready to send another // transmission before we close the connection -static constexpr int kWireMaxNotReady = 10; +static constexpr uint32_t kWireMaxNotReadyMs = 1000; namespace { @@ -93,7 +93,7 @@ class CImpl : public MessageHandler3 { void HandleLocal(std::span msgs); void SendPeriodic(uint64_t curTimeMs, bool initial, bool flush); void SendValue(Writer& out, Entry* entry, const Value& value); - bool CheckNetworkReady(); + bool CheckNetworkReady(uint64_t curTimeMs); // Outgoing handlers void Publish(NT_Publisher pubHandle, NT_Topic topicHandle, @@ -142,7 +142,6 @@ class CImpl : public MessageHandler3 { uint32_t m_periodMs{kKeepAliveIntervalMs + 10}; uint64_t m_lastSendMs{0}; uint64_t m_nextKeepAliveTimeMs; - int m_notReadyCount{0}; // indexed by publisher index std::vector> m_publishers; @@ -235,7 +234,7 @@ void CImpl::SendPeriodic(uint64_t curTimeMs, bool initial, bool flush) { // send keep-alives if (curTimeMs >= m_nextKeepAliveTimeMs) { - if (!CheckNetworkReady()) { + if (!CheckNetworkReady(curTimeMs)) { return; } DEBUG4("Sending keep alive"); @@ -246,7 +245,7 @@ void CImpl::SendPeriodic(uint64_t curTimeMs, bool initial, bool flush) { // send any stored-up flags updates if (!m_outgoingFlags.empty()) { - if (!CheckNetworkReady()) { + if (!CheckNetworkReady(curTimeMs)) { return; } for (auto&& p : m_outgoingFlags) { @@ -261,7 +260,7 @@ void CImpl::SendPeriodic(uint64_t curTimeMs, bool initial, bool flush) { if (pub && !pub->outValues.empty() && (flush || curTimeMs >= pub->nextSendMs)) { if (!checkedNetwork) { - if (!CheckNetworkReady()) { + if (!CheckNetworkReady(curTimeMs)) { return; } checkedNetwork = true; @@ -302,15 +301,13 @@ void CImpl::SendValue(Writer& out, Entry* entry, const Value& value) { } } -bool CImpl::CheckNetworkReady() { +bool CImpl::CheckNetworkReady(uint64_t curTimeMs) { if (!m_wire.Ready()) { - ++m_notReadyCount; - if (m_notReadyCount > kWireMaxNotReady) { + if (m_lastSendMs != 0 && curTimeMs > (m_lastSendMs + kWireMaxNotReadyMs)) { m_wire.Disconnect("transmit stalled"); } return false; } - m_notReadyCount = 0; return true; }