mirror of
https://github.com/wpilibsuite/allwpilib
synced 2026-06-20 00:51:42 +00:00
[ntcore] Change wire timeout to fixed 1 second (#5048)
Previously the timeout was 10 times the update rate, so with low update rates it could be as small as 50 ms, causing spurious disconnects when large or many topics were published.
This commit is contained in:
@@ -30,9 +30,9 @@ using namespace nt::net;
|
||||
|
||||
static constexpr uint32_t kMinPeriodMs = 5;
|
||||
|
||||
// maximum number of times the wire can be not ready to send another
|
||||
// maximum amount of time the wire can be not ready to send another
|
||||
// transmission before we close the connection
|
||||
static constexpr int kWireMaxNotReady = 10;
|
||||
static constexpr uint32_t kWireMaxNotReadyMs = 1000;
|
||||
|
||||
namespace {
|
||||
|
||||
@@ -58,7 +58,7 @@ class CImpl : public ServerMessageHandler {
|
||||
bool SendControl(uint64_t curTimeMs);
|
||||
void SendValues(uint64_t curTimeMs, bool flush);
|
||||
void SendInitialValues();
|
||||
bool CheckNetworkReady();
|
||||
bool CheckNetworkReady(uint64_t curTimeMs);
|
||||
|
||||
// ServerMessageHandler interface
|
||||
void ServerAnnounce(std::string_view name, int64_t id,
|
||||
@@ -98,7 +98,6 @@ class CImpl : public ServerMessageHandler {
|
||||
// periodic sweep handling
|
||||
uint32_t m_periodMs{kPingIntervalMs + 10};
|
||||
uint64_t m_lastSendMs{0};
|
||||
int m_notReadyCount{0};
|
||||
|
||||
// outgoing queue
|
||||
std::vector<ClientMessage> m_outgoing;
|
||||
@@ -208,7 +207,7 @@ bool CImpl::SendControl(uint64_t curTimeMs) {
|
||||
|
||||
// start a timestamp RTT ping if it's time to do one
|
||||
if (curTimeMs >= m_nextPingTimeMs) {
|
||||
if (!CheckNetworkReady()) {
|
||||
if (!CheckNetworkReady(curTimeMs)) {
|
||||
return false;
|
||||
}
|
||||
auto now = wpi::Now();
|
||||
@@ -219,7 +218,7 @@ bool CImpl::SendControl(uint64_t curTimeMs) {
|
||||
}
|
||||
|
||||
if (!m_outgoing.empty()) {
|
||||
if (!CheckNetworkReady()) {
|
||||
if (!CheckNetworkReady(curTimeMs)) {
|
||||
return false;
|
||||
}
|
||||
auto writer = m_wire.SendText();
|
||||
@@ -258,7 +257,7 @@ void CImpl::SendValues(uint64_t curTimeMs, bool flush) {
|
||||
(flush || curTimeMs >= pub->nextSendMs)) {
|
||||
for (auto&& val : pub->outValues) {
|
||||
if (!checkedNetwork) {
|
||||
if (!CheckNetworkReady()) {
|
||||
if (!CheckNetworkReady(curTimeMs)) {
|
||||
return;
|
||||
}
|
||||
checkedNetwork = true;
|
||||
@@ -312,15 +311,13 @@ void CImpl::SendInitialValues() {
|
||||
}
|
||||
}
|
||||
|
||||
bool CImpl::CheckNetworkReady() {
|
||||
bool CImpl::CheckNetworkReady(uint64_t curTimeMs) {
|
||||
if (!m_wire.Ready()) {
|
||||
++m_notReadyCount;
|
||||
if (m_notReadyCount > kWireMaxNotReady) {
|
||||
if (m_lastSendMs != 0 && curTimeMs > (m_lastSendMs + kWireMaxNotReadyMs)) {
|
||||
m_wire.Disconnect("transmit stalled");
|
||||
}
|
||||
return false;
|
||||
}
|
||||
m_notReadyCount = 0;
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
@@ -48,9 +48,9 @@ using namespace mpack;
|
||||
|
||||
static constexpr uint32_t kMinPeriodMs = 5;
|
||||
|
||||
// maximum number of times the wire can be not ready to send another
|
||||
// maximum amount of time the wire can be not ready to send another
|
||||
// transmission before we close the connection
|
||||
static constexpr int kWireMaxNotReady = 10;
|
||||
static constexpr uint32_t kWireMaxNotReadyMs = 1000;
|
||||
|
||||
namespace {
|
||||
|
||||
@@ -214,7 +214,6 @@ class ClientData4 final : public ClientData4Base {
|
||||
|
||||
private:
|
||||
std::vector<ServerMessage> m_outgoing;
|
||||
int m_notReadyCount{0};
|
||||
|
||||
bool WriteBinary(int64_t id, int64_t time, const Value& value) {
|
||||
return WireEncodeBinary(SendBinary().Add(), id, time, value);
|
||||
@@ -293,7 +292,6 @@ class ClientData3 final : public ClientData, private net3::MessageHandler3 {
|
||||
|
||||
std::vector<net3::Message3> m_outgoing;
|
||||
int64_t m_nextPubUid{1};
|
||||
int m_notReadyCount{0};
|
||||
|
||||
struct TopicData3 {
|
||||
explicit TopicData3(TopicData* topic) { UpdateFlags(topic); }
|
||||
@@ -941,13 +939,11 @@ void ClientData4::SendOutgoing(uint64_t curTimeMs) {
|
||||
}
|
||||
|
||||
if (!m_wire.Ready()) {
|
||||
++m_notReadyCount;
|
||||
if (m_notReadyCount > kWireMaxNotReady) {
|
||||
if (m_lastSendMs != 0 && curTimeMs > (m_lastSendMs + kWireMaxNotReadyMs)) {
|
||||
m_wire.Disconnect("transmit stalled");
|
||||
}
|
||||
return;
|
||||
}
|
||||
m_notReadyCount = 0;
|
||||
|
||||
for (auto&& msg : m_outgoing) {
|
||||
if (auto m = std::get_if<ServerValueMsg>(&msg.contents)) {
|
||||
@@ -1114,13 +1110,11 @@ void ClientData3::SendOutgoing(uint64_t curTimeMs) {
|
||||
}
|
||||
|
||||
if (!m_wire.Ready()) {
|
||||
++m_notReadyCount;
|
||||
if (m_notReadyCount > kWireMaxNotReady) {
|
||||
if (m_lastSendMs != 0 && curTimeMs > (m_lastSendMs + kWireMaxNotReadyMs)) {
|
||||
m_wire.Disconnect("transmit stalled");
|
||||
}
|
||||
return;
|
||||
}
|
||||
m_notReadyCount = 0;
|
||||
|
||||
auto out = m_wire.Send();
|
||||
for (auto&& msg : m_outgoing) {
|
||||
|
||||
@@ -31,9 +31,9 @@ using namespace nt::net3;
|
||||
|
||||
static constexpr uint32_t kMinPeriodMs = 5;
|
||||
|
||||
// maximum number of times the wire can be not ready to send another
|
||||
// maximum amount of time the wire can be not ready to send another
|
||||
// transmission before we close the connection
|
||||
static constexpr int kWireMaxNotReady = 10;
|
||||
static constexpr uint32_t kWireMaxNotReadyMs = 1000;
|
||||
|
||||
namespace {
|
||||
|
||||
@@ -93,7 +93,7 @@ class CImpl : public MessageHandler3 {
|
||||
void HandleLocal(std::span<const net::ClientMessage> msgs);
|
||||
void SendPeriodic(uint64_t curTimeMs, bool initial, bool flush);
|
||||
void SendValue(Writer& out, Entry* entry, const Value& value);
|
||||
bool CheckNetworkReady();
|
||||
bool CheckNetworkReady(uint64_t curTimeMs);
|
||||
|
||||
// Outgoing handlers
|
||||
void Publish(NT_Publisher pubHandle, NT_Topic topicHandle,
|
||||
@@ -142,7 +142,6 @@ class CImpl : public MessageHandler3 {
|
||||
uint32_t m_periodMs{kKeepAliveIntervalMs + 10};
|
||||
uint64_t m_lastSendMs{0};
|
||||
uint64_t m_nextKeepAliveTimeMs;
|
||||
int m_notReadyCount{0};
|
||||
|
||||
// indexed by publisher index
|
||||
std::vector<std::unique_ptr<PublisherData>> m_publishers;
|
||||
@@ -235,7 +234,7 @@ void CImpl::SendPeriodic(uint64_t curTimeMs, bool initial, bool flush) {
|
||||
|
||||
// send keep-alives
|
||||
if (curTimeMs >= m_nextKeepAliveTimeMs) {
|
||||
if (!CheckNetworkReady()) {
|
||||
if (!CheckNetworkReady(curTimeMs)) {
|
||||
return;
|
||||
}
|
||||
DEBUG4("Sending keep alive");
|
||||
@@ -246,7 +245,7 @@ void CImpl::SendPeriodic(uint64_t curTimeMs, bool initial, bool flush) {
|
||||
|
||||
// send any stored-up flags updates
|
||||
if (!m_outgoingFlags.empty()) {
|
||||
if (!CheckNetworkReady()) {
|
||||
if (!CheckNetworkReady(curTimeMs)) {
|
||||
return;
|
||||
}
|
||||
for (auto&& p : m_outgoingFlags) {
|
||||
@@ -261,7 +260,7 @@ void CImpl::SendPeriodic(uint64_t curTimeMs, bool initial, bool flush) {
|
||||
if (pub && !pub->outValues.empty() &&
|
||||
(flush || curTimeMs >= pub->nextSendMs)) {
|
||||
if (!checkedNetwork) {
|
||||
if (!CheckNetworkReady()) {
|
||||
if (!CheckNetworkReady(curTimeMs)) {
|
||||
return;
|
||||
}
|
||||
checkedNetwork = true;
|
||||
@@ -302,15 +301,13 @@ void CImpl::SendValue(Writer& out, Entry* entry, const Value& value) {
|
||||
}
|
||||
}
|
||||
|
||||
bool CImpl::CheckNetworkReady() {
|
||||
bool CImpl::CheckNetworkReady(uint64_t curTimeMs) {
|
||||
if (!m_wire.Ready()) {
|
||||
++m_notReadyCount;
|
||||
if (m_notReadyCount > kWireMaxNotReady) {
|
||||
if (m_lastSendMs != 0 && curTimeMs > (m_lastSendMs + kWireMaxNotReadyMs)) {
|
||||
m_wire.Disconnect("transmit stalled");
|
||||
}
|
||||
return false;
|
||||
}
|
||||
m_notReadyCount = 0;
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user