[ntcore] Minimize latency on localhost connections (#7997)

This commit is contained in:
Peter Johnson
2025-06-02 16:43:18 -07:00
committed by GitHub
parent 2af8c59858
commit f99692f287
9 changed files with 97 additions and 14 deletions

View File

@@ -30,6 +30,7 @@ void bench();
void bench2();
void stress();
void stress2();
void latency();
int main(int argc, char* argv[]) {
if (argc == 2 && std::string_view{argv[1]} == "bench") {
@@ -48,6 +49,10 @@ int main(int argc, char* argv[]) {
stress2();
return EXIT_SUCCESS;
}
if (argc == 2 && std::string_view{argv[1]} == "latency") {
latency();
return EXIT_SUCCESS;
}
auto myValue = nt::GetEntry(nt::GetDefaultInstance(), "MyValue");
@@ -325,3 +330,71 @@ void stress2() {
std::this_thread::sleep_for(10s);
fmt::print("isDone: {}", isDone.load());
}
void latency() {
// set up instances
auto client1 = nt::CreateInstance();
auto client2 = nt::CreateInstance();
auto server = nt::CreateInstance();
// connect client and server
nt::StartServer(server, "latency.json", "127.0.0.1", 10000);
nt::StartClient(client1, "client1");
nt::SetServer(client1, "127.0.0.1", 10000);
nt::StartClient(client2, "client2");
nt::SetServer(client2, "127.0.0.1", 10000);
using namespace std::chrono_literals;
std::this_thread::sleep_for(1s);
// create publishers and subscribers
auto pub =
nt::Publish(nt::GetTopic(client1, "highrate"), NT_DOUBLE, "double");
nt::SubscribeMultiple(server, {{std::string_view{}}});
auto sub =
nt::Subscribe(nt::GetTopic(server, "highrate"), NT_DOUBLE, "double");
auto sub2 =
nt::Subscribe(nt::GetTopic(client2, "highrate"), NT_DOUBLE, "double");
std::this_thread::sleep_for(1s);
nt::SetDouble(pub, 0);
#if 0
// warm up
for (int i = 1; i <= 10000; ++i) {
nt::SetDouble(pub, i * 0.01);
if (i % 2000 == 0) {
std::this_thread::sleep_for(0.02s);
}
}
#endif
std::vector<int64_t> times;
times.reserve(1001);
// benchmark client to server
for (int i = 1; i <= 1000; ++i) {
int64_t sendTime = nt::Now();
nt::SetDouble(pub, i, sendTime);
nt::Flush(client1);
while (nt::GetDouble(sub, 0) != i) {
wpi::WaitForObject(sub);
}
times.emplace_back(nt::Now() - sendTime);
}
PrintTimes(times);
// benchmark client to client
times.resize(0);
for (int i = 2001; i <= 3000; ++i) {
int64_t sendTime = nt::Now();
nt::SetDouble(pub, i, sendTime);
nt::Flush(client1);
while (nt::GetDouble(sub2, 0) != i) {
wpi::WaitForObject(sub2);
}
times.emplace_back(nt::Now() - sendTime);
}
PrintTimes(times);
}

View File

@@ -258,10 +258,12 @@ void NetworkClient::WsConnected(wpi::WebSocket& ws, uv::Tcp& tcp,
INFO("CONNECTED NT4 to {} port {}", connInfo.remote_ip, connInfo.remote_port);
m_connHandle = m_connList.AddConnection(connInfo);
bool local = wpi::starts_with(connInfo.remote_ip, "127.");
m_wire = std::make_shared<net::WebSocketConnection>(
ws, connInfo.protocol_version, m_logger);
m_clientImpl = std::make_unique<net::ClientImpl>(
m_loop.Now().count(), *m_wire, m_logger, m_timeSyncUpdated,
m_loop.Now().count(), *m_wire, local, m_logger, m_timeSyncUpdated,
[this](uint32_t repeatMs) {
DEBUG4("Setting periodic timer to {}", repeatMs);
if (m_sendOutgoingTimer &&

View File

@@ -60,6 +60,7 @@ class NetworkServer::ServerConnection {
void SetupOutgoingTimer();
void UpdateOutgoingTimer(uint32_t repeatMs);
void ConnectionClosed();
uv::Loop& GetLoopRef() const { return m_outgoingTimer->GetLoopRef(); }
NetworkServer& m_server;
ConnectionInfo m_info;
@@ -221,10 +222,10 @@ void NetworkServer::ServerConnection4::ProcessWsUpgrade() {
return;
}
// TODO: set local flag appropriately
bool local = wpi::starts_with(m_info.remote_ip, "127.");
std::string dedupName;
std::tie(dedupName, m_clientId) = m_server.m_serverImpl.AddClient(
name, m_connInfo, false, *m_wire,
name, m_connInfo, local, *m_wire,
[this](uint32_t repeatMs) { UpdateOutgoingTimer(repeatMs); });
INFO("CONNECTED NT4 client '{}' (from {})", dedupName, m_connInfo);
m_info.remote_id = dedupName;
@@ -239,11 +240,13 @@ void NetworkServer::ServerConnection4::ProcessWsUpgrade() {
if (m_server.m_serverImpl.ProcessIncomingText(m_clientId, data)) {
m_server.m_idle->Start();
}
m_server.m_serverImpl.SendAllLocalOutgoing(GetLoopRef().Now().count());
});
m_websocket->binary.connect([this](std::span<const uint8_t> data, bool) {
if (m_server.m_serverImpl.ProcessIncomingBinary(m_clientId, data)) {
m_server.m_idle->Start();
}
m_server.m_serverImpl.SendAllLocalOutgoing(GetLoopRef().Now().count());
});
SetupOutgoingTimer();
@@ -420,6 +423,7 @@ void NetworkServer::Init() {
DEBUG4("Stopping idle processing");
m_idle->Stop(); // go back to sleep
}
m_serverImpl.SendAllLocalOutgoing(m_loop.Now().count());
});
}

View File

@@ -28,7 +28,7 @@ using namespace nt;
using namespace nt::net;
ClientImpl::ClientImpl(
uint64_t curTimeMs, WireConnection& wire, wpi::Logger& logger,
uint64_t curTimeMs, WireConnection& wire, bool local, wpi::Logger& logger,
std::function<void(int64_t serverTimeOffset, int64_t rtt2, bool valid)>
timeSyncUpdated,
std::function<void(uint32_t repeatMs)> setPeriodic)
@@ -40,7 +40,7 @@ ClientImpl::ClientImpl(
m_nextPingTimeMs{curTimeMs + (wire.GetVersion() >= 0x0401
? NetworkPing::kPingIntervalMs
: kRttIntervalMs)},
m_outgoing{wire, false} {
m_outgoing{wire, local} {
// immediately send RTT ping
auto now = wpi::Now();
DEBUG4("Sending initial RTT ping {}", now);

View File

@@ -38,7 +38,7 @@ class WireConnection;
class ClientImpl final : private ServerMessageHandler {
public:
ClientImpl(
uint64_t curTimeMs, WireConnection& wire, wpi::Logger& logger,
uint64_t curTimeMs, WireConnection& wire, bool local, wpi::Logger& logger,
std::function<void(int64_t serverTimeOffset, int64_t rtt2, bool valid)>
timeSyncUpdated,
std::function<void(uint32_t repeatMs)> setPeriodic);

View File

@@ -59,7 +59,7 @@ concept NetworkMessage =
std::same_as<typename MessageType::ValueMsg, ServerValueMsg> ||
std::same_as<typename MessageType::ValueMsg, ClientValueMsg>;
enum class ValueSendMode { kDisabled = 0, kAll, kNormal, kImm };
enum class ValueSendMode { kDisabled = 0, kAll, kNormal };
template <NetworkMessage MessageType>
class NetworkOutgoingQueue {
@@ -116,9 +116,6 @@ class NetworkOutgoingQueue {
}
void SendValue(int id, const Value& value, ValueSendMode mode) {
if (m_local) {
mode = ValueSendMode::kImm; // always send local immediately
}
// backpressure by stopping sending all if the buffer is too full
if (mode == ValueSendMode::kAll && m_totalSize >= kOutgoingLimit) {
mode = ValueSendMode::kNormal;
@@ -126,9 +123,6 @@ class NetworkOutgoingQueue {
switch (mode) {
case ValueSendMode::kDisabled: // do nothing
break;
case ValueSendMode::kImm: // send immediately
m_wire.SendBinary([&](auto& os) { EncodeValue(os, id, value); });
break;
case ValueSendMode::kAll: { // append to outgoing
auto& info = m_idMap[id];
auto& queue = m_queues[info.queueIndex];
@@ -169,7 +163,7 @@ class NetworkOutgoingQueue {
}
// rate limit frequency of transmissions
if (curTimeMs < (m_lastSendMs + kMinPeriodMs)) {
if (!m_local && curTimeMs < (m_lastSendMs + kMinPeriodMs)) {
return;
}

View File

@@ -73,6 +73,7 @@ class ServerClient {
std::string_view GetName() const { return m_name; }
int GetId() const { return m_id; }
bool IsLocal() const { return m_local; }
virtual void UpdatePeriod(TopicClientData& tcd, ServerTopic* topic) {}

View File

@@ -153,6 +153,14 @@ void ServerImpl::SendOutgoing(int clientId, uint64_t curTimeMs) {
}
}
void ServerImpl::SendAllLocalOutgoing(uint64_t curTimeMs) {
for (auto&& client : m_clients) {
if (client && client->IsLocal()) {
client->SendOutgoing(curTimeMs, true);
}
}
}
void ServerImpl::SetLocal(net::ServerMessageHandler* local,
net::ClientMessageQueue* queue) {
DEBUG4("SetLocal()");

View File

@@ -43,6 +43,7 @@ class ServerImpl final {
void SendAllOutgoing(uint64_t curTimeMs, bool flush);
void SendOutgoing(int clientId, uint64_t curTimeMs);
void SendAllLocalOutgoing(uint64_t curTimeMs);
void SetLocal(net::ServerMessageHandler* local,
net::ClientMessageQueue* queue);