From aec6f3d5065af8fc604fe6cd6afdfa57a8de3b3f Mon Sep 17 00:00:00 2001 From: Peter Johnson Date: Wed, 4 Jan 2023 23:36:26 -0800 Subject: [PATCH] [ntcore] Fix client flush behavior (#4903) We need to ignore per-publisher send periods when flushing. Also fix NT4 client to use flush async's (same as NT3 client). --- ntcore/src/main/native/cpp/NetworkClient.cpp | 21 +++++++++---------- ntcore/src/main/native/cpp/net/ClientImpl.cpp | 11 +++++----- ntcore/src/main/native/cpp/net/ClientImpl.h | 2 +- .../src/main/native/cpp/net3/ClientImpl3.cpp | 13 ++++++------ ntcore/src/main/native/cpp/net3/ClientImpl3.h | 2 +- 5 files changed, 25 insertions(+), 24 deletions(-) diff --git a/ntcore/src/main/native/cpp/NetworkClient.cpp b/ntcore/src/main/native/cpp/NetworkClient.cpp index a329fb9dd9..56d98a84bc 100644 --- a/ntcore/src/main/native/cpp/NetworkClient.cpp +++ b/ntcore/src/main/native/cpp/NetworkClient.cpp @@ -215,7 +215,7 @@ NCImpl3::NCImpl3(int inst, std::string_view id, m_sendValuesTimer->timeout.connect([this] { if (m_clientImpl) { HandleLocal(); - m_clientImpl->SendPeriodic(m_loop.Now().count()); + m_clientImpl->SendPeriodic(m_loop.Now().count(), false); } }); @@ -224,7 +224,7 @@ NCImpl3::NCImpl3(int inst, std::string_view id, m_flush->wakeup.connect([this] { if (m_clientImpl) { HandleLocal(); - m_clientImpl->SendPeriodic(m_loop.Now().count()); + m_clientImpl->SendPeriodic(m_loop.Now().count(), true); } }); m_flushAtomic = m_flush.get(); @@ -355,7 +355,7 @@ NCImpl4::NCImpl4( m_sendValuesTimer->timeout.connect([this] { if (m_clientImpl) { HandleLocal(); - m_clientImpl->SendValues(m_loop.Now().count()); + m_clientImpl->SendValues(m_loop.Now().count(), false); } }); @@ -364,7 +364,7 @@ NCImpl4::NCImpl4( m_flush->wakeup.connect([this] { if (m_clientImpl) { HandleLocal(); - m_clientImpl->SendValues(m_loop.Now().count()); + m_clientImpl->SendValues(m_loop.Now().count(), true); } }); m_flushAtomic = m_flush.get(); @@ -501,16 +501,15 @@ void NetworkClient::StopDSClient() { } void NetworkClient::FlushLocal() { - m_impl->m_loopRunner.ExecAsync([this](uv::Loop&) { m_impl->HandleLocal(); }); + if (auto async = m_impl->m_flushLocalAtomic.load(std::memory_order_relaxed)) { + async->UnsafeSend(); + } } void NetworkClient::Flush() { - m_impl->m_loopRunner.ExecAsync([this](uv::Loop&) { - m_impl->HandleLocal(); - if (m_impl->m_clientImpl) { - m_impl->m_clientImpl->SendValues(m_impl->m_loop.Now().count()); - } - }); + if (auto async = m_impl->m_flushAtomic.load(std::memory_order_relaxed)) { + async->UnsafeSend(); + } } class NetworkClient3::Impl final : public NCImpl3 { diff --git a/ntcore/src/main/native/cpp/net/ClientImpl.cpp b/ntcore/src/main/native/cpp/net/ClientImpl.cpp index 2efbb29691..6484f33989 100644 --- a/ntcore/src/main/native/cpp/net/ClientImpl.cpp +++ b/ntcore/src/main/native/cpp/net/ClientImpl.cpp @@ -56,7 +56,7 @@ class CImpl : public ServerMessageHandler { void ProcessIncomingBinary(std::span data); void HandleLocal(std::vector&& msgs); bool SendControl(uint64_t curTimeMs); - void SendValues(uint64_t curTimeMs); + void SendValues(uint64_t curTimeMs, bool flush); void SendInitialValues(); bool CheckNetworkReady(); @@ -237,7 +237,7 @@ bool CImpl::SendControl(uint64_t curTimeMs) { return true; } -void CImpl::SendValues(uint64_t curTimeMs) { +void CImpl::SendValues(uint64_t curTimeMs, bool flush) { DEBUG4("SendValues({})", curTimeMs); // can't send value updates until we have a RTT @@ -254,7 +254,8 @@ void CImpl::SendValues(uint64_t curTimeMs) { bool checkedNetwork = false; auto writer = m_wire.SendBinary(); for (auto&& pub : m_publishers) { - if (pub && !pub->outValues.empty() && curTimeMs >= pub->nextSendMs) { + if (pub && !pub->outValues.empty() && + (flush || curTimeMs >= pub->nextSendMs)) { for (auto&& val : pub->outValues) { if (!checkedNetwork) { if (!CheckNetworkReady()) { @@ -474,8 +475,8 @@ void ClientImpl::SendControl(uint64_t curTimeMs) { m_impl->m_wire.Flush(); } -void ClientImpl::SendValues(uint64_t curTimeMs) { - m_impl->SendValues(curTimeMs); +void ClientImpl::SendValues(uint64_t curTimeMs, bool flush) { + m_impl->SendValues(curTimeMs, flush); m_impl->m_wire.Flush(); } diff --git a/ntcore/src/main/native/cpp/net/ClientImpl.h b/ntcore/src/main/native/cpp/net/ClientImpl.h index 0e7fd4a00d..98ed32870f 100644 --- a/ntcore/src/main/native/cpp/net/ClientImpl.h +++ b/ntcore/src/main/native/cpp/net/ClientImpl.h @@ -44,7 +44,7 @@ class ClientImpl { void HandleLocal(std::vector&& msgs); void SendControl(uint64_t curTimeMs); - void SendValues(uint64_t curTimeMs); + void SendValues(uint64_t curTimeMs, bool flush); void SetLocal(LocalInterface* local); void SendInitial(); diff --git a/ntcore/src/main/native/cpp/net3/ClientImpl3.cpp b/ntcore/src/main/native/cpp/net3/ClientImpl3.cpp index 07838659d7..39981d66a4 100644 --- a/ntcore/src/main/native/cpp/net3/ClientImpl3.cpp +++ b/ntcore/src/main/native/cpp/net3/ClientImpl3.cpp @@ -91,7 +91,7 @@ class CImpl : public MessageHandler3 { void ProcessIncoming(std::span data); void HandleLocal(std::span msgs); - void SendPeriodic(uint64_t curTimeMs, bool initial); + void SendPeriodic(uint64_t curTimeMs, bool initial, bool flush); void SendValue(Writer& out, Entry* entry, const Value& value); bool CheckNetworkReady(); @@ -223,7 +223,7 @@ void CImpl::HandleLocal(std::span msgs) { } } -void CImpl::SendPeriodic(uint64_t curTimeMs, bool initial) { +void CImpl::SendPeriodic(uint64_t curTimeMs, bool initial, bool flush) { DEBUG4("SendPeriodic({})", curTimeMs); // rate limit sends @@ -258,7 +258,8 @@ void CImpl::SendPeriodic(uint64_t curTimeMs, bool initial) { // send any pending updates due to be sent bool checkedNetwork = false; for (auto&& pub : m_publishers) { - if (pub && !pub->outValues.empty() && curTimeMs >= pub->nextSendMs) { + if (pub && !pub->outValues.empty() && + (flush || curTimeMs >= pub->nextSendMs)) { if (!checkedNetwork) { if (!CheckNetworkReady()) { return; @@ -420,7 +421,7 @@ void CImpl::ServerHelloDone() { } // send initial assignments - SendPeriodic(m_initTimeMs, true); + SendPeriodic(m_initTimeMs, true, true); m_state = kStateRunning; m_setPeriodic(m_periodMs); @@ -633,8 +634,8 @@ void ClientImpl3::HandleLocal(std::span msgs) { m_impl->HandleLocal(msgs); } -void ClientImpl3::SendPeriodic(uint64_t curTimeMs) { - m_impl->SendPeriodic(curTimeMs, false); +void ClientImpl3::SendPeriodic(uint64_t curTimeMs, bool flush) { + m_impl->SendPeriodic(curTimeMs, false, flush); } void ClientImpl3::SetLocal(net::LocalInterface* local) { diff --git a/ntcore/src/main/native/cpp/net3/ClientImpl3.h b/ntcore/src/main/native/cpp/net3/ClientImpl3.h index 484ea3d0f5..3ea5ac7f36 100644 --- a/ntcore/src/main/native/cpp/net3/ClientImpl3.h +++ b/ntcore/src/main/native/cpp/net3/ClientImpl3.h @@ -38,7 +38,7 @@ class ClientImpl3 { void ProcessIncoming(std::span data); void HandleLocal(std::span msgs); - void SendPeriodic(uint64_t curTimeMs); + void SendPeriodic(uint64_t curTimeMs, bool flush); void SetLocal(net::LocalInterface* local);