diff --git a/ntcore/src/main/native/cpp/NetworkClient.cpp b/ntcore/src/main/native/cpp/NetworkClient.cpp index 348ed2b391..da04dcac85 100644 --- a/ntcore/src/main/native/cpp/NetworkClient.cpp +++ b/ntcore/src/main/native/cpp/NetworkClient.cpp @@ -43,8 +43,6 @@ NetworkClientBase::NetworkClientBase(int inst, std::string_view id, m_id{id}, m_localQueue{logger}, m_loop{*m_loopRunner.GetLoop()} { - m_localMsgs.reserve(net::NetworkLoopQueue::kInitialQueueSize); - INFO("starting network client"); } @@ -194,9 +192,14 @@ NetworkClient3::~NetworkClient3() { } void NetworkClient3::HandleLocal() { - m_localQueue.ReadQueue(&m_localMsgs); - if (m_clientImpl) { - m_clientImpl->HandleLocal(m_localMsgs); + for (;;) { + auto msgs = m_localQueue.ReadQueue(m_localMsgs); + if (msgs.empty()) { + return; + } + if (m_clientImpl) { + m_clientImpl->HandleLocal(msgs); + } } } @@ -358,9 +361,14 @@ NetworkClient::~NetworkClient() { } void NetworkClient::HandleLocal() { - m_localQueue.ReadQueue(&m_localMsgs); - if (m_clientImpl) { - m_clientImpl->HandleLocal(std::move(m_localMsgs)); + for (;;) { + auto msgs = m_localQueue.ReadQueue(m_localMsgs); + if (msgs.empty()) { + return; + } + if (m_clientImpl) { + m_clientImpl->HandleLocal(msgs); + } } } diff --git a/ntcore/src/main/native/cpp/NetworkClient.h b/ntcore/src/main/native/cpp/NetworkClient.h index 73e113415f..77da5dd8aa 100644 --- a/ntcore/src/main/native/cpp/NetworkClient.h +++ b/ntcore/src/main/native/cpp/NetworkClient.h @@ -7,7 +7,6 @@ #include #include #include -#include #include #include #include @@ -23,12 +22,11 @@ #include "INetworkClient.h" #include "net/ClientImpl.h" +#include "net/ClientMessageQueue.h" #include "net/Message.h" -#include "net/NetworkLoopQueue.h" #include "net/WebSocketConnection.h" #include "net3/ClientImpl3.h" #include "net3/UvStreamConnection3.h" -#include "ntcore_cpp.h" namespace wpi { class Logger; @@ -80,7 +78,8 @@ class NetworkClientBase : public INetworkClient { std::shared_ptr> m_flushLocal; std::shared_ptr> m_flush; - std::vector m_localMsgs; + using Queue = net::LocalClientMessageQueue; + net::ClientMessage m_localMsgs[Queue::kBlockSize]; std::vector> m_servers; @@ -91,7 +90,7 @@ class NetworkClientBase : public INetworkClient { std::atomic*> m_flushLocalAtomic{nullptr}; std::atomic*> m_flushAtomic{nullptr}; - net::NetworkLoopQueue m_localQueue; + Queue m_localQueue; int m_connHandle = 0; diff --git a/ntcore/src/main/native/cpp/NetworkServer.cpp b/ntcore/src/main/native/cpp/NetworkServer.cpp index 55490bb5da..8fb419a12b 100644 --- a/ntcore/src/main/native/cpp/NetworkServer.cpp +++ b/ntcore/src/main/native/cpp/NetworkServer.cpp @@ -42,6 +42,8 @@ namespace uv = wpi::uv; // use a larger max message size for websockets static constexpr size_t kMaxMessageSize = 2 * 1024 * 1024; +static constexpr size_t kClientProcessMessageCountMax = 16; + class NetworkServer::ServerConnection { public: ServerConnection(NetworkServer& server, std::string_view addr, @@ -105,7 +107,6 @@ class NetworkServer::ServerConnection4 final void NetworkServer::ServerConnection::SetupOutgoingTimer() { m_outgoingTimer = uv::Timer::Create(m_server.m_loop); m_outgoingTimer->timeout.connect([this] { - m_server.HandleLocal(); m_server.m_serverImpl.SendOutgoing(m_clientId, m_server.m_loop.Now().count()); }); @@ -172,8 +173,10 @@ NetworkServer::ServerConnection3::ServerConnection3( ConnectionClosed(); }); stream->data.connect([this](uv::Buffer& buf, size_t size) { - m_server.m_serverImpl.ProcessIncomingBinary( - m_clientId, {reinterpret_cast(buf.base), size}); + if (m_server.m_serverImpl.ProcessIncomingBinary( + m_clientId, {reinterpret_cast(buf.base), size})) { + m_server.m_idle->Start(); + } }); stream->StartRead(); @@ -293,10 +296,14 @@ void NetworkServer::ServerConnection4::ProcessWsUpgrade() { ConnectionClosed(); }); m_websocket->text.connect([this](std::string_view data, bool) { - m_server.m_serverImpl.ProcessIncomingText(m_clientId, data); + if (m_server.m_serverImpl.ProcessIncomingText(m_clientId, data)) { + m_server.m_idle->Start(); + } }); m_websocket->binary.connect([this](std::span data, bool) { - m_server.m_serverImpl.ProcessIncomingBinary(m_clientId, data); + if (m_server.m_serverImpl.ProcessIncomingBinary(m_clientId, data)) { + m_server.m_idle->Start(); + } }); SetupOutgoingTimer(); @@ -320,12 +327,11 @@ NetworkServer::NetworkServer(std::string_view persistentFilename, m_serverImpl{logger}, m_localQueue{logger}, m_loop(*m_loopRunner.GetLoop()) { - m_localMsgs.reserve(net::NetworkLoopQueue::kInitialQueueSize); m_loopRunner.ExecAsync([=, this](uv::Loop& loop) { // connect local storage to server - m_serverImpl.SetLocal(&m_localStorage); + m_serverImpl.SetLocal(&m_localStorage, &m_localQueue); m_localStorage.StartNetwork(&m_localQueue); - HandleLocal(); + ProcessAllLocal(); // load persistent file first, then initialize uv::QueueWork(m_loop, [this] { LoadPersistent(); }, [this] { Init(); }); @@ -350,9 +356,9 @@ void NetworkServer::Flush() { } } -void NetworkServer::HandleLocal() { - m_localQueue.ReadQueue(&m_localMsgs); - m_serverImpl.HandleLocal(m_localMsgs); +void NetworkServer::ProcessAllLocal() { + while (m_serverImpl.ProcessLocalMessages(128)) { + } } void NetworkServer::LoadPersistent() { @@ -421,8 +427,10 @@ void NetworkServer::Init() { m_readLocalTimer = uv::Timer::Create(m_loop); if (m_readLocalTimer) { m_readLocalTimer->timeout.connect([this] { - HandleLocal(); - m_serverImpl.SendAllOutgoing(m_loop.Now().count(), false); + if (m_serverImpl.ProcessLocalMessages(kClientProcessMessageCountMax)) { + DEBUG4("Starting idle processing"); + m_idle->Start(); // more to process + } }); m_readLocalTimer->Start(uv::Timer::Time{100}, uv::Timer::Time{100}); } @@ -447,7 +455,7 @@ void NetworkServer::Init() { m_flush = uv::Async<>::Create(m_loop); if (m_flush) { m_flush->wakeup.connect([this] { - HandleLocal(); + ProcessAllLocal(); m_serverImpl.SendAllOutgoing(m_loop.Now().count(), true); }); } @@ -455,10 +463,28 @@ void NetworkServer::Init() { m_flushLocal = uv::Async<>::Create(m_loop); if (m_flushLocal) { - m_flushLocal->wakeup.connect([this] { HandleLocal(); }); + m_flushLocal->wakeup.connect([this] { + if (m_serverImpl.ProcessLocalMessages(kClientProcessMessageCountMax)) { + DEBUG4("Starting idle processing"); + m_idle->Start(); // more to process + } + }); } m_flushLocalAtomic = m_flushLocal.get(); + m_idle = uv::Idle::Create(m_loop); + if (m_idle) { + m_idle->idle.connect([this] { + if (m_serverImpl.ProcessIncomingMessages(kClientProcessMessageCountMax)) { + DEBUG4("Starting idle processing"); + m_idle->Start(); // more to process + } else { + DEBUG4("Stopping idle processing"); + m_idle->Stop(); // go back to sleep + } + }); + } + INFO("Listening on NT3 port {}, NT4 port {}", m_port3, m_port4); if (m_port3 != 0) { diff --git a/ntcore/src/main/native/cpp/NetworkServer.h b/ntcore/src/main/native/cpp/NetworkServer.h index 3f5a0947c7..db4ecfac05 100644 --- a/ntcore/src/main/native/cpp/NetworkServer.h +++ b/ntcore/src/main/native/cpp/NetworkServer.h @@ -13,10 +13,11 @@ #include #include +#include #include +#include "net/ClientMessageQueue.h" #include "net/Message.h" -#include "net/NetworkLoopQueue.h" #include "net/ServerImpl.h" #include "ntcore_cpp.h" @@ -49,7 +50,7 @@ class NetworkServer { class ServerConnection3; class ServerConnection4; - void HandleLocal(); + void ProcessAllLocal(); void LoadPersistent(); void SavePersistent(std::string_view filename, std::string_view data); void Init(); @@ -71,9 +72,11 @@ class NetworkServer { std::shared_ptr m_savePersistentTimer; std::shared_ptr> m_flushLocal; std::shared_ptr> m_flush; + std::shared_ptr m_idle; bool m_shutdown = false; - std::vector m_localMsgs; + using Queue = net::LocalClientMessageQueue; + net::ClientMessage m_localMsgs[Queue::kBlockSize]; net::ServerImpl m_serverImpl; @@ -87,7 +90,7 @@ class NetworkServer { }; std::vector m_connections; - net::NetworkLoopQueue m_localQueue; + Queue m_localQueue; wpi::EventLoopRunner m_loopRunner; wpi::uv::Loop& m_loop; diff --git a/ntcore/src/main/native/cpp/net/ClientImpl.cpp b/ntcore/src/main/native/cpp/net/ClientImpl.cpp index ff5408e722..13b01156cc 100644 --- a/ntcore/src/main/native/cpp/net/ClientImpl.cpp +++ b/ntcore/src/main/native/cpp/net/ClientImpl.cpp @@ -99,7 +99,7 @@ void ClientImpl::ProcessIncomingBinary(uint64_t curTimeMs, } } -void ClientImpl::HandleLocal(std::vector&& msgs) { +void ClientImpl::HandleLocal(std::span msgs) { DEBUG4("HandleLocal()"); for (auto&& elem : msgs) { // common case is value diff --git a/ntcore/src/main/native/cpp/net/ClientImpl.h b/ntcore/src/main/native/cpp/net/ClientImpl.h index 95a5b7e68f..9e3a189d1a 100644 --- a/ntcore/src/main/native/cpp/net/ClientImpl.h +++ b/ntcore/src/main/native/cpp/net/ClientImpl.h @@ -45,7 +45,7 @@ class ClientImpl final : private ServerMessageHandler { void ProcessIncomingText(std::string_view data); void ProcessIncomingBinary(uint64_t curTimeMs, std::span data); - void HandleLocal(std::vector&& msgs); + void HandleLocal(std::span msgs); void SendOutgoing(uint64_t curTimeMs, bool flush); diff --git a/ntcore/src/main/native/cpp/net/ClientMessageQueue.h b/ntcore/src/main/native/cpp/net/ClientMessageQueue.h new file mode 100644 index 0000000000..c2e6f950a8 --- /dev/null +++ b/ntcore/src/main/native/cpp/net/ClientMessageQueue.h @@ -0,0 +1,87 @@ +// Copyright (c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#pragma once + +#include +#include + +#include +#include + +#include "Message.h" +#include "MessageHandler.h" + +namespace wpi { +class Logger; +} // namespace wpi + +namespace nt::net { + +class ClientMessageQueue { + public: + virtual ~ClientMessageQueue() = default; + + virtual std::span ReadQueue(std::span out) = 0; + virtual void ClearQueue() = 0; +}; + +namespace detail { + +template +class ClientMessageQueueImpl final : public ClientMessageHandler, + public ClientMessageQueue { + public: + static constexpr size_t kBlockSize = 64; + + explicit ClientMessageQueueImpl(wpi::Logger& logger) : m_logger{logger} {} + + bool empty() const { return m_queue.empty(); } + + // ClientMessageQueue - calls to these read the queue + std::span ReadQueue(std::span out) final; + void ClearQueue() final; + + // ClientMessageHandler - calls to these append to the queue + void ClientPublish(int pubuid, std::string_view name, + std::string_view typeStr, const wpi::json& properties, + const PubSubOptionsImpl& options) final; + void ClientUnpublish(int pubuid) final; + void ClientSetProperties(std::string_view name, + const wpi::json& update) final; + void ClientSubscribe(int subuid, std::span topicNames, + const PubSubOptionsImpl& options) final; + void ClientUnsubscribe(int subuid) final; + void ClientSetValue(int pubuid, const Value& value) final; + + private: + wpi::FastQueue m_queue{kBlockSize - 1}; + wpi::Logger& m_logger; + + class NoMutex { + public: + void lock() {} + void unlock() {} + }; + [[no_unique_address]] + std::conditional_t m_mutex; + + struct ValueSize { + size_t size{0}; + bool errored{false}; + }; + struct Empty {}; + [[no_unique_address]] + std::conditional_t m_valueSize; +}; + +} // namespace detail + +using LocalClientMessageQueue = + detail::ClientMessageQueueImpl<2 * 1024 * 1024, true>; +using NetworkIncomingClientQueue = detail::ClientMessageQueueImpl<0, false>; + +} // namespace nt::net + +#include "ClientMessageQueue.inc" diff --git a/ntcore/src/main/native/cpp/net/ClientMessageQueue.inc b/ntcore/src/main/native/cpp/net/ClientMessageQueue.inc new file mode 100644 index 0000000000..fd2c5f13a7 --- /dev/null +++ b/ntcore/src/main/native/cpp/net/ClientMessageQueue.inc @@ -0,0 +1,106 @@ +// Copyright (c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#pragma once + +#include +#include + +#include + +#include "ClientMessageQueue.h" + +namespace nt::net::detail { + +template +inline void ClientMessageQueueImpl::ClientPublish( + int pubuid, std::string_view name, std::string_view typeStr, + const wpi::json& properties, const PubSubOptionsImpl& options) { + std::scoped_lock lock{m_mutex}; + m_queue.enqueue(ClientMessage{PublishMsg{ + pubuid, std::string{name}, std::string{typeStr}, properties, options}}); +} + +template +inline void ClientMessageQueueImpl::ClientUnpublish( + int pubuid) { + std::scoped_lock lock{m_mutex}; + m_queue.enqueue(ClientMessage{UnpublishMsg{pubuid}}); +} + +template +inline void +ClientMessageQueueImpl::ClientSetProperties( + std::string_view name, const wpi::json& update) { + std::scoped_lock lock{m_mutex}; + m_queue.enqueue(ClientMessage{SetPropertiesMsg{std::string{name}, update}}); +} + +template +inline void ClientMessageQueueImpl::ClientSubscribe( + int subuid, std::span topicNames, + const PubSubOptionsImpl& options) { + std::scoped_lock lock{m_mutex}; + m_queue.enqueue(ClientMessage{ + SubscribeMsg{subuid, {topicNames.begin(), topicNames.end()}, options}}); +} + +template +inline void ClientMessageQueueImpl::ClientUnsubscribe( + int subuid) { + std::scoped_lock lock{m_mutex}; + m_queue.enqueue(ClientMessage{UnsubscribeMsg{subuid}}); +} + +template +std::span +ClientMessageQueueImpl::ReadQueue( + std::span out) { + std::scoped_lock lock{m_mutex}; + size_t count = 0; + for (auto&& msg : out) { + if (!m_queue.try_dequeue(msg)) { + break; + } + if constexpr (MaxValueSize != 0) { + if (auto* val = std::get_if(&msg.contents)) { + m_valueSize.size -= sizeof(ClientMessage) + val->value.size(); + m_valueSize.errored = false; + } + } + ++count; + } + return out.subspan(0, count); +} + +template +void ClientMessageQueueImpl::ClearQueue() { + std::scoped_lock lock{m_mutex}; + ClientMessage msg; + while (m_queue.try_dequeue(msg)) { + } + if constexpr (MaxValueSize != 0) { + m_valueSize.size = 0; + m_valueSize.errored = false; + } +} + +template +void ClientMessageQueueImpl::ClientSetValue( + int pubuid, const Value& value) { + std::scoped_lock lock{m_mutex}; + if constexpr (MaxValueSize != 0) { + m_valueSize.size += sizeof(ClientMessage) + value.size(); + if (m_valueSize.size > MaxValueSize) { + if (!m_valueSize.errored) { + WPI_ERROR(m_logger, "NT: dropping value set due to memory limits"); + m_valueSize.errored = true; + } + return; // avoid potential out of memory + } + } + m_queue.enqueue(ClientMessage{ClientValueMsg{pubuid, value}}); +} + +} // namespace nt::net::detail diff --git a/ntcore/src/main/native/cpp/net/NetworkLoopQueue.cpp b/ntcore/src/main/native/cpp/net/NetworkLoopQueue.cpp deleted file mode 100644 index cb7f05e09f..0000000000 --- a/ntcore/src/main/native/cpp/net/NetworkLoopQueue.cpp +++ /dev/null @@ -1,24 +0,0 @@ -// Copyright (c) FIRST and other WPILib contributors. -// Open Source Software; you can modify and/or share it under the terms of -// the WPILib BSD license file in the root directory of this project. - -#include "NetworkLoopQueue.h" - -#include - -using namespace nt::net; - -static constexpr size_t kMaxSize = 2 * 1024 * 1024; - -void NetworkLoopQueue::ClientSetValue(int pubuid, const Value& value) { - std::scoped_lock lock{m_mutex}; - m_size += sizeof(ClientMessage) + value.size(); - if (m_size > kMaxSize) { - if (!m_sizeErrored) { - WPI_ERROR(m_logger, "NT: dropping value set due to memory limits"); - m_sizeErrored = true; - } - return; // avoid potential out of memory - } - m_queue.emplace_back(ClientMessage{ClientValueMsg{pubuid, value}}); -} diff --git a/ntcore/src/main/native/cpp/net/NetworkLoopQueue.h b/ntcore/src/main/native/cpp/net/NetworkLoopQueue.h deleted file mode 100644 index ca55ed96b4..0000000000 --- a/ntcore/src/main/native/cpp/net/NetworkLoopQueue.h +++ /dev/null @@ -1,55 +0,0 @@ -// Copyright (c) FIRST and other WPILib contributors. -// Open Source Software; you can modify and/or share it under the terms of -// the WPILib BSD license file in the root directory of this project. - -#pragma once - -#include -#include -#include - -#include - -#include "Message.h" -#include "MessageHandler.h" - -namespace wpi { -class Logger; -} // namespace wpi - -namespace nt::net { - -class NetworkLoopQueue : public ClientMessageHandler { - public: - static constexpr size_t kInitialQueueSize = 2000; - - explicit NetworkLoopQueue(wpi::Logger& logger) : m_logger{logger} { - m_queue.reserve(kInitialQueueSize); - } - - void ReadQueue(std::vector* out); - void ClearQueue(); - - // ClientMessageHandler - calls to these append to the queue - void ClientPublish(int pubuid, std::string_view name, - std::string_view typeStr, const wpi::json& properties, - const PubSubOptionsImpl& options) final; - void ClientUnpublish(int pubuid) final; - void ClientSetProperties(std::string_view name, - const wpi::json& update) final; - void ClientSubscribe(int subuid, std::span topicNames, - const PubSubOptionsImpl& options) final; - void ClientUnsubscribe(int subuid) final; - void ClientSetValue(int pubuid, const Value& value) final; - - private: - wpi::mutex m_mutex; - std::vector m_queue; - wpi::Logger& m_logger; - size_t m_size{0}; - bool m_sizeErrored{false}; -}; - -} // namespace nt::net - -#include "NetworkLoopQueue.inc" diff --git a/ntcore/src/main/native/cpp/net/NetworkLoopQueue.inc b/ntcore/src/main/native/cpp/net/NetworkLoopQueue.inc deleted file mode 100644 index 1143e23d25..0000000000 --- a/ntcore/src/main/native/cpp/net/NetworkLoopQueue.inc +++ /dev/null @@ -1,65 +0,0 @@ -// Copyright (c) FIRST and other WPILib contributors. -// Open Source Software; you can modify and/or share it under the terms of -// the WPILib BSD license file in the root directory of this project. - -#pragma once - -#include -#include -#include - -#include "NetworkLoopQueue.h" - -namespace nt::net { - -inline void NetworkLoopQueue::ReadQueue(std::vector* out) { - std::scoped_lock lock{m_mutex}; - out->swap(m_queue); - m_queue.resize(0); - m_queue.reserve(out->capacity()); // keep the same running capacity - m_size = 0; - m_sizeErrored = false; -} - -inline void NetworkLoopQueue::ClearQueue() { - std::scoped_lock lock{m_mutex}; - m_queue.resize(0); - m_size = 0; - m_sizeErrored = false; -} - -inline void NetworkLoopQueue::ClientPublish(int pubuid, std::string_view name, - std::string_view typeStr, - const wpi::json& properties, - const PubSubOptionsImpl& options) { - std::scoped_lock lock{m_mutex}; - m_queue.emplace_back(ClientMessage{PublishMsg{ - pubuid, std::string{name}, std::string{typeStr}, properties, options}}); -} - -inline void NetworkLoopQueue::ClientUnpublish(int pubuid) { - std::scoped_lock lock{m_mutex}; - m_queue.emplace_back(ClientMessage{UnpublishMsg{pubuid}}); -} - -inline void NetworkLoopQueue::ClientSetProperties(std::string_view name, - const wpi::json& update) { - std::scoped_lock lock{m_mutex}; - m_queue.emplace_back( - ClientMessage{SetPropertiesMsg{std::string{name}, update}}); -} - -inline void NetworkLoopQueue::ClientSubscribe( - int subuid, std::span topicNames, - const PubSubOptionsImpl& options) { - std::scoped_lock lock{m_mutex}; - m_queue.emplace_back(ClientMessage{ - SubscribeMsg{subuid, {topicNames.begin(), topicNames.end()}, options}}); -} - -inline void NetworkLoopQueue::ClientUnsubscribe(int subuid) { - std::scoped_lock lock{m_mutex}; - m_queue.emplace_back(ClientMessage{UnsubscribeMsg{subuid}}); -} - -} // namespace nt::net diff --git a/ntcore/src/main/native/cpp/net/ServerImpl.cpp b/ntcore/src/main/native/cpp/net/ServerImpl.cpp index e24842d0ba..016929bbae 100644 --- a/ntcore/src/main/native/cpp/net/ServerImpl.cpp +++ b/ntcore/src/main/native/cpp/net/ServerImpl.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include @@ -26,6 +27,7 @@ #include "Log.h" #include "NetworkInterface.h" #include "Types_internal.h" +#include "net/Message.h" #include "net/WireEncoder.h" #include "net3/WireConnection3.h" #include "net3/WireEncoder3.h" @@ -432,12 +434,13 @@ void ServerImpl::ClientDataLocal::SendPropertiesUpdate(TopicData* topic, } } -void ServerImpl::ClientDataLocal::HandleLocal( - std::span msgs) { - DEBUG4("HandleLocal()"); - if (msgs.empty()) { - return; - } +bool ServerImpl::ClientData4Base::DoProcessIncomingMessages( + ClientMessageQueue& queue, size_t max) { + DEBUG4("ProcessIncomingMessage()"); + max = (std::min)(m_msgsBuf.size(), max); + std::span msgs = + queue.ReadQueue(wpi::take_front(std::span{m_msgsBuf}, max)); + // just map as a normal client into client=0 calls bool updatepub = false; bool updatesub = false; @@ -468,17 +471,28 @@ void ServerImpl::ClientDataLocal::HandleLocal( if (updatesub) { UpdateMetaClientSub(); } + + return msgs.size() == max; // don't know for sure, but there might be more } -void ServerImpl::ClientData4::ProcessIncomingText(std::string_view data) { - if (WireDecodeText(data, *this, m_logger)) { - UpdateMetaClientPub(); - UpdateMetaClientSub(); +bool ServerImpl::ClientData4::ProcessIncomingText(std::string_view data) { + constexpr int kMaxImmProcessing = 10; + bool queueWasEmpty = m_incoming.empty(); + // can't directly process, because we don't know how big it is + WireDecodeText(data, m_incoming, m_logger); + if (queueWasEmpty && + DoProcessIncomingMessages(m_incoming, kMaxImmProcessing)) { + m_wire.StopRead(); + return true; } + return false; } -void ServerImpl::ClientData4::ProcessIncomingBinary( +bool ServerImpl::ClientData4::ProcessIncomingBinary( std::span data) { + constexpr int kMaxImmProcessing = 10; + // if we've already queued, keep queuing + int count = m_incoming.empty() ? 0 : kMaxImmProcessing; for (;;) { if (data.empty()) { break; @@ -503,8 +517,17 @@ void ServerImpl::ClientData4::ProcessIncomingBinary( } // handle value set - ClientSetValue(pubuid, value); + if (++count < kMaxImmProcessing) { + ClientSetValue(pubuid, value); + } else { + m_incoming.ClientSetValue(pubuid, value); + } } + if (count >= kMaxImmProcessing) { + m_wire.StopRead(); + return true; + } + return false; } void ServerImpl::ClientData4::SendValue(TopicData* topic, const Value& value, @@ -608,11 +631,12 @@ bool ServerImpl::ClientData3::TopicData3::UpdateFlags(TopicData* topic) { return updated; } -void ServerImpl::ClientData3::ProcessIncomingBinary( +bool ServerImpl::ClientData3::ProcessIncomingBinary( std::span data) { if (!m_decoder.Execute(&data)) { m_wire.Disconnect(m_decoder.GetError()); } + return false; } void ServerImpl::ClientData3::SendValue(TopicData* topic, const Value& value, @@ -1918,14 +1942,11 @@ void ServerImpl::SendOutgoing(int clientId, uint64_t curTimeMs) { } } -void ServerImpl::HandleLocal(std::span msgs) { - // just map as a normal client into client=0 calls - m_localClient->HandleLocal(msgs); -} - -void ServerImpl::SetLocal(ServerMessageHandler* local) { +void ServerImpl::SetLocal(ServerMessageHandler* local, + ClientMessageQueue* queue) { DEBUG4("SetLocal()"); m_local = local; + m_localClient->SetQueue(queue); // create server meta topics m_metaClients = CreateMetaTopic("$clients"); @@ -1939,19 +1960,39 @@ void ServerImpl::SetLocal(ServerMessageHandler* local) { m_localClient->UpdateMetaClientSub(); } -void ServerImpl::ProcessIncomingText(int clientId, std::string_view data) { +bool ServerImpl::ProcessIncomingText(int clientId, std::string_view data) { if (auto client = m_clients[clientId].get()) { - client->ProcessIncomingText(data); + return client->ProcessIncomingText(data); + } else { + return false; } } -void ServerImpl::ProcessIncomingBinary(int clientId, +bool ServerImpl::ProcessIncomingBinary(int clientId, std::span data) { if (auto client = m_clients[clientId].get()) { - client->ProcessIncomingBinary(data); + return client->ProcessIncomingBinary(data); + } else { + return false; } } +bool ServerImpl::ProcessIncomingMessages(size_t max) { + DEBUG4("ProcessIncomingMessages({})", max); + bool rv = false; + for (auto&& client : m_clients) { + if (client && client->ProcessIncomingMessages(max)) { + rv = true; + } + } + return rv; +} + +bool ServerImpl::ProcessLocalMessages(size_t max) { + DEBUG4("ProcessLocalMessages({})", max); + return m_localClient->ProcessIncomingMessages(max); +} + void ServerImpl::ConnectionsChanged(const std::vector& conns) { UpdateMetaClients(conns); } diff --git a/ntcore/src/main/native/cpp/net/ServerImpl.h b/ntcore/src/main/native/cpp/net/ServerImpl.h index 01e0058ec7..f4fae4113c 100644 --- a/ntcore/src/main/native/cpp/net/ServerImpl.h +++ b/ntcore/src/main/native/cpp/net/ServerImpl.h @@ -21,9 +21,8 @@ #include #include -#include "Log.h" +#include "ClientMessageQueue.h" #include "Message.h" -#include "NetworkInterface.h" #include "NetworkOutgoingQueue.h" #include "NetworkPing.h" #include "PubSubOptions.h" @@ -63,11 +62,15 @@ class ServerImpl final { void SendAllOutgoing(uint64_t curTimeMs, bool flush); void SendOutgoing(int clientId, uint64_t curTimeMs); - void HandleLocal(std::span msgs); - void SetLocal(ServerMessageHandler* local); + void SetLocal(ServerMessageHandler* local, ClientMessageQueue* queue); - void ProcessIncomingText(int clientId, std::string_view data); - void ProcessIncomingBinary(int clientId, std::span data); + // these return true if any messages have been queued for later processing + bool ProcessIncomingText(int clientId, std::string_view data); + bool ProcessIncomingBinary(int clientId, std::span data); + + // later processing -- returns true if more to process + bool ProcessIncomingMessages(size_t max); + bool ProcessLocalMessages(size_t max); // Returns -1 if cannot add client (e.g. due to duplicate name). // Caller must ensure WireConnection lifetime lasts until RemoveClient() call. @@ -181,8 +184,9 @@ class ServerImpl final { m_logger{logger} {} virtual ~ClientData() = default; - virtual void ProcessIncomingText(std::string_view data) = 0; - virtual void ProcessIncomingBinary(std::span data) = 0; + // these return true if any messages have been queued for later processing + virtual bool ProcessIncomingText(std::string_view data) = 0; + virtual bool ProcessIncomingBinary(std::span data) = 0; virtual void SendValue(TopicData* topic, const Value& value, ValueSendMode mode) = 0; @@ -193,6 +197,9 @@ class ServerImpl final { virtual void SendOutgoing(uint64_t curTimeMs, bool flush) = 0; virtual void Flush() = 0; + // later processing -- returns true if more to process + virtual bool ProcessIncomingMessages(size_t max) = 0; + void UpdateMetaClientPub(); void UpdateMetaClientSub(); @@ -248,7 +255,12 @@ class ServerImpl final { void ClientSetValue(int pubuid, const Value& value) final; + bool DoProcessIncomingMessages(ClientMessageQueue& queue, size_t max); + wpi::DenseMap m_announceSent; + + private: + std::array m_msgsBuf; }; class ClientDataLocal final : public ClientData4Base { @@ -256,8 +268,17 @@ class ServerImpl final { ClientDataLocal(ServerImpl& server, int id, wpi::Logger& logger) : ClientData4Base{"", "", true, [](uint32_t) {}, server, id, logger} {} - void ProcessIncomingText(std::string_view data) final {} - void ProcessIncomingBinary(std::span data) final {} + bool ProcessIncomingText(std::string_view data) final { return false; } + bool ProcessIncomingBinary(std::span data) final { + return false; + } + + bool ProcessIncomingMessages(size_t max) final { + if (!m_queue) { + return false; + } + return DoProcessIncomingMessages(*m_queue, max); + } void SendValue(TopicData* topic, const Value& value, ValueSendMode mode) final; @@ -268,7 +289,10 @@ class ServerImpl final { void SendOutgoing(uint64_t curTimeMs, bool flush) final {} void Flush() final {} - void HandleLocal(std::span msgs); + void SetQueue(ClientMessageQueue* queue) { m_queue = queue; } + + private: + ClientMessageQueue* m_queue = nullptr; }; class ClientData4 final : public ClientData4Base { @@ -280,10 +304,19 @@ class ServerImpl final { server, id, logger}, m_wire{wire}, m_ping{wire}, + m_incoming{logger}, m_outgoing{wire, local} {} - void ProcessIncomingText(std::string_view data) final; - void ProcessIncomingBinary(std::span data) final; + bool ProcessIncomingText(std::string_view data) final; + bool ProcessIncomingBinary(std::span data) final; + + bool ProcessIncomingMessages(size_t max) final { + if (!DoProcessIncomingMessages(m_incoming, max)) { + m_wire.StartRead(); + return false; + } + return true; + } void SendValue(TopicData* topic, const Value& value, ValueSendMode mode) final; @@ -302,6 +335,7 @@ class ServerImpl final { private: NetworkPing m_ping; + NetworkIncomingClientQueue m_incoming; NetworkOutgoingQueue m_outgoing; }; @@ -315,10 +349,13 @@ class ServerImpl final { : ClientData{"", connInfo, local, setPeriodic, server, id, logger}, m_connected{std::move(connected)}, m_wire{wire}, - m_decoder{*this} {} + m_decoder{*this}, + m_incoming{logger} {} - void ProcessIncomingText(std::string_view data) final {} - void ProcessIncomingBinary(std::span data) final; + bool ProcessIncomingText(std::string_view data) final { return false; } + bool ProcessIncomingBinary(std::span data) final; + + bool ProcessIncomingMessages(size_t max) final { return false; } void SendValue(TopicData* topic, const Value& value, ValueSendMode mode) final; @@ -358,6 +395,7 @@ class ServerImpl final { State m_state{kStateInitial}; net3::WireDecoder3 m_decoder; + NetworkIncomingClientQueue m_incoming; std::vector m_outgoing; wpi::DenseMap m_outgoingValueMap; int64_t m_nextPubUid{1}; diff --git a/ntcore/src/test/native/cpp/net/MockClientMessageQueue.h b/ntcore/src/test/native/cpp/net/MockClientMessageQueue.h new file mode 100644 index 0000000000..99ac6cbce6 --- /dev/null +++ b/ntcore/src/test/native/cpp/net/MockClientMessageQueue.h @@ -0,0 +1,33 @@ +// Copyright (c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#pragma once + +#include +#include +#include + +#include "net/ClientMessageQueue.h" +#include "net/Message.h" + +namespace nt::net { + +class MockClientMessageQueue : public net::ClientMessageQueue { + public: + std::span ReadQueue(std::span out) override { + size_t size = out.size(); + if (size > msgs.size()) { + size = msgs.size(); + } + std::move(msgs.begin(), msgs.begin() + size, out.begin()); + msgs.erase(msgs.begin(), msgs.begin() + size); + return out.subspan(0, size); + } + + void ClearQueue() override { msgs.clear(); } + + std::vector msgs; +}; + +} // namespace nt::net diff --git a/ntcore/src/test/native/cpp/net/ServerImplTest.cpp b/ntcore/src/test/native/cpp/net/ServerImplTest.cpp index 064508ec6f..dce0fb4a38 100644 --- a/ntcore/src/test/native/cpp/net/ServerImplTest.cpp +++ b/ntcore/src/test/native/cpp/net/ServerImplTest.cpp @@ -18,6 +18,7 @@ #include "../TestPrinters.h" #include "../ValueMatcher.h" #include "Handle.h" +#include "MockClientMessageQueue.h" #include "MockMessageHandler.h" #include "MockWireConnection.h" #include "gmock/gmock.h" @@ -45,6 +46,7 @@ namespace nt { class ServerImplTest : public ::testing::Test { public: ::testing::StrictMock local; + ::testing::StrictMock queue; wpi::MockLogger logger; net::ServerImpl server{logger}; }; @@ -144,7 +146,7 @@ static std::vector EncodeServerBinary(const T& msgs) { TEST_F(ServerImplTest, PublishLocal) { // publish before client connect - server.SetLocal(&local); + server.SetLocal(&local, &queue); constexpr int pubuid = 1; constexpr int pubuid2 = 2; constexpr int pubuid3 = 3; @@ -165,10 +167,9 @@ TEST_F(ServerImplTest, PublishLocal) { } { - std::vector msgs; - msgs.emplace_back(net::ClientMessage{ + queue.msgs.emplace_back(net::ClientMessage{ net::PublishMsg{pubuid, "test", "double", wpi::json::object(), {}}}); - server.HandleLocal(msgs); + EXPECT_FALSE(server.ProcessLocalMessages(UINT_MAX)); } // client connect; it should get already-published topic as soon as it @@ -213,20 +214,18 @@ TEST_F(ServerImplTest, PublishLocal) { // publish before send control { - std::vector msgs; - msgs.emplace_back(net::ClientMessage{ + queue.msgs.emplace_back(net::ClientMessage{ net::PublishMsg{pubuid2, "test2", "double", wpi::json::object(), {}}}); - server.HandleLocal(msgs); + EXPECT_FALSE(server.ProcessLocalMessages(UINT_MAX)); } server.SendAllOutgoing(100, false); // publish after send control { - std::vector msgs; - msgs.emplace_back(net::ClientMessage{ + queue.msgs.emplace_back(net::ClientMessage{ net::PublishMsg{pubuid3, "test3", "double", wpi::json::object(), {}}}); - server.HandleLocal(msgs); + EXPECT_FALSE(server.ProcessLocalMessages(UINT_MAX)); } server.SendAllOutgoing(200, false); @@ -234,7 +233,7 @@ TEST_F(ServerImplTest, PublishLocal) { TEST_F(ServerImplTest, ClientSubTopicOnlyThenValue) { // publish before client connect - server.SetLocal(&local); + server.SetLocal(&local, &queue); constexpr int pubuid = 1; EXPECT_CALL( local, @@ -242,12 +241,11 @@ TEST_F(ServerImplTest, ClientSubTopicOnlyThenValue) { wpi::json::object(), std::optional{pubuid})); { - std::vector msgs; - msgs.emplace_back(net::ClientMessage{ + queue.msgs.emplace_back(net::ClientMessage{ net::PublishMsg{pubuid, "test", "double", wpi::json::object(), {}}}); - msgs.emplace_back(net::ClientMessage{ + queue.msgs.emplace_back(net::ClientMessage{ net::ClientValueMsg{pubuid, Value::MakeDouble(1.0, 10)}}); - server.HandleLocal(msgs); + EXPECT_FALSE(server.ProcessLocalMessages(UINT_MAX)); } ::testing::StrictMock wire; @@ -306,7 +304,7 @@ TEST_F(ServerImplTest, ClientSubTopicOnlyThenValue) { } TEST_F(ServerImplTest, ClientDisconnectUnpublish) { - server.SetLocal(&local); + server.SetLocal(&local, &queue); constexpr int pubuidLocal = 1; constexpr int subuid = 1; { @@ -323,19 +321,17 @@ TEST_F(ServerImplTest, ClientDisconnectUnpublish) { } { - std::vector msgs; - msgs.emplace_back(net::ClientMessage{net::PublishMsg{ + queue.msgs.emplace_back(net::ClientMessage{net::PublishMsg{ pubuidLocal, "test2", "double", wpi::json::object(), {}}}); - msgs.emplace_back(net::ClientMessage{ + queue.msgs.emplace_back(net::ClientMessage{ net::ClientValueMsg{pubuidLocal, Value::MakeDouble(1.0, 10)}}); - server.HandleLocal(msgs); + EXPECT_FALSE(server.ProcessLocalMessages(UINT_MAX)); } { - std::vector msgs; - msgs.emplace_back( + queue.msgs.emplace_back( net::ClientMessage{net::SubscribeMsg{subuid, {"test"}, {}}}); - server.HandleLocal(msgs); + EXPECT_FALSE(server.ProcessLocalMessages(UINT_MAX)); } ::testing::StrictMock wire; @@ -374,7 +370,7 @@ TEST_F(ServerImplTest, ClientDisconnectUnpublish) { TEST_F(ServerImplTest, ZeroTimestampNegativeTime) { // publish before client connect - server.SetLocal(&local); + server.SetLocal(&local, &queue); constexpr int pubuid = 1; NT_Topic topicHandle = nt::Handle{0, 1, nt::Handle::kTopic}; constexpr int subuid = 1; @@ -394,14 +390,13 @@ TEST_F(ServerImplTest, ZeroTimestampNegativeTime) { } { - std::vector msgs; - msgs.emplace_back(net::ClientMessage{ + queue.msgs.emplace_back(net::ClientMessage{ net::PublishMsg{pubuid, "test", "double", wpi::json::object(), {}}}); - msgs.emplace_back( + queue.msgs.emplace_back( net::ClientMessage{net::ClientValueMsg{pubuid, defaultValue}}); - msgs.emplace_back( + queue.msgs.emplace_back( net::ClientMessage{net::SubscribeMsg{subuid, {"test"}, {}}}); - server.HandleLocal(msgs); + EXPECT_FALSE(server.ProcessLocalMessages(UINT_MAX)); } // client connect; it should get already-published topic as soon as it