From f99692f287f50d46cf10e91c31a5ca8334b60357 Mon Sep 17 00:00:00 2001 From: Peter Johnson Date: Mon, 2 Jun 2025 16:43:18 -0700 Subject: [PATCH] [ntcore] Minimize latency on localhost connections (#7997) --- ntcore/src/dev/native/cpp/main.cpp | 73 +++++++++++++++++++ ntcore/src/main/native/cpp/NetworkClient.cpp | 4 +- ntcore/src/main/native/cpp/NetworkServer.cpp | 8 +- ntcore/src/main/native/cpp/net/ClientImpl.cpp | 4 +- ntcore/src/main/native/cpp/net/ClientImpl.h | 2 +- .../native/cpp/net/NetworkOutgoingQueue.h | 10 +-- .../src/main/native/cpp/server/ServerClient.h | 1 + .../src/main/native/cpp/server/ServerImpl.cpp | 8 ++ .../src/main/native/cpp/server/ServerImpl.h | 1 + 9 files changed, 97 insertions(+), 14 deletions(-) diff --git a/ntcore/src/dev/native/cpp/main.cpp b/ntcore/src/dev/native/cpp/main.cpp index 5b4a1b9e8a..3d24fdf2de 100644 --- a/ntcore/src/dev/native/cpp/main.cpp +++ b/ntcore/src/dev/native/cpp/main.cpp @@ -30,6 +30,7 @@ void bench(); void bench2(); void stress(); void stress2(); +void latency(); int main(int argc, char* argv[]) { if (argc == 2 && std::string_view{argv[1]} == "bench") { @@ -48,6 +49,10 @@ int main(int argc, char* argv[]) { stress2(); return EXIT_SUCCESS; } + if (argc == 2 && std::string_view{argv[1]} == "latency") { + latency(); + return EXIT_SUCCESS; + } auto myValue = nt::GetEntry(nt::GetDefaultInstance(), "MyValue"); @@ -325,3 +330,71 @@ void stress2() { std::this_thread::sleep_for(10s); fmt::print("isDone: {}", isDone.load()); } + +void latency() { + // set up instances + auto client1 = nt::CreateInstance(); + auto client2 = nt::CreateInstance(); + auto server = nt::CreateInstance(); + + // connect client and server + nt::StartServer(server, "latency.json", "127.0.0.1", 10000); + nt::StartClient(client1, "client1"); + nt::SetServer(client1, "127.0.0.1", 10000); + nt::StartClient(client2, "client2"); + nt::SetServer(client2, "127.0.0.1", 10000); + + using namespace std::chrono_literals; + std::this_thread::sleep_for(1s); + + // create publishers and subscribers + auto pub = + nt::Publish(nt::GetTopic(client1, "highrate"), NT_DOUBLE, "double"); + nt::SubscribeMultiple(server, {{std::string_view{}}}); + auto sub = + nt::Subscribe(nt::GetTopic(server, "highrate"), NT_DOUBLE, "double"); + auto sub2 = + nt::Subscribe(nt::GetTopic(client2, "highrate"), NT_DOUBLE, "double"); + + std::this_thread::sleep_for(1s); + + nt::SetDouble(pub, 0); +#if 0 + // warm up + for (int i = 1; i <= 10000; ++i) { + nt::SetDouble(pub, i * 0.01); + if (i % 2000 == 0) { + std::this_thread::sleep_for(0.02s); + } + } +#endif + + std::vector times; + times.reserve(1001); + + // benchmark client to server + for (int i = 1; i <= 1000; ++i) { + int64_t sendTime = nt::Now(); + nt::SetDouble(pub, i, sendTime); + nt::Flush(client1); + while (nt::GetDouble(sub, 0) != i) { + wpi::WaitForObject(sub); + } + times.emplace_back(nt::Now() - sendTime); + } + PrintTimes(times); + + // benchmark client to client + times.resize(0); + for (int i = 2001; i <= 3000; ++i) { + int64_t sendTime = nt::Now(); + nt::SetDouble(pub, i, sendTime); + nt::Flush(client1); + while (nt::GetDouble(sub2, 0) != i) { + wpi::WaitForObject(sub2); + } + times.emplace_back(nt::Now() - sendTime); + } + + PrintTimes(times); +} diff --git a/ntcore/src/main/native/cpp/NetworkClient.cpp b/ntcore/src/main/native/cpp/NetworkClient.cpp index e01b53a9fc..4603df6416 100644 --- a/ntcore/src/main/native/cpp/NetworkClient.cpp +++ b/ntcore/src/main/native/cpp/NetworkClient.cpp @@ -258,10 +258,12 @@ void NetworkClient::WsConnected(wpi::WebSocket& ws, uv::Tcp& tcp, INFO("CONNECTED NT4 to {} port {}", connInfo.remote_ip, connInfo.remote_port); m_connHandle = m_connList.AddConnection(connInfo); + bool local = wpi::starts_with(connInfo.remote_ip, "127."); + m_wire = std::make_shared( ws, connInfo.protocol_version, m_logger); m_clientImpl = std::make_unique( - m_loop.Now().count(), *m_wire, m_logger, m_timeSyncUpdated, + m_loop.Now().count(), *m_wire, local, m_logger, m_timeSyncUpdated, [this](uint32_t repeatMs) { DEBUG4("Setting periodic timer to {}", repeatMs); if (m_sendOutgoingTimer && diff --git a/ntcore/src/main/native/cpp/NetworkServer.cpp b/ntcore/src/main/native/cpp/NetworkServer.cpp index f3d0ef4681..e51fc9e2ce 100644 --- a/ntcore/src/main/native/cpp/NetworkServer.cpp +++ b/ntcore/src/main/native/cpp/NetworkServer.cpp @@ -60,6 +60,7 @@ class NetworkServer::ServerConnection { void SetupOutgoingTimer(); void UpdateOutgoingTimer(uint32_t repeatMs); void ConnectionClosed(); + uv::Loop& GetLoopRef() const { return m_outgoingTimer->GetLoopRef(); } NetworkServer& m_server; ConnectionInfo m_info; @@ -221,10 +222,10 @@ void NetworkServer::ServerConnection4::ProcessWsUpgrade() { return; } - // TODO: set local flag appropriately + bool local = wpi::starts_with(m_info.remote_ip, "127."); std::string dedupName; std::tie(dedupName, m_clientId) = m_server.m_serverImpl.AddClient( - name, m_connInfo, false, *m_wire, + name, m_connInfo, local, *m_wire, [this](uint32_t repeatMs) { UpdateOutgoingTimer(repeatMs); }); INFO("CONNECTED NT4 client '{}' (from {})", dedupName, m_connInfo); m_info.remote_id = dedupName; @@ -239,11 +240,13 @@ void NetworkServer::ServerConnection4::ProcessWsUpgrade() { if (m_server.m_serverImpl.ProcessIncomingText(m_clientId, data)) { m_server.m_idle->Start(); } + m_server.m_serverImpl.SendAllLocalOutgoing(GetLoopRef().Now().count()); }); m_websocket->binary.connect([this](std::span data, bool) { if (m_server.m_serverImpl.ProcessIncomingBinary(m_clientId, data)) { m_server.m_idle->Start(); } + m_server.m_serverImpl.SendAllLocalOutgoing(GetLoopRef().Now().count()); }); SetupOutgoingTimer(); @@ -420,6 +423,7 @@ void NetworkServer::Init() { DEBUG4("Stopping idle processing"); m_idle->Stop(); // go back to sleep } + m_serverImpl.SendAllLocalOutgoing(m_loop.Now().count()); }); } diff --git a/ntcore/src/main/native/cpp/net/ClientImpl.cpp b/ntcore/src/main/native/cpp/net/ClientImpl.cpp index 13b01156cc..c449bb9933 100644 --- a/ntcore/src/main/native/cpp/net/ClientImpl.cpp +++ b/ntcore/src/main/native/cpp/net/ClientImpl.cpp @@ -28,7 +28,7 @@ using namespace nt; using namespace nt::net; ClientImpl::ClientImpl( - uint64_t curTimeMs, WireConnection& wire, wpi::Logger& logger, + uint64_t curTimeMs, WireConnection& wire, bool local, wpi::Logger& logger, std::function timeSyncUpdated, std::function setPeriodic) @@ -40,7 +40,7 @@ ClientImpl::ClientImpl( m_nextPingTimeMs{curTimeMs + (wire.GetVersion() >= 0x0401 ? NetworkPing::kPingIntervalMs : kRttIntervalMs)}, - m_outgoing{wire, false} { + m_outgoing{wire, local} { // immediately send RTT ping auto now = wpi::Now(); DEBUG4("Sending initial RTT ping {}", now); diff --git a/ntcore/src/main/native/cpp/net/ClientImpl.h b/ntcore/src/main/native/cpp/net/ClientImpl.h index 9e3a189d1a..91cb47fa2d 100644 --- a/ntcore/src/main/native/cpp/net/ClientImpl.h +++ b/ntcore/src/main/native/cpp/net/ClientImpl.h @@ -38,7 +38,7 @@ class WireConnection; class ClientImpl final : private ServerMessageHandler { public: ClientImpl( - uint64_t curTimeMs, WireConnection& wire, wpi::Logger& logger, + uint64_t curTimeMs, WireConnection& wire, bool local, wpi::Logger& logger, std::function timeSyncUpdated, std::function setPeriodic); diff --git a/ntcore/src/main/native/cpp/net/NetworkOutgoingQueue.h b/ntcore/src/main/native/cpp/net/NetworkOutgoingQueue.h index 5f4330216a..fed4acb408 100644 --- a/ntcore/src/main/native/cpp/net/NetworkOutgoingQueue.h +++ b/ntcore/src/main/native/cpp/net/NetworkOutgoingQueue.h @@ -59,7 +59,7 @@ concept NetworkMessage = std::same_as || std::same_as; -enum class ValueSendMode { kDisabled = 0, kAll, kNormal, kImm }; +enum class ValueSendMode { kDisabled = 0, kAll, kNormal }; template class NetworkOutgoingQueue { @@ -116,9 +116,6 @@ class NetworkOutgoingQueue { } void SendValue(int id, const Value& value, ValueSendMode mode) { - if (m_local) { - mode = ValueSendMode::kImm; // always send local immediately - } // backpressure by stopping sending all if the buffer is too full if (mode == ValueSendMode::kAll && m_totalSize >= kOutgoingLimit) { mode = ValueSendMode::kNormal; @@ -126,9 +123,6 @@ class NetworkOutgoingQueue { switch (mode) { case ValueSendMode::kDisabled: // do nothing break; - case ValueSendMode::kImm: // send immediately - m_wire.SendBinary([&](auto& os) { EncodeValue(os, id, value); }); - break; case ValueSendMode::kAll: { // append to outgoing auto& info = m_idMap[id]; auto& queue = m_queues[info.queueIndex]; @@ -169,7 +163,7 @@ class NetworkOutgoingQueue { } // rate limit frequency of transmissions - if (curTimeMs < (m_lastSendMs + kMinPeriodMs)) { + if (!m_local && curTimeMs < (m_lastSendMs + kMinPeriodMs)) { return; } diff --git a/ntcore/src/main/native/cpp/server/ServerClient.h b/ntcore/src/main/native/cpp/server/ServerClient.h index d56753b75b..e01d3d538a 100644 --- a/ntcore/src/main/native/cpp/server/ServerClient.h +++ b/ntcore/src/main/native/cpp/server/ServerClient.h @@ -73,6 +73,7 @@ class ServerClient { std::string_view GetName() const { return m_name; } int GetId() const { return m_id; } + bool IsLocal() const { return m_local; } virtual void UpdatePeriod(TopicClientData& tcd, ServerTopic* topic) {} diff --git a/ntcore/src/main/native/cpp/server/ServerImpl.cpp b/ntcore/src/main/native/cpp/server/ServerImpl.cpp index 1856407dcd..1973080a24 100644 --- a/ntcore/src/main/native/cpp/server/ServerImpl.cpp +++ b/ntcore/src/main/native/cpp/server/ServerImpl.cpp @@ -153,6 +153,14 @@ void ServerImpl::SendOutgoing(int clientId, uint64_t curTimeMs) { } } +void ServerImpl::SendAllLocalOutgoing(uint64_t curTimeMs) { + for (auto&& client : m_clients) { + if (client && client->IsLocal()) { + client->SendOutgoing(curTimeMs, true); + } + } +} + void ServerImpl::SetLocal(net::ServerMessageHandler* local, net::ClientMessageQueue* queue) { DEBUG4("SetLocal()"); diff --git a/ntcore/src/main/native/cpp/server/ServerImpl.h b/ntcore/src/main/native/cpp/server/ServerImpl.h index 0cacafcb63..584df3f9ae 100644 --- a/ntcore/src/main/native/cpp/server/ServerImpl.h +++ b/ntcore/src/main/native/cpp/server/ServerImpl.h @@ -43,6 +43,7 @@ class ServerImpl final { void SendAllOutgoing(uint64_t curTimeMs, bool flush); void SendOutgoing(int clientId, uint64_t curTimeMs); + void SendAllLocalOutgoing(uint64_t curTimeMs); void SetLocal(net::ServerMessageHandler* local, net::ClientMessageQueue* queue);