[ntcore] Fix local client behavior (#8588)

Fixes local clients receiving inconsistent data updates. Data was only
flushed to local clients on incoming websocket data (or when explicitly
flushed), so local clients on a quiet server would stop receiving
updates.

Revert some changes from #7997.
Instead of relying on periodic sends, always immediately send local data
updates to the wire for lower latency. This means that on local clients,
the periodic update rate and sendAll settings will be ignored, as all
data updates will be sent immediately.
This commit is contained in:
Ryan Blue
2026-02-07 00:35:02 -05:00
committed by GitHub
parent a2a40b56d1
commit 77b2f9802e
4 changed files with 8 additions and 15 deletions

View File

@@ -59,7 +59,6 @@ class NetworkServer::ServerConnection {
void SetupOutgoingTimer();
void UpdateOutgoingTimer(uint32_t repeatMs);
void ConnectionClosed();
uv::Loop& GetLoopRef() const { return m_outgoingTimer->GetLoopRef(); }
NetworkServer& m_server;
ConnectionInfo m_info;
@@ -239,13 +238,11 @@ void NetworkServer::ServerConnection4::ProcessWsUpgrade() {
if (m_server.m_serverImpl.ProcessIncomingText(m_clientId, data)) {
m_server.m_idle->Start();
}
m_server.m_serverImpl.SendAllLocalOutgoing(GetLoopRef().Now().count());
});
m_websocket->binary.connect([this](std::span<const uint8_t> data, bool) {
if (m_server.m_serverImpl.ProcessIncomingBinary(m_clientId, data)) {
m_server.m_idle->Start();
}
m_server.m_serverImpl.SendAllLocalOutgoing(GetLoopRef().Now().count());
});
SetupOutgoingTimer();
@@ -425,7 +422,6 @@ void NetworkServer::Init() {
DEBUG4("Stopping idle processing");
m_idle->Stop(); // go back to sleep
}
m_serverImpl.SendAllLocalOutgoing(m_loop.Now().count());
});
}

View File

@@ -58,7 +58,7 @@ concept NetworkMessage =
std::same_as<typename MessageType::ValueMsg, ServerValueMsg> ||
std::same_as<typename MessageType::ValueMsg, ClientValueMsg>;
enum class ValueSendMode { kDisabled = 0, kAll, kNormal };
enum class ValueSendMode { kDisabled = 0, kAll, kNormal, kImm };
template <NetworkMessage MessageType>
class NetworkOutgoingQueue {
@@ -115,6 +115,9 @@ class NetworkOutgoingQueue {
}
void SendValue(int id, const Value& value, ValueSendMode mode) {
if (m_local) {
mode = ValueSendMode::kImm; // always send local immediately
}
// backpressure by stopping sending all if the buffer is too full
if (mode == ValueSendMode::kAll && m_totalSize >= kOutgoingLimit) {
mode = ValueSendMode::kNormal;
@@ -122,6 +125,9 @@ class NetworkOutgoingQueue {
switch (mode) {
case ValueSendMode::kDisabled: // do nothing
break;
case ValueSendMode::kImm: // send immediately
m_wire.SendBinary([&](auto& os) { EncodeValue(os, id, value); });
break;
case ValueSendMode::kAll: { // append to outgoing
auto& info = m_idMap[id];
auto& queue = m_queues[info.queueIndex];
@@ -161,7 +167,7 @@ class NetworkOutgoingQueue {
return; // nothing to do
}
// rate limit frequency of transmissions
// rate limit frequency of transmissions for remote connections
if (!m_local && curTimeMs < (m_lastSendMs + kMinPeriodMs)) {
return;
}

View File

@@ -152,14 +152,6 @@ void ServerImpl::SendOutgoing(int clientId, uint64_t curTimeMs) {
}
}
void ServerImpl::SendAllLocalOutgoing(uint64_t curTimeMs) {
for (auto&& client : m_clients) {
if (client && client->IsLocal()) {
client->SendOutgoing(curTimeMs, true);
}
}
}
void ServerImpl::SetLocal(net::ServerMessageHandler* local,
net::ClientMessageQueue* queue) {
DEBUG4("SetLocal()");

View File

@@ -43,7 +43,6 @@ class ServerImpl final {
void SendAllOutgoing(uint64_t curTimeMs, bool flush);
void SendOutgoing(int clientId, uint64_t curTimeMs);
void SendAllLocalOutgoing(uint64_t curTimeMs);
void SetLocal(net::ServerMessageHandler* local,
net::ClientMessageQueue* queue);