From 77b2f9802ea9ce19815e658682920153d21d7182 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Sat, 7 Feb 2026 00:35:02 -0500 Subject: [PATCH] [ntcore] Fix local client behavior (#8588) Fixes local clients receiving inconsistent data updates. Data was only flushed to local clients on incoming websocket data (or when explicitly flushed), so local clients on a quiet server would stop receiving updates. Revert some changes from #7997. Instead of relying on periodic sends, always immediately send local data updates to the wire for lower latency. This means that on local clients, the periodic update rate and sendAll settings will be ignored, as all data updates will be sent immediately. --- ntcore/src/main/native/cpp/NetworkServer.cpp | 4 ---- .../src/main/native/cpp/net/NetworkOutgoingQueue.hpp | 10 ++++++++-- ntcore/src/main/native/cpp/server/ServerImpl.cpp | 8 -------- ntcore/src/main/native/cpp/server/ServerImpl.hpp | 1 - 4 files changed, 8 insertions(+), 15 deletions(-) diff --git a/ntcore/src/main/native/cpp/NetworkServer.cpp b/ntcore/src/main/native/cpp/NetworkServer.cpp index 4880473cac..c01968e2ff 100644 --- a/ntcore/src/main/native/cpp/NetworkServer.cpp +++ b/ntcore/src/main/native/cpp/NetworkServer.cpp @@ -59,7 +59,6 @@ class NetworkServer::ServerConnection { void SetupOutgoingTimer(); void UpdateOutgoingTimer(uint32_t repeatMs); void ConnectionClosed(); - uv::Loop& GetLoopRef() const { return m_outgoingTimer->GetLoopRef(); } NetworkServer& m_server; ConnectionInfo m_info; @@ -239,13 +238,11 @@ void NetworkServer::ServerConnection4::ProcessWsUpgrade() { if (m_server.m_serverImpl.ProcessIncomingText(m_clientId, data)) { m_server.m_idle->Start(); } - m_server.m_serverImpl.SendAllLocalOutgoing(GetLoopRef().Now().count()); }); m_websocket->binary.connect([this](std::span data, bool) { if (m_server.m_serverImpl.ProcessIncomingBinary(m_clientId, data)) { m_server.m_idle->Start(); } - m_server.m_serverImpl.SendAllLocalOutgoing(GetLoopRef().Now().count()); }); SetupOutgoingTimer(); @@ -425,7 +422,6 @@ void NetworkServer::Init() { DEBUG4("Stopping idle processing"); m_idle->Stop(); // go back to sleep } - m_serverImpl.SendAllLocalOutgoing(m_loop.Now().count()); }); } diff --git a/ntcore/src/main/native/cpp/net/NetworkOutgoingQueue.hpp b/ntcore/src/main/native/cpp/net/NetworkOutgoingQueue.hpp index f7438f07f2..e5579ccf7f 100644 --- a/ntcore/src/main/native/cpp/net/NetworkOutgoingQueue.hpp +++ b/ntcore/src/main/native/cpp/net/NetworkOutgoingQueue.hpp @@ -58,7 +58,7 @@ concept NetworkMessage = std::same_as || std::same_as; -enum class ValueSendMode { kDisabled = 0, kAll, kNormal }; +enum class ValueSendMode { kDisabled = 0, kAll, kNormal, kImm }; template class NetworkOutgoingQueue { @@ -115,6 +115,9 @@ class NetworkOutgoingQueue { } void SendValue(int id, const Value& value, ValueSendMode mode) { + if (m_local) { + mode = ValueSendMode::kImm; // always send local immediately + } // backpressure by stopping sending all if the buffer is too full if (mode == ValueSendMode::kAll && m_totalSize >= kOutgoingLimit) { mode = ValueSendMode::kNormal; @@ -122,6 +125,9 @@ class NetworkOutgoingQueue { switch (mode) { case ValueSendMode::kDisabled: // do nothing break; + case ValueSendMode::kImm: // send immediately + m_wire.SendBinary([&](auto& os) { EncodeValue(os, id, value); }); + break; case ValueSendMode::kAll: { // append to outgoing auto& info = m_idMap[id]; auto& queue = m_queues[info.queueIndex]; @@ -161,7 +167,7 @@ class NetworkOutgoingQueue { return; // nothing to do } - // rate limit frequency of transmissions + // rate limit frequency of transmissions for remote connections if (!m_local && curTimeMs < (m_lastSendMs + kMinPeriodMs)) { return; } diff --git a/ntcore/src/main/native/cpp/server/ServerImpl.cpp b/ntcore/src/main/native/cpp/server/ServerImpl.cpp index f94f2a9189..4ce79752be 100644 --- a/ntcore/src/main/native/cpp/server/ServerImpl.cpp +++ b/ntcore/src/main/native/cpp/server/ServerImpl.cpp @@ -152,14 +152,6 @@ void ServerImpl::SendOutgoing(int clientId, uint64_t curTimeMs) { } } -void ServerImpl::SendAllLocalOutgoing(uint64_t curTimeMs) { - for (auto&& client : m_clients) { - if (client && client->IsLocal()) { - client->SendOutgoing(curTimeMs, true); - } - } -} - void ServerImpl::SetLocal(net::ServerMessageHandler* local, net::ClientMessageQueue* queue) { DEBUG4("SetLocal()"); diff --git a/ntcore/src/main/native/cpp/server/ServerImpl.hpp b/ntcore/src/main/native/cpp/server/ServerImpl.hpp index c6f4493569..bce1c191fd 100644 --- a/ntcore/src/main/native/cpp/server/ServerImpl.hpp +++ b/ntcore/src/main/native/cpp/server/ServerImpl.hpp @@ -43,7 +43,6 @@ class ServerImpl final { void SendAllOutgoing(uint64_t curTimeMs, bool flush); void SendOutgoing(int clientId, uint64_t curTimeMs); - void SendAllLocalOutgoing(uint64_t curTimeMs); void SetLocal(net::ServerMessageHandler* local, net::ClientMessageQueue* queue);