diff --git a/ntcore/src/main/native/cpp/NetworkServer.cpp b/ntcore/src/main/native/cpp/NetworkServer.cpp index 4880473cac..c01968e2ff 100644 --- a/ntcore/src/main/native/cpp/NetworkServer.cpp +++ b/ntcore/src/main/native/cpp/NetworkServer.cpp @@ -59,7 +59,6 @@ class NetworkServer::ServerConnection { void SetupOutgoingTimer(); void UpdateOutgoingTimer(uint32_t repeatMs); void ConnectionClosed(); - uv::Loop& GetLoopRef() const { return m_outgoingTimer->GetLoopRef(); } NetworkServer& m_server; ConnectionInfo m_info; @@ -239,13 +238,11 @@ void NetworkServer::ServerConnection4::ProcessWsUpgrade() { if (m_server.m_serverImpl.ProcessIncomingText(m_clientId, data)) { m_server.m_idle->Start(); } - m_server.m_serverImpl.SendAllLocalOutgoing(GetLoopRef().Now().count()); }); m_websocket->binary.connect([this](std::span data, bool) { if (m_server.m_serverImpl.ProcessIncomingBinary(m_clientId, data)) { m_server.m_idle->Start(); } - m_server.m_serverImpl.SendAllLocalOutgoing(GetLoopRef().Now().count()); }); SetupOutgoingTimer(); @@ -425,7 +422,6 @@ void NetworkServer::Init() { DEBUG4("Stopping idle processing"); m_idle->Stop(); // go back to sleep } - m_serverImpl.SendAllLocalOutgoing(m_loop.Now().count()); }); } diff --git a/ntcore/src/main/native/cpp/net/NetworkOutgoingQueue.hpp b/ntcore/src/main/native/cpp/net/NetworkOutgoingQueue.hpp index f7438f07f2..e5579ccf7f 100644 --- a/ntcore/src/main/native/cpp/net/NetworkOutgoingQueue.hpp +++ b/ntcore/src/main/native/cpp/net/NetworkOutgoingQueue.hpp @@ -58,7 +58,7 @@ concept NetworkMessage = std::same_as || std::same_as; -enum class ValueSendMode { kDisabled = 0, kAll, kNormal }; +enum class ValueSendMode { kDisabled = 0, kAll, kNormal, kImm }; template class NetworkOutgoingQueue { @@ -115,6 +115,9 @@ class NetworkOutgoingQueue { } void SendValue(int id, const Value& value, ValueSendMode mode) { + if (m_local) { + mode = ValueSendMode::kImm; // always send local immediately + } // backpressure by stopping sending all if the buffer is too full if (mode == ValueSendMode::kAll && m_totalSize >= kOutgoingLimit) { mode = ValueSendMode::kNormal; @@ -122,6 +125,9 @@ class NetworkOutgoingQueue { switch (mode) { case ValueSendMode::kDisabled: // do nothing break; + case ValueSendMode::kImm: // send immediately + m_wire.SendBinary([&](auto& os) { EncodeValue(os, id, value); }); + break; case ValueSendMode::kAll: { // append to outgoing auto& info = m_idMap[id]; auto& queue = m_queues[info.queueIndex]; @@ -161,7 +167,7 @@ class NetworkOutgoingQueue { return; // nothing to do } - // rate limit frequency of transmissions + // rate limit frequency of transmissions for remote connections if (!m_local && curTimeMs < (m_lastSendMs + kMinPeriodMs)) { return; } diff --git a/ntcore/src/main/native/cpp/server/ServerImpl.cpp b/ntcore/src/main/native/cpp/server/ServerImpl.cpp index f94f2a9189..4ce79752be 100644 --- a/ntcore/src/main/native/cpp/server/ServerImpl.cpp +++ b/ntcore/src/main/native/cpp/server/ServerImpl.cpp @@ -152,14 +152,6 @@ void ServerImpl::SendOutgoing(int clientId, uint64_t curTimeMs) { } } -void ServerImpl::SendAllLocalOutgoing(uint64_t curTimeMs) { - for (auto&& client : m_clients) { - if (client && client->IsLocal()) { - client->SendOutgoing(curTimeMs, true); - } - } -} - void ServerImpl::SetLocal(net::ServerMessageHandler* local, net::ClientMessageQueue* queue) { DEBUG4("SetLocal()"); diff --git a/ntcore/src/main/native/cpp/server/ServerImpl.hpp b/ntcore/src/main/native/cpp/server/ServerImpl.hpp index c6f4493569..bce1c191fd 100644 --- a/ntcore/src/main/native/cpp/server/ServerImpl.hpp +++ b/ntcore/src/main/native/cpp/server/ServerImpl.hpp @@ -43,7 +43,6 @@ class ServerImpl final { void SendAllOutgoing(uint64_t curTimeMs, bool flush); void SendOutgoing(int clientId, uint64_t curTimeMs); - void SendAllLocalOutgoing(uint64_t curTimeMs); void SetLocal(net::ServerMessageHandler* local, net::ClientMessageQueue* queue);