diff --git a/ntcore/src/main/native/cpp/net/ClientImpl.cpp b/ntcore/src/main/native/cpp/net/ClientImpl.cpp index 73f740b47f..1852b6646e 100644 --- a/ntcore/src/main/native/cpp/net/ClientImpl.cpp +++ b/ntcore/src/main/native/cpp/net/ClientImpl.cpp @@ -30,9 +30,9 @@ using namespace nt::net; static constexpr uint32_t kMinPeriodMs = 5; -// maximum number of times the wire can be not ready to send another +// maximum amount of time the wire can be not ready to send another // transmission before we close the connection -static constexpr int kWireMaxNotReady = 10; +static constexpr uint32_t kWireMaxNotReadyMs = 1000; namespace { @@ -58,7 +58,7 @@ class CImpl : public ServerMessageHandler { bool SendControl(uint64_t curTimeMs); void SendValues(uint64_t curTimeMs, bool flush); void SendInitialValues(); - bool CheckNetworkReady(); + bool CheckNetworkReady(uint64_t curTimeMs); // ServerMessageHandler interface void ServerAnnounce(std::string_view name, int64_t id, @@ -98,7 +98,6 @@ class CImpl : public ServerMessageHandler { // periodic sweep handling uint32_t m_periodMs{kPingIntervalMs + 10}; uint64_t m_lastSendMs{0}; - int m_notReadyCount{0}; // outgoing queue std::vector m_outgoing; @@ -208,7 +207,7 @@ bool CImpl::SendControl(uint64_t curTimeMs) { // start a timestamp RTT ping if it's time to do one if (curTimeMs >= m_nextPingTimeMs) { - if (!CheckNetworkReady()) { + if (!CheckNetworkReady(curTimeMs)) { return false; } auto now = wpi::Now(); @@ -219,7 +218,7 @@ bool CImpl::SendControl(uint64_t curTimeMs) { } if (!m_outgoing.empty()) { - if (!CheckNetworkReady()) { + if (!CheckNetworkReady(curTimeMs)) { return false; } auto writer = m_wire.SendText(); @@ -258,7 +257,7 @@ void CImpl::SendValues(uint64_t curTimeMs, bool flush) { (flush || curTimeMs >= pub->nextSendMs)) { for (auto&& val : pub->outValues) { if (!checkedNetwork) { - if (!CheckNetworkReady()) { + if (!CheckNetworkReady(curTimeMs)) { return; } checkedNetwork = true; @@ -312,15 +311,13 @@ void CImpl::SendInitialValues() { } } -bool CImpl::CheckNetworkReady() { +bool CImpl::CheckNetworkReady(uint64_t curTimeMs) { if (!m_wire.Ready()) { - ++m_notReadyCount; - if (m_notReadyCount > kWireMaxNotReady) { + if (m_lastSendMs != 0 && curTimeMs > (m_lastSendMs + kWireMaxNotReadyMs)) { m_wire.Disconnect("transmit stalled"); } return false; } - m_notReadyCount = 0; return true; } diff --git a/ntcore/src/main/native/cpp/net/ServerImpl.cpp b/ntcore/src/main/native/cpp/net/ServerImpl.cpp index e33bfecb36..d9e5365534 100644 --- a/ntcore/src/main/native/cpp/net/ServerImpl.cpp +++ b/ntcore/src/main/native/cpp/net/ServerImpl.cpp @@ -48,9 +48,9 @@ using namespace mpack; static constexpr uint32_t kMinPeriodMs = 5; -// maximum number of times the wire can be not ready to send another +// maximum amount of time the wire can be not ready to send another // transmission before we close the connection -static constexpr int kWireMaxNotReady = 10; +static constexpr uint32_t kWireMaxNotReadyMs = 1000; namespace { @@ -214,7 +214,6 @@ class ClientData4 final : public ClientData4Base { private: std::vector m_outgoing; - int m_notReadyCount{0}; bool WriteBinary(int64_t id, int64_t time, const Value& value) { return WireEncodeBinary(SendBinary().Add(), id, time, value); @@ -293,7 +292,6 @@ class ClientData3 final : public ClientData, private net3::MessageHandler3 { std::vector m_outgoing; int64_t m_nextPubUid{1}; - int m_notReadyCount{0}; struct TopicData3 { explicit TopicData3(TopicData* topic) { UpdateFlags(topic); } @@ -941,13 +939,11 @@ void ClientData4::SendOutgoing(uint64_t curTimeMs) { } if (!m_wire.Ready()) { - ++m_notReadyCount; - if (m_notReadyCount > kWireMaxNotReady) { + if (m_lastSendMs != 0 && curTimeMs > (m_lastSendMs + kWireMaxNotReadyMs)) { m_wire.Disconnect("transmit stalled"); } return; } - m_notReadyCount = 0; for (auto&& msg : m_outgoing) { if (auto m = std::get_if(&msg.contents)) { @@ -1114,13 +1110,11 @@ void ClientData3::SendOutgoing(uint64_t curTimeMs) { } if (!m_wire.Ready()) { - ++m_notReadyCount; - if (m_notReadyCount > kWireMaxNotReady) { + if (m_lastSendMs != 0 && curTimeMs > (m_lastSendMs + kWireMaxNotReadyMs)) { m_wire.Disconnect("transmit stalled"); } return; } - m_notReadyCount = 0; auto out = m_wire.Send(); for (auto&& msg : m_outgoing) { diff --git a/ntcore/src/main/native/cpp/net3/ClientImpl3.cpp b/ntcore/src/main/native/cpp/net3/ClientImpl3.cpp index 39981d66a4..305fc9bab7 100644 --- a/ntcore/src/main/native/cpp/net3/ClientImpl3.cpp +++ b/ntcore/src/main/native/cpp/net3/ClientImpl3.cpp @@ -31,9 +31,9 @@ using namespace nt::net3; static constexpr uint32_t kMinPeriodMs = 5; -// maximum number of times the wire can be not ready to send another +// maximum amount of time the wire can be not ready to send another // transmission before we close the connection -static constexpr int kWireMaxNotReady = 10; +static constexpr uint32_t kWireMaxNotReadyMs = 1000; namespace { @@ -93,7 +93,7 @@ class CImpl : public MessageHandler3 { void HandleLocal(std::span msgs); void SendPeriodic(uint64_t curTimeMs, bool initial, bool flush); void SendValue(Writer& out, Entry* entry, const Value& value); - bool CheckNetworkReady(); + bool CheckNetworkReady(uint64_t curTimeMs); // Outgoing handlers void Publish(NT_Publisher pubHandle, NT_Topic topicHandle, @@ -142,7 +142,6 @@ class CImpl : public MessageHandler3 { uint32_t m_periodMs{kKeepAliveIntervalMs + 10}; uint64_t m_lastSendMs{0}; uint64_t m_nextKeepAliveTimeMs; - int m_notReadyCount{0}; // indexed by publisher index std::vector> m_publishers; @@ -235,7 +234,7 @@ void CImpl::SendPeriodic(uint64_t curTimeMs, bool initial, bool flush) { // send keep-alives if (curTimeMs >= m_nextKeepAliveTimeMs) { - if (!CheckNetworkReady()) { + if (!CheckNetworkReady(curTimeMs)) { return; } DEBUG4("Sending keep alive"); @@ -246,7 +245,7 @@ void CImpl::SendPeriodic(uint64_t curTimeMs, bool initial, bool flush) { // send any stored-up flags updates if (!m_outgoingFlags.empty()) { - if (!CheckNetworkReady()) { + if (!CheckNetworkReady(curTimeMs)) { return; } for (auto&& p : m_outgoingFlags) { @@ -261,7 +260,7 @@ void CImpl::SendPeriodic(uint64_t curTimeMs, bool initial, bool flush) { if (pub && !pub->outValues.empty() && (flush || curTimeMs >= pub->nextSendMs)) { if (!checkedNetwork) { - if (!CheckNetworkReady()) { + if (!CheckNetworkReady(curTimeMs)) { return; } checkedNetwork = true; @@ -302,15 +301,13 @@ void CImpl::SendValue(Writer& out, Entry* entry, const Value& value) { } } -bool CImpl::CheckNetworkReady() { +bool CImpl::CheckNetworkReady(uint64_t curTimeMs) { if (!m_wire.Ready()) { - ++m_notReadyCount; - if (m_notReadyCount > kWireMaxNotReady) { + if (m_lastSendMs != 0 && curTimeMs > (m_lastSendMs + kWireMaxNotReadyMs)) { m_wire.Disconnect("transmit stalled"); } return false; } - m_notReadyCount = 0; return true; }