[ntcore] Fix client flush behavior (#4903)

We need to ignore per-publisher send periods when flushing.

Also fix NT4 client to use flush async's (same as NT3 client).
This commit is contained in:
Peter Johnson
2023-01-04 23:36:26 -08:00
committed by GitHub
parent bfe346c76a
commit aec6f3d506
5 changed files with 25 additions and 24 deletions

View File

@@ -215,7 +215,7 @@ NCImpl3::NCImpl3(int inst, std::string_view id,
m_sendValuesTimer->timeout.connect([this] {
if (m_clientImpl) {
HandleLocal();
m_clientImpl->SendPeriodic(m_loop.Now().count());
m_clientImpl->SendPeriodic(m_loop.Now().count(), false);
}
});
@@ -224,7 +224,7 @@ NCImpl3::NCImpl3(int inst, std::string_view id,
m_flush->wakeup.connect([this] {
if (m_clientImpl) {
HandleLocal();
m_clientImpl->SendPeriodic(m_loop.Now().count());
m_clientImpl->SendPeriodic(m_loop.Now().count(), true);
}
});
m_flushAtomic = m_flush.get();
@@ -355,7 +355,7 @@ NCImpl4::NCImpl4(
m_sendValuesTimer->timeout.connect([this] {
if (m_clientImpl) {
HandleLocal();
m_clientImpl->SendValues(m_loop.Now().count());
m_clientImpl->SendValues(m_loop.Now().count(), false);
}
});
@@ -364,7 +364,7 @@ NCImpl4::NCImpl4(
m_flush->wakeup.connect([this] {
if (m_clientImpl) {
HandleLocal();
m_clientImpl->SendValues(m_loop.Now().count());
m_clientImpl->SendValues(m_loop.Now().count(), true);
}
});
m_flushAtomic = m_flush.get();
@@ -501,16 +501,15 @@ void NetworkClient::StopDSClient() {
}
void NetworkClient::FlushLocal() {
m_impl->m_loopRunner.ExecAsync([this](uv::Loop&) { m_impl->HandleLocal(); });
if (auto async = m_impl->m_flushLocalAtomic.load(std::memory_order_relaxed)) {
async->UnsafeSend();
}
}
void NetworkClient::Flush() {
m_impl->m_loopRunner.ExecAsync([this](uv::Loop&) {
m_impl->HandleLocal();
if (m_impl->m_clientImpl) {
m_impl->m_clientImpl->SendValues(m_impl->m_loop.Now().count());
}
});
if (auto async = m_impl->m_flushAtomic.load(std::memory_order_relaxed)) {
async->UnsafeSend();
}
}
class NetworkClient3::Impl final : public NCImpl3 {

View File

@@ -56,7 +56,7 @@ class CImpl : public ServerMessageHandler {
void ProcessIncomingBinary(std::span<const uint8_t> data);
void HandleLocal(std::vector<ClientMessage>&& msgs);
bool SendControl(uint64_t curTimeMs);
void SendValues(uint64_t curTimeMs);
void SendValues(uint64_t curTimeMs, bool flush);
void SendInitialValues();
bool CheckNetworkReady();
@@ -237,7 +237,7 @@ bool CImpl::SendControl(uint64_t curTimeMs) {
return true;
}
void CImpl::SendValues(uint64_t curTimeMs) {
void CImpl::SendValues(uint64_t curTimeMs, bool flush) {
DEBUG4("SendValues({})", curTimeMs);
// can't send value updates until we have a RTT
@@ -254,7 +254,8 @@ void CImpl::SendValues(uint64_t curTimeMs) {
bool checkedNetwork = false;
auto writer = m_wire.SendBinary();
for (auto&& pub : m_publishers) {
if (pub && !pub->outValues.empty() && curTimeMs >= pub->nextSendMs) {
if (pub && !pub->outValues.empty() &&
(flush || curTimeMs >= pub->nextSendMs)) {
for (auto&& val : pub->outValues) {
if (!checkedNetwork) {
if (!CheckNetworkReady()) {
@@ -474,8 +475,8 @@ void ClientImpl::SendControl(uint64_t curTimeMs) {
m_impl->m_wire.Flush();
}
void ClientImpl::SendValues(uint64_t curTimeMs) {
m_impl->SendValues(curTimeMs);
void ClientImpl::SendValues(uint64_t curTimeMs, bool flush) {
m_impl->SendValues(curTimeMs, flush);
m_impl->m_wire.Flush();
}

View File

@@ -44,7 +44,7 @@ class ClientImpl {
void HandleLocal(std::vector<ClientMessage>&& msgs);
void SendControl(uint64_t curTimeMs);
void SendValues(uint64_t curTimeMs);
void SendValues(uint64_t curTimeMs, bool flush);
void SetLocal(LocalInterface* local);
void SendInitial();

View File

@@ -91,7 +91,7 @@ class CImpl : public MessageHandler3 {
void ProcessIncoming(std::span<const uint8_t> data);
void HandleLocal(std::span<const net::ClientMessage> msgs);
void SendPeriodic(uint64_t curTimeMs, bool initial);
void SendPeriodic(uint64_t curTimeMs, bool initial, bool flush);
void SendValue(Writer& out, Entry* entry, const Value& value);
bool CheckNetworkReady();
@@ -223,7 +223,7 @@ void CImpl::HandleLocal(std::span<const net::ClientMessage> msgs) {
}
}
void CImpl::SendPeriodic(uint64_t curTimeMs, bool initial) {
void CImpl::SendPeriodic(uint64_t curTimeMs, bool initial, bool flush) {
DEBUG4("SendPeriodic({})", curTimeMs);
// rate limit sends
@@ -258,7 +258,8 @@ void CImpl::SendPeriodic(uint64_t curTimeMs, bool initial) {
// send any pending updates due to be sent
bool checkedNetwork = false;
for (auto&& pub : m_publishers) {
if (pub && !pub->outValues.empty() && curTimeMs >= pub->nextSendMs) {
if (pub && !pub->outValues.empty() &&
(flush || curTimeMs >= pub->nextSendMs)) {
if (!checkedNetwork) {
if (!CheckNetworkReady()) {
return;
@@ -420,7 +421,7 @@ void CImpl::ServerHelloDone() {
}
// send initial assignments
SendPeriodic(m_initTimeMs, true);
SendPeriodic(m_initTimeMs, true, true);
m_state = kStateRunning;
m_setPeriodic(m_periodMs);
@@ -633,8 +634,8 @@ void ClientImpl3::HandleLocal(std::span<const net::ClientMessage> msgs) {
m_impl->HandleLocal(msgs);
}
void ClientImpl3::SendPeriodic(uint64_t curTimeMs) {
m_impl->SendPeriodic(curTimeMs, false);
void ClientImpl3::SendPeriodic(uint64_t curTimeMs, bool flush) {
m_impl->SendPeriodic(curTimeMs, false, flush);
}
void ClientImpl3::SetLocal(net::LocalInterface* local) {

View File

@@ -38,7 +38,7 @@ class ClientImpl3 {
void ProcessIncoming(std::span<const uint8_t> data);
void HandleLocal(std::span<const net::ClientMessage> msgs);
void SendPeriodic(uint64_t curTimeMs);
void SendPeriodic(uint64_t curTimeMs, bool flush);
void SetLocal(net::LocalInterface* local);