diff --git a/ntcore/src/main/native/cpp/LocalStorage.cpp b/ntcore/src/main/native/cpp/LocalStorage.cpp index e954347da8..72298517ef 100644 --- a/ntcore/src/main/native/cpp/LocalStorage.cpp +++ b/ntcore/src/main/native/cpp/LocalStorage.cpp @@ -20,6 +20,7 @@ #include "Log.h" #include "Types_internal.h" #include "Value_internal.h" +#include "net/MessageHandler.h" #include "networktables/NetworkTableValue.h" using namespace nt; @@ -403,7 +404,7 @@ void LocalStorage::Impl::PropertiesUpdated(TopicData* topic, NotifyTopic(topic, eventFlags | NT_EVENT_PROPERTIES); // check local flag so we don't echo back received properties changes if (m_network && sendNetwork) { - m_network->SetProperties(topic->name, update); + m_network->ClientSetProperties(topic->name, update); } } @@ -503,8 +504,9 @@ void LocalStorage::Impl::RemoveNetworkPublisher(TopicData* topic) { // this may result in a duplicate publish warning on the server side, // but send one anyway in this case just to be sure if (nextPub->active && m_network) { - m_network->Publish(Handle{nextPub->handle}.GetIndex(), topic->name, - topic->typeStr, topic->properties, nextPub->config); + m_network->ClientPublish(Handle{nextPub->handle}.GetIndex(), + topic->name, topic->typeStr, topic->properties, + nextPub->config); } } } @@ -561,8 +563,8 @@ LocalStorage::PublisherData* LocalStorage::Impl::AddLocalPublisher( } if (publisher->active && m_network) { - m_network->Publish(Handle{publisher->handle}.GetIndex(), topic->name, - topic->typeStr, topic->properties, config); + m_network->ClientPublish(Handle{publisher->handle}.GetIndex(), topic->name, + topic->typeStr, topic->properties, config); } return publisher; } @@ -580,7 +582,7 @@ LocalStorage::Impl::RemoveLocalPublisher(NT_Publisher pubHandle) { } if (publisher->active && m_network) { - m_network->Unpublish(Handle{publisher->handle}.GetIndex()); + m_network->ClientUnpublish(Handle{publisher->handle}.GetIndex()); } if (publisher->active && !topic->localPublishers.empty()) { @@ -593,9 +595,9 @@ LocalStorage::Impl::RemoveLocalPublisher(NT_Publisher pubHandle) { topic->typeStr = nextPub->config.typeStr; RefreshPubSubActive(topic, false); if (nextPub->active && m_network) { - m_network->Publish(Handle{nextPub->handle}.GetIndex(), topic->name, - topic->typeStr, topic->properties, - nextPub->config); + m_network->ClientPublish(Handle{nextPub->handle}.GetIndex(), + topic->name, topic->typeStr, + topic->properties, nextPub->config); } } } @@ -619,8 +621,8 @@ LocalStorage::SubscriberData* LocalStorage::Impl::AddLocalSubscriber( } if (m_network && !subscriber->config.hidden) { DEBUG4("-> NetworkSubscribe({})", topic->name); - m_network->Subscribe(Handle{subscriber->handle}.GetIndex(), {{topic->name}}, - config); + m_network->ClientSubscribe(Handle{subscriber->handle}.GetIndex(), + {{topic->name}}, config); } // queue current value @@ -648,7 +650,7 @@ LocalStorage::Impl::RemoveLocalSubscriber(NT_Subscriber subHandle) { } } if (m_network && !subscriber->config.hidden) { - m_network->Unsubscribe(Handle{subscriber->handle}.GetIndex()); + m_network->ClientUnsubscribe(Handle{subscriber->handle}.GetIndex()); } } return subscriber; @@ -685,8 +687,8 @@ LocalStorage::MultiSubscriberData* LocalStorage::Impl::AddMultiSubscriber( } if (m_network && !subscriber->options.hidden) { DEBUG4("-> NetworkSubscribe"); - m_network->Subscribe(Handle{subscriber->handle}.GetIndex(), - subscriber->prefixes, subscriber->options); + m_network->ClientSubscribe(Handle{subscriber->handle}.GetIndex(), + subscriber->prefixes, subscriber->options); } return subscriber; } @@ -704,7 +706,7 @@ LocalStorage::Impl::RemoveMultiSubscriber(NT_MultiSubscriber subHandle) { } } if (m_network && !subscriber->options.hidden) { - m_network->Unsubscribe(Handle{subscriber->handle}.GetIndex()); + m_network->ClientUnsubscribe(Handle{subscriber->handle}.GetIndex()); } } return subscriber; @@ -978,7 +980,7 @@ bool LocalStorage::Impl::PublishLocalValue(PublisherData* publisher, if (publisher->topic->IsCached()) { publisher->topic->lastValueNetwork = value; } - m_network->SetValue(Handle{publisher->handle}.GetIndex(), value); + m_network->ClientSetValue(Handle{publisher->handle}.GetIndex(), value); } return SetValue(publisher->topic, value, NT_EVENT_VALUE_LOCAL, suppressDuplicates, publisher); @@ -1074,24 +1076,24 @@ LocalStorage::Impl::Impl(int inst, IListenerStorage& listenerStorage, LocalStorage::~LocalStorage() = default; -NT_Topic LocalStorage::NetworkAnnounce(std::string_view name, - std::string_view typeStr, - const wpi::json& properties, - std::optional pubuid) { +int LocalStorage::ServerAnnounce(std::string_view name, int id, + std::string_view typeStr, + const wpi::json& properties, + std::optional pubuid) { std::scoped_lock lock{m_mutex}; auto topic = m_impl.GetOrCreateTopic(name); m_impl.NetworkAnnounce(topic, typeStr, properties, pubuid); - return topic->handle; + return Handle{topic->handle}.GetIndex(); } -void LocalStorage::NetworkUnannounce(std::string_view name) { +void LocalStorage::ServerUnannounce(std::string_view name, int id) { std::scoped_lock lock{m_mutex}; auto topic = m_impl.GetOrCreateTopic(name); m_impl.RemoveNetworkPublisher(topic); } -void LocalStorage::NetworkPropertiesUpdate(std::string_view name, - const wpi::json& update, bool ack) { +void LocalStorage::ServerPropertiesUpdate(std::string_view name, + const wpi::json& update, bool ack) { std::scoped_lock lock{m_mutex}; auto it = m_impl.m_nameTopics.find(name); if (it != m_impl.m_nameTopics.end()) { @@ -1099,9 +1101,10 @@ void LocalStorage::NetworkPropertiesUpdate(std::string_view name, } } -void LocalStorage::NetworkSetValue(NT_Topic topicHandle, const Value& value) { +void LocalStorage::ServerSetValue(int topicId, const Value& value) { std::scoped_lock lock{m_mutex}; - if (auto topic = m_impl.m_topics.Get(topicHandle)) { + if (auto topic = + m_impl.m_topics.Get(Handle{m_impl.m_inst, topicId, Handle::kTopic})) { if (m_impl.SetValue(topic, value, NT_EVENT_VALUE_REMOTE, false, nullptr)) { if (topic->IsCached()) { topic->lastValueNetwork = value; @@ -1111,12 +1114,12 @@ void LocalStorage::NetworkSetValue(NT_Topic topicHandle, const Value& value) { } } -void LocalStorage::StartNetwork(net::NetworkInterface* network) { +void LocalStorage::StartNetwork(net::ClientMessageHandler* network) { std::scoped_lock lock{m_mutex}; m_impl.StartNetwork(network); } -void LocalStorage::Impl::StartNetwork(net::NetworkInterface* network) { +void LocalStorage::Impl::StartNetwork(net::ClientMessageHandler* network) { DEBUG4("StartNetwork()"); m_network = network; // publish all active publishers to the network and send last values @@ -1125,26 +1128,27 @@ void LocalStorage::Impl::StartNetwork(net::NetworkInterface* network) { PublisherData* anyPublisher = nullptr; for (auto&& publisher : topic->localPublishers) { if (publisher->active) { - network->Publish(Handle{publisher->handle}.GetIndex(), topic->name, - topic->typeStr, topic->properties, publisher->config); + network->ClientPublish(Handle{publisher->handle}.GetIndex(), + topic->name, topic->typeStr, topic->properties, + publisher->config); anyPublisher = publisher; } } if (anyPublisher && topic->lastValue) { - network->SetValue(Handle{anyPublisher->handle}.GetIndex(), - topic->lastValue); + network->ClientSetValue(Handle{anyPublisher->handle}.GetIndex(), + topic->lastValue); } } for (auto&& subscriber : m_subscribers) { if (!subscriber->config.hidden) { - network->Subscribe(Handle{subscriber->handle}.GetIndex(), - {{subscriber->topic->name}}, subscriber->config); + network->ClientSubscribe(Handle{subscriber->handle}.GetIndex(), + {{subscriber->topic->name}}, subscriber->config); } } for (auto&& subscriber : m_multiSubscribers) { if (!subscriber->options.hidden) { - network->Subscribe(Handle{subscriber->handle}.GetIndex(), - subscriber->prefixes, subscriber->options); + network->ClientSubscribe(Handle{subscriber->handle}.GetIndex(), + subscriber->prefixes, subscriber->options); } } } diff --git a/ntcore/src/main/native/cpp/LocalStorage.h b/ntcore/src/main/native/cpp/LocalStorage.h index 9fd2d837a5..f4e15d404c 100644 --- a/ntcore/src/main/native/cpp/LocalStorage.h +++ b/ntcore/src/main/native/cpp/LocalStorage.h @@ -6,7 +6,6 @@ #include -#include #include #include #include @@ -26,6 +25,7 @@ #include "Types_internal.h" #include "ValueCircularBuffer.h" #include "VectorSet.h" +#include "net/MessageHandler.h" #include "net/NetworkInterface.h" #include "ntcore_cpp.h" @@ -46,15 +46,15 @@ class LocalStorage final : public net::ILocalStorage { ~LocalStorage() final; // network interface functions - NT_Topic NetworkAnnounce(std::string_view name, std::string_view typeStr, - const wpi::json& properties, - std::optional pubuid) final; - void NetworkUnannounce(std::string_view name) final; - void NetworkPropertiesUpdate(std::string_view name, const wpi::json& update, - bool ack) final; - void NetworkSetValue(NT_Topic topicHandle, const Value& value) final; + int ServerAnnounce(std::string_view name, int id, std::string_view typeStr, + const wpi::json& properties, + std::optional pubuid) final; + void ServerUnannounce(std::string_view name, int id) final; + void ServerPropertiesUpdate(std::string_view name, const wpi::json& update, + bool ack) final; + void ServerSetValue(int topicId, const Value& value) final; - void StartNetwork(net::NetworkInterface* network) final; + void StartNetwork(net::ClientMessageHandler* network) final; void ClearNetwork() final; // User functions. These are the actual implementations of the corresponding @@ -555,7 +555,7 @@ class LocalStorage final : public net::ILocalStorage { int m_inst; IListenerStorage& m_listenerStorage; wpi::Logger& m_logger; - net::NetworkInterface* m_network{nullptr}; + net::ClientMessageHandler* m_network{nullptr}; // handle mappings HandleMap m_topics; @@ -606,7 +606,7 @@ class LocalStorage final : public net::ILocalStorage { void RemoveNetworkPublisher(TopicData* topic); void NetworkPropertiesUpdate(TopicData* topic, const wpi::json& update, bool ack); - void StartNetwork(net::NetworkInterface* network); + void StartNetwork(net::ClientMessageHandler* network); PublisherData* AddLocalPublisher(TopicData* topic, const wpi::json& properties, diff --git a/ntcore/src/main/native/cpp/NetworkClient.cpp b/ntcore/src/main/native/cpp/NetworkClient.cpp index d6f987441d..348ed2b391 100644 --- a/ntcore/src/main/native/cpp/NetworkClient.cpp +++ b/ntcore/src/main/native/cpp/NetworkClient.cpp @@ -22,6 +22,7 @@ #include "IConnectionList.h" #include "Log.h" +#include "net/NetworkInterface.h" using namespace nt; namespace uv = wpi::uv; diff --git a/ntcore/src/main/native/cpp/net/ClientImpl.cpp b/ntcore/src/main/native/cpp/net/ClientImpl.cpp index 42be55b3d4..ff5408e722 100644 --- a/ntcore/src/main/native/cpp/net/ClientImpl.cpp +++ b/ntcore/src/main/native/cpp/net/ClientImpl.cpp @@ -94,17 +94,8 @@ void ClientImpl::ProcessIncomingBinary(uint64_t curTimeMs, continue; } - // otherwise it's a value message, get the local topic handle for it - auto topicIt = m_topicMap.find(id); - if (topicIt == m_topicMap.end()) { - WARN("received unknown id {}", id); - continue; - } - - // pass along to local handler - if (m_local) { - m_local->NetworkSetValue(topicIt->second, value); - } + // otherwise it's a value message + ServerSetValue(id, value); } } @@ -229,19 +220,21 @@ void ClientImpl::SetValue(int32_t pubuid, const Value& value) { publisher.options.sendAll ? ValueSendMode::kAll : ValueSendMode::kNormal); } -void ClientImpl::ServerAnnounce(std::string_view name, int id, - std::string_view typeStr, - const wpi::json& properties, - std::optional pubuid) { +int ClientImpl::ServerAnnounce(std::string_view name, int id, + std::string_view typeStr, + const wpi::json& properties, + std::optional pubuid) { DEBUG4("ServerAnnounce({}, {}, {})", name, id, typeStr); assert(m_local); - m_topicMap[id] = m_local->NetworkAnnounce(name, typeStr, properties, pubuid); + m_topicMap[id] = + m_local->ServerAnnounce(name, 0, typeStr, properties, pubuid); + return id; } void ClientImpl::ServerUnannounce(std::string_view name, int id) { DEBUG4("ServerUnannounce({}, {})", name, id); assert(m_local); - m_local->NetworkUnannounce(name); + m_local->ServerUnannounce(name, m_topicMap[id]); m_topicMap.erase(id); } @@ -249,7 +242,21 @@ void ClientImpl::ServerPropertiesUpdate(std::string_view name, const wpi::json& update, bool ack) { DEBUG4("ServerProperties({}, {}, {})", name, update.dump(), ack); assert(m_local); - m_local->NetworkPropertiesUpdate(name, update, ack); + m_local->ServerPropertiesUpdate(name, update, ack); +} + +void ClientImpl::ServerSetValue(int topicId, const Value& value) { + // get the local topic handle for it + auto topicIt = m_topicMap.find(topicId); + if (topicIt == m_topicMap.end()) { + WARN("received unknown id {}", topicId); + return; + } + + // pass along to local handler + if (m_local) { + m_local->ServerSetValue(topicIt->second, value); + } } void ClientImpl::ProcessIncomingText(std::string_view data) { diff --git a/ntcore/src/main/native/cpp/net/ClientImpl.h b/ntcore/src/main/native/cpp/net/ClientImpl.h index fe32510ba9..95a5b7e68f 100644 --- a/ntcore/src/main/native/cpp/net/ClientImpl.h +++ b/ntcore/src/main/native/cpp/net/ClientImpl.h @@ -14,7 +14,7 @@ #include -#include "NetworkInterface.h" +#include "MessageHandler.h" #include "NetworkOutgoingQueue.h" #include "NetworkPing.h" #include "PubSubOptions.h" @@ -49,7 +49,7 @@ class ClientImpl final : private ServerMessageHandler { void SendOutgoing(uint64_t curTimeMs, bool flush); - void SetLocal(LocalInterface* local) { m_local = local; } + void SetLocal(ServerMessageHandler* local) { m_local = local; } void SendInitial(); private: @@ -63,12 +63,13 @@ class ClientImpl final : private ServerMessageHandler { void UpdatePeriodic(); // ServerMessageHandler interface - void ServerAnnounce(std::string_view name, int id, std::string_view typeStr, - const wpi::json& properties, - std::optional pubuid) final; + int ServerAnnounce(std::string_view name, int id, std::string_view typeStr, + const wpi::json& properties, + std::optional pubuid) final; void ServerUnannounce(std::string_view name, int id) final; void ServerPropertiesUpdate(std::string_view name, const wpi::json& update, bool ack) final; + void ServerSetValue(int topicId, const Value& value) final; void Publish(int pubuid, std::string_view name, std::string_view typeStr, const wpi::json& properties, const PubSubOptionsImpl& options); @@ -77,7 +78,7 @@ class ClientImpl final : private ServerMessageHandler { WireConnection& m_wire; wpi::Logger& m_logger; - LocalInterface* m_local{nullptr}; + ServerMessageHandler* m_local{nullptr}; std::function m_timeSyncUpdated; std::function m_setPeriodic; @@ -86,7 +87,7 @@ class ClientImpl final : private ServerMessageHandler { std::vector> m_publishers; // indexed by server-provided topic id - wpi::DenseMap m_topicMap; + wpi::DenseMap m_topicMap; // ping NetworkPing m_ping; diff --git a/ntcore/src/main/native/cpp/net/MessageHandler.h b/ntcore/src/main/native/cpp/net/MessageHandler.h new file mode 100644 index 0000000000..6f09e4ae49 --- /dev/null +++ b/ntcore/src/main/native/cpp/net/MessageHandler.h @@ -0,0 +1,52 @@ +// Copyright (c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#pragma once + +#include +#include +#include +#include + +#include + +namespace nt { +class PubSubOptionsImpl; +class Value; +} // namespace nt + +namespace nt::net { + +class ClientMessageHandler { + public: + virtual ~ClientMessageHandler() = default; + + virtual void ClientPublish(int pubuid, std::string_view name, + std::string_view typeStr, + const wpi::json& properties, + const PubSubOptionsImpl& options) = 0; + virtual void ClientUnpublish(int pubuid) = 0; + virtual void ClientSetProperties(std::string_view name, + const wpi::json& update) = 0; + virtual void ClientSubscribe(int subuid, + std::span topicNames, + const PubSubOptionsImpl& options) = 0; + virtual void ClientUnsubscribe(int subuid) = 0; + virtual void ClientSetValue(int pubuid, const Value& value) = 0; +}; + +class ServerMessageHandler { + public: + virtual ~ServerMessageHandler() = default; + virtual int ServerAnnounce(std::string_view name, int id, + std::string_view typeStr, + const wpi::json& properties, + std::optional pubuid) = 0; + virtual void ServerUnannounce(std::string_view name, int id) = 0; + virtual void ServerPropertiesUpdate(std::string_view name, + const wpi::json& update, bool ack) = 0; + virtual void ServerSetValue(int topicuid, const Value& value) = 0; +}; + +} // namespace nt::net diff --git a/ntcore/src/main/native/cpp/net/NetworkInterface.h b/ntcore/src/main/native/cpp/net/NetworkInterface.h index e578aa4238..20667952d1 100644 --- a/ntcore/src/main/native/cpp/net/NetworkInterface.h +++ b/ntcore/src/main/native/cpp/net/NetworkInterface.h @@ -4,56 +4,13 @@ #pragma once -#include - -#include -#include -#include - -#include - -#include "ntcore_cpp.h" - -namespace nt { -class PubSubOptionsImpl; -class Value; -} // namespace nt +#include "MessageHandler.h" namespace nt::net { -class LocalInterface { +class ILocalStorage : public ServerMessageHandler { public: - virtual ~LocalInterface() = default; - - virtual NT_Topic NetworkAnnounce(std::string_view name, - std::string_view typeStr, - const wpi::json& properties, - std::optional pubuid) = 0; - virtual void NetworkUnannounce(std::string_view name) = 0; - virtual void NetworkPropertiesUpdate(std::string_view name, - const wpi::json& update, bool ack) = 0; - virtual void NetworkSetValue(NT_Topic topicHandle, const Value& value) = 0; -}; - -class NetworkInterface { - public: - virtual ~NetworkInterface() = default; - - virtual void Publish(int pubuid, std::string_view name, - std::string_view typeStr, const wpi::json& properties, - const PubSubOptionsImpl& options) = 0; - virtual void Unpublish(int pubuid) = 0; - virtual void SetProperties(std::string_view name, - const wpi::json& update) = 0; - virtual void Subscribe(int subuid, std::span topicNames, - const PubSubOptionsImpl& options) = 0; - virtual void Unsubscribe(int subuid) = 0; - virtual void SetValue(int pubuid, const Value& value) = 0; -}; - -class ILocalStorage : public LocalInterface { - public: - virtual void StartNetwork(NetworkInterface* network) = 0; + virtual void StartNetwork(ClientMessageHandler* network) = 0; virtual void ClearNetwork() = 0; }; diff --git a/ntcore/src/main/native/cpp/net/NetworkLoopQueue.cpp b/ntcore/src/main/native/cpp/net/NetworkLoopQueue.cpp index a4d584a0d3..cb7f05e09f 100644 --- a/ntcore/src/main/native/cpp/net/NetworkLoopQueue.cpp +++ b/ntcore/src/main/native/cpp/net/NetworkLoopQueue.cpp @@ -10,7 +10,7 @@ using namespace nt::net; static constexpr size_t kMaxSize = 2 * 1024 * 1024; -void NetworkLoopQueue::SetValue(int pubuid, const Value& value) { +void NetworkLoopQueue::ClientSetValue(int pubuid, const Value& value) { std::scoped_lock lock{m_mutex}; m_size += sizeof(ClientMessage) + value.size(); if (m_size > kMaxSize) { diff --git a/ntcore/src/main/native/cpp/net/NetworkLoopQueue.h b/ntcore/src/main/native/cpp/net/NetworkLoopQueue.h index 8ece5e7c1d..ca55ed96b4 100644 --- a/ntcore/src/main/native/cpp/net/NetworkLoopQueue.h +++ b/ntcore/src/main/native/cpp/net/NetworkLoopQueue.h @@ -11,7 +11,7 @@ #include #include "Message.h" -#include "NetworkInterface.h" +#include "MessageHandler.h" namespace wpi { class Logger; @@ -19,7 +19,7 @@ class Logger; namespace nt::net { -class NetworkLoopQueue : public NetworkInterface { +class NetworkLoopQueue : public ClientMessageHandler { public: static constexpr size_t kInitialQueueSize = 2000; @@ -30,16 +30,17 @@ class NetworkLoopQueue : public NetworkInterface { void ReadQueue(std::vector* out); void ClearQueue(); - // NetworkInterface - calls to these append to the queue - void Publish(int pubuid, std::string_view name, std::string_view typeStr, - const wpi::json& properties, - const PubSubOptionsImpl& options) final; - void Unpublish(int pubuid) final; - void SetProperties(std::string_view name, const wpi::json& update) final; - void Subscribe(int subuid, std::span topicNames, - const PubSubOptionsImpl& options) final; - void Unsubscribe(int subuid) final; - void SetValue(int pubuid, const Value& value) final; + // ClientMessageHandler - calls to these append to the queue + void ClientPublish(int pubuid, std::string_view name, + std::string_view typeStr, const wpi::json& properties, + const PubSubOptionsImpl& options) final; + void ClientUnpublish(int pubuid) final; + void ClientSetProperties(std::string_view name, + const wpi::json& update) final; + void ClientSubscribe(int subuid, std::span topicNames, + const PubSubOptionsImpl& options) final; + void ClientUnsubscribe(int subuid) final; + void ClientSetValue(int pubuid, const Value& value) final; private: wpi::mutex m_mutex; diff --git a/ntcore/src/main/native/cpp/net/NetworkLoopQueue.inc b/ntcore/src/main/native/cpp/net/NetworkLoopQueue.inc index f2d76fe4d0..1143e23d25 100644 --- a/ntcore/src/main/native/cpp/net/NetworkLoopQueue.inc +++ b/ntcore/src/main/native/cpp/net/NetworkLoopQueue.inc @@ -9,7 +9,6 @@ #include #include "NetworkLoopQueue.h" -#include "ntcore_c.h" namespace nt::net { @@ -29,36 +28,36 @@ inline void NetworkLoopQueue::ClearQueue() { m_sizeErrored = false; } -inline void NetworkLoopQueue::Publish(int pubuid, std::string_view name, - std::string_view typeStr, - const wpi::json& properties, - const PubSubOptionsImpl& options) { +inline void NetworkLoopQueue::ClientPublish(int pubuid, std::string_view name, + std::string_view typeStr, + const wpi::json& properties, + const PubSubOptionsImpl& options) { std::scoped_lock lock{m_mutex}; m_queue.emplace_back(ClientMessage{PublishMsg{ pubuid, std::string{name}, std::string{typeStr}, properties, options}}); } -inline void NetworkLoopQueue::Unpublish(int pubuid) { +inline void NetworkLoopQueue::ClientUnpublish(int pubuid) { std::scoped_lock lock{m_mutex}; m_queue.emplace_back(ClientMessage{UnpublishMsg{pubuid}}); } -inline void NetworkLoopQueue::SetProperties(std::string_view name, - const wpi::json& update) { +inline void NetworkLoopQueue::ClientSetProperties(std::string_view name, + const wpi::json& update) { std::scoped_lock lock{m_mutex}; m_queue.emplace_back( ClientMessage{SetPropertiesMsg{std::string{name}, update}}); } -inline void NetworkLoopQueue::Subscribe(int subuid, - std::span topicNames, - const PubSubOptionsImpl& options) { +inline void NetworkLoopQueue::ClientSubscribe( + int subuid, std::span topicNames, + const PubSubOptionsImpl& options) { std::scoped_lock lock{m_mutex}; m_queue.emplace_back(ClientMessage{ SubscribeMsg{subuid, {topicNames.begin(), topicNames.end()}, options}}); } -inline void NetworkLoopQueue::Unsubscribe(int subuid) { +inline void NetworkLoopQueue::ClientUnsubscribe(int subuid) { std::scoped_lock lock{m_mutex}; m_queue.emplace_back(ClientMessage{UnsubscribeMsg{subuid}}); } diff --git a/ntcore/src/main/native/cpp/net/ServerImpl.cpp b/ntcore/src/main/native/cpp/net/ServerImpl.cpp index 78884b4e01..e24842d0ba 100644 --- a/ntcore/src/main/native/cpp/net/ServerImpl.cpp +++ b/ntcore/src/main/native/cpp/net/ServerImpl.cpp @@ -199,10 +199,9 @@ std::span ServerImpl::ClientData::GetSubscribers( return {buf.data(), buf.size()}; } -void ServerImpl::ClientData4Base::ClientPublish(int pubuid, - std::string_view name, - std::string_view typeStr, - const wpi::json& properties) { +void ServerImpl::ClientData4Base::ClientPublish( + int pubuid, std::string_view name, std::string_view typeStr, + const wpi::json& properties, const PubSubOptionsImpl& options) { DEBUG3("ClientPublish({}, {}, {}, {})", m_id, name, pubuid, typeStr); auto topic = m_server.CreateTopic(this, name, typeStr, properties); @@ -393,7 +392,7 @@ void ServerImpl::ClientDataLocal::SendValue(TopicData* topic, const Value& value, ValueSendMode mode) { if (m_server.m_local) { - m_server.m_local->NetworkSetValue(topic->localHandle, value); + m_server.m_local->ServerSetValue(topic->localTopic, value); } } @@ -406,8 +405,8 @@ void ServerImpl::ClientDataLocal::SendAnnounce(TopicData* topic, } sent = true; - topic->localHandle = m_server.m_local->NetworkAnnounce( - topic->name, topic->typeStr, topic->properties, pubuid); + topic->localTopic = m_server.m_local->ServerAnnounce( + topic->name, 0, topic->typeStr, topic->properties, pubuid); } } @@ -418,7 +417,7 @@ void ServerImpl::ClientDataLocal::SendUnannounce(TopicData* topic) { return; } sent = false; - m_server.m_local->NetworkUnannounce(topic->name); + m_server.m_local->ServerUnannounce(topic->name, topic->localTopic); } } @@ -429,7 +428,7 @@ void ServerImpl::ClientDataLocal::SendPropertiesUpdate(TopicData* topic, if (!m_announceSent.lookup(topic)) { return; } - m_server.m_local->NetworkPropertiesUpdate(topic->name, update, ack); + m_server.m_local->ServerPropertiesUpdate(topic->name, update, ack); } } @@ -447,7 +446,8 @@ void ServerImpl::ClientDataLocal::HandleLocal( if (auto msg = std::get_if(&elem.contents)) { ClientSetValue(msg->pubuid, msg->value); } else if (auto msg = std::get_if(&elem.contents)) { - ClientPublish(msg->pubuid, msg->name, msg->typeStr, msg->properties); + ClientPublish(msg->pubuid, msg->name, msg->typeStr, msg->properties, + msg->options); updatepub = true; } else if (auto msg = std::get_if(&elem.contents)) { ClientUnpublish(msg->pubuid); @@ -1923,7 +1923,7 @@ void ServerImpl::HandleLocal(std::span msgs) { m_localClient->HandleLocal(msgs); } -void ServerImpl::SetLocal(LocalInterface* local) { +void ServerImpl::SetLocal(ServerMessageHandler* local) { DEBUG4("SetLocal()"); m_local = local; diff --git a/ntcore/src/main/native/cpp/net/ServerImpl.h b/ntcore/src/main/native/cpp/net/ServerImpl.h index da78cbc079..01e0058ec7 100644 --- a/ntcore/src/main/native/cpp/net/ServerImpl.h +++ b/ntcore/src/main/native/cpp/net/ServerImpl.h @@ -64,7 +64,7 @@ class ServerImpl final { void SendOutgoing(int clientId, uint64_t curTimeMs); void HandleLocal(std::span msgs); - void SetLocal(LocalInterface* local); + void SetLocal(ServerMessageHandler* local); void ProcessIncomingText(int clientId, std::string_view data); void ProcessIncomingBinary(int clientId, std::span data); @@ -129,7 +129,7 @@ class ServerImpl final { bool retained{false}; bool cached{true}; bool special{false}; - NT_Topic localHandle{0}; + int localTopic{0}; void AddPublisher(ClientData* client, PublisherData* pub) { if (clients[client].publishers.insert(pub).second) { @@ -237,8 +237,8 @@ class ServerImpl final { protected: // ClientMessageHandler interface void ClientPublish(int pubuid, std::string_view name, - std::string_view typeStr, - const wpi::json& properties) final; + std::string_view typeStr, const wpi::json& properties, + const PubSubOptionsImpl& options) final; void ClientUnpublish(int pubuid) final; void ClientSetProperties(std::string_view name, const wpi::json& update) final; @@ -246,7 +246,7 @@ class ServerImpl final { const PubSubOptionsImpl& options) final; void ClientUnsubscribe(int subuid) final; - void ClientSetValue(int pubuid, const Value& value); + void ClientSetValue(int pubuid, const Value& value) final; wpi::DenseMap m_announceSent; }; @@ -437,7 +437,7 @@ class ServerImpl final { }; wpi::Logger& m_logger; - LocalInterface* m_local{nullptr}; + ServerMessageHandler* m_local{nullptr}; bool m_controlReady{false}; ClientDataLocal* m_localClient; diff --git a/ntcore/src/main/native/cpp/net/WireDecoder.cpp b/ntcore/src/main/native/cpp/net/WireDecoder.cpp index 5ea44374a0..14a051d479 100644 --- a/ntcore/src/main/native/cpp/net/WireDecoder.cpp +++ b/ntcore/src/main/native/cpp/net/WireDecoder.cpp @@ -17,6 +17,7 @@ #include #include "Message.h" +#include "MessageHandler.h" using namespace nt; using namespace nt::net; @@ -190,7 +191,7 @@ static bool WireDecodeTextImpl(std::string_view in, T& out, } // complete - out.ClientPublish(pubuid, *name, *typeStr, *properties); + out.ClientPublish(pubuid, *name, *typeStr, *properties, {}); rv = true; } else if (*method == UnpublishMsg::kMethodStr) { // pubuid diff --git a/ntcore/src/main/native/cpp/net/WireDecoder.h b/ntcore/src/main/native/cpp/net/WireDecoder.h index ae1f814eb8..3a8d421608 100644 --- a/ntcore/src/main/native/cpp/net/WireDecoder.h +++ b/ntcore/src/main/native/cpp/net/WireDecoder.h @@ -6,51 +6,22 @@ #include -#include #include #include #include -#include - namespace wpi { class Logger; } // namespace wpi namespace nt { -class PubSubOptionsImpl; class Value; } // namespace nt namespace nt::net { -class ClientMessageHandler { - public: - virtual ~ClientMessageHandler() = default; - - virtual void ClientPublish(int pubuid, std::string_view name, - std::string_view typeStr, - const wpi::json& properties) = 0; - virtual void ClientUnpublish(int pubuid) = 0; - virtual void ClientSetProperties(std::string_view name, - const wpi::json& update) = 0; - virtual void ClientSubscribe(int subuid, - std::span topicNames, - const PubSubOptionsImpl& options) = 0; - virtual void ClientUnsubscribe(int subuid) = 0; -}; - -class ServerMessageHandler { - public: - virtual ~ServerMessageHandler() = default; - virtual void ServerAnnounce(std::string_view name, int id, - std::string_view typeStr, - const wpi::json& properties, - std::optional pubuid) = 0; - virtual void ServerUnannounce(std::string_view name, int id) = 0; - virtual void ServerPropertiesUpdate(std::string_view name, - const wpi::json& update, bool ack) = 0; -}; +class ClientMessageHandler; +class ServerMessageHandler; // return true if client pub/sub metadata needs updating bool WireDecodeText(std::string_view in, ClientMessageHandler& out, diff --git a/ntcore/src/main/native/cpp/net3/ClientImpl3.cpp b/ntcore/src/main/native/cpp/net3/ClientImpl3.cpp index 3bdbeefc13..d8813ff6ac 100644 --- a/ntcore/src/main/native/cpp/net3/ClientImpl3.cpp +++ b/ntcore/src/main/native/cpp/net3/ClientImpl3.cpp @@ -360,13 +360,13 @@ void ClientImpl3::EntryAssign(std::string_view name, unsigned int id, if (m_local) { // XXX: need to handle type change specially? (e.g. with unannounce) - if (entry->topic == 0 || flagsChanged || typeChanged) { + if (!entry->topic || flagsChanged || typeChanged) { DEBUG4("NetworkAnnounce({}, {})", name, entry->typeStr); - entry->topic = m_local->NetworkAnnounce(name, entry->typeStr, - entry->properties, std::nullopt); + entry->topic = m_local->ServerAnnounce(name, 0, entry->typeStr, + entry->properties, std::nullopt); } if (valueChanged) { - m_local->NetworkSetValue(entry->topic, entry->value); + m_local->ServerSetValue(entry->topic.value(), entry->value); } } } @@ -380,8 +380,8 @@ void ClientImpl3::EntryUpdate(unsigned int id, unsigned int seq_num, } if (auto entry = LookupId(id)) { entry->value = value; - if (m_local && entry->topic != 0) { - m_local->NetworkSetValue(entry->topic, entry->value); + if (m_local && entry->topic) { + m_local->ServerSetValue(entry->topic.value(), entry->value); } } } @@ -395,7 +395,7 @@ void ClientImpl3::FlagsUpdate(unsigned int id, unsigned int flags) { if (auto entry = LookupId(id)) { wpi::json update = entry->SetFlags(flags); if (!update.empty() && m_local) { - m_local->NetworkPropertiesUpdate(entry->name, update, false); + m_local->ServerPropertiesUpdate(entry->name, update, false); } } @@ -419,8 +419,8 @@ void ClientImpl3::EntryDelete(unsigned int id) { entry->value = Value{}; // if we have no local publishers, unannounce - if (entry->publishers.empty() && m_local) { - m_local->NetworkUnannounce(entry->name); + if (entry->publishers.empty() && m_local && entry->topic) { + m_local->ServerUnannounce(entry->name, entry->topic.value()); } } @@ -443,8 +443,8 @@ void ClientImpl3::ClearEntries() { entry->value = Value{}; // if we have no local publishers, unannounce - if (entry->publishers.empty() && m_local) { - m_local->NetworkUnannounce(entry->name); + if (entry->publishers.empty() && m_local && entry->topic) { + m_local->ServerUnannounce(entry->name, entry->topic.value()); } entry = nullptr; // clear id mapping diff --git a/ntcore/src/main/native/cpp/net3/ClientImpl3.h b/ntcore/src/main/native/cpp/net3/ClientImpl3.h index 394de64aef..42610342c6 100644 --- a/ntcore/src/main/native/cpp/net3/ClientImpl3.h +++ b/ntcore/src/main/native/cpp/net3/ClientImpl3.h @@ -8,6 +8,7 @@ #include #include +#include #include #include #include @@ -18,7 +19,7 @@ #include #include "PubSubOptions.h" -#include "net/NetworkInterface.h" +#include "net/MessageHandler.h" #include "net3/Message3.h" #include "net3/SequenceNumber.h" #include "net3/WireConnection3.h" @@ -52,7 +53,7 @@ class ClientImpl3 final : private MessageHandler3 { DoSendPeriodic(curTimeMs, false, flush); } - void SetLocal(net::LocalInterface* local) { m_local = local; } + void SetLocal(net::ServerMessageHandler* local) { m_local = local; } private: struct Entry; @@ -93,8 +94,8 @@ class ClientImpl3 final : private MessageHandler3 { // Sequence number for update resolution SequenceNumber seqNum; - // Local topic handle - NT_Topic topic{0}; + // Local topic id + std::optional topic; // Local publishers std::vector publishers; @@ -139,7 +140,7 @@ class ClientImpl3 final : private MessageHandler3 { WireConnection3& m_wire; wpi::Logger& m_logger; - net::LocalInterface* m_local{nullptr}; + net::ServerMessageHandler* m_local{nullptr}; std::function m_setPeriodic; uint64_t m_initTimeMs; diff --git a/ntcore/src/test/native/cpp/LocalStorageTest.cpp b/ntcore/src/test/native/cpp/LocalStorageTest.cpp index 31d2eea32d..e9b673483d 100644 --- a/ntcore/src/test/native/cpp/LocalStorageTest.cpp +++ b/ntcore/src/test/native/cpp/LocalStorageTest.cpp @@ -15,6 +15,7 @@ #include "TestPrinters.h" #include "ValueMatcher.h" #include "gmock/gmock.h" +#include "net/MockMessageHandler.h" #include "net/MockNetworkInterface.h" #include "ntcore_c.h" #include "ntcore_cpp.h" @@ -48,7 +49,7 @@ class LocalStorageTest : public ::testing::Test { public: LocalStorageTest() { storage.StartNetwork(&network); } - ::testing::StrictMock network; + ::testing::StrictMock network; wpi::MockLogger logger; MockListenerStorage listenerStorage; LocalStorage storage{0, listenerStorage, logger}; @@ -79,8 +80,8 @@ TEST_F(LocalStorageTest, GetEntryEmptyName) { } TEST_F(LocalStorageTest, GetEntryCached) { - EXPECT_CALL(network, Subscribe(_, wpi::SpanEq({std::string{"tocache"}}), - IsDefaultPubSubOptions())); + EXPECT_CALL(network, ClientSubscribe(_, wpi::SpanEq({std::string{"tocache"}}), + IsDefaultPubSubOptions())); auto entry1 = storage.GetEntry("tocache"); EXPECT_EQ(entry1, storage.GetEntry("tocache")); @@ -105,9 +106,10 @@ TEST_F(LocalStorageTest, GetTopicInfoUnpublished) { } TEST_F(LocalStorageTest, DefaultProps) { - EXPECT_CALL(network, - Publish(_, std::string_view{"foo"}, std::string_view{"boolean"}, - wpi::json::object(), IsDefaultPubSubOptions())); + EXPECT_CALL( + network, + ClientPublish(_, std::string_view{"foo"}, std::string_view{"boolean"}, + wpi::json::object(), IsDefaultPubSubOptions())); storage.Publish(fooTopic, NT_BOOLEAN, "boolean", wpi::json::object(), {}); EXPECT_FALSE(storage.GetTopicPersistent(fooTopic)); @@ -116,9 +118,10 @@ TEST_F(LocalStorageTest, DefaultProps) { } TEST_F(LocalStorageTest, PublishNewNoProps) { - EXPECT_CALL(network, - Publish(_, std::string_view{"foo"}, std::string_view{"boolean"}, - wpi::json::object(), IsDefaultPubSubOptions())); + EXPECT_CALL( + network, + ClientPublish(_, std::string_view{"foo"}, std::string_view{"boolean"}, + wpi::json::object(), IsDefaultPubSubOptions())); storage.Publish(fooTopic, NT_BOOLEAN, "boolean", wpi::json::object(), {}); auto info = storage.GetTopicInfo(fooTopic); @@ -126,9 +129,10 @@ TEST_F(LocalStorageTest, PublishNewNoProps) { } TEST_F(LocalStorageTest, PublishNewNoPropsNull) { - EXPECT_CALL(network, - Publish(_, std::string_view{"foo"}, std::string_view{"boolean"}, - wpi::json::object(), IsDefaultPubSubOptions())); + EXPECT_CALL( + network, + ClientPublish(_, std::string_view{"foo"}, std::string_view{"boolean"}, + wpi::json::object(), IsDefaultPubSubOptions())); storage.Publish(fooTopic, NT_BOOLEAN, "boolean", {}, {}); auto info = storage.GetTopicInfo(fooTopic); @@ -137,9 +141,9 @@ TEST_F(LocalStorageTest, PublishNewNoPropsNull) { TEST_F(LocalStorageTest, PublishNew) { wpi::json properties = {{"persistent", true}}; - EXPECT_CALL(network, - Publish(_, std::string_view{"foo"}, std::string_view{"boolean"}, - properties, IsDefaultPubSubOptions())); + EXPECT_CALL(network, ClientPublish(_, std::string_view{"foo"}, + std::string_view{"boolean"}, properties, + IsDefaultPubSubOptions())); storage.Publish(fooTopic, NT_BOOLEAN, "boolean", {{"persistent", true}}, {}); auto info = storage.GetTopicInfo(fooTopic); @@ -155,17 +159,18 @@ TEST_F(LocalStorageTest, PublishNew) { } TEST_F(LocalStorageTest, SubscribeNoTypeLocalPubPost) { - EXPECT_CALL(network, Subscribe(_, wpi::SpanEq({std::string{"foo"}}), - IsDefaultPubSubOptions())); + EXPECT_CALL(network, ClientSubscribe(_, wpi::SpanEq({std::string{"foo"}}), + IsDefaultPubSubOptions())); auto sub = storage.Subscribe(fooTopic, NT_UNASSIGNED, "", {}); - EXPECT_CALL(network, - Publish(_, std::string_view{"foo"}, std::string_view{"boolean"}, - wpi::json::object(), IsDefaultPubSubOptions())); + EXPECT_CALL( + network, + ClientPublish(_, std::string_view{"foo"}, std::string_view{"boolean"}, + wpi::json::object(), IsDefaultPubSubOptions())); auto pub = storage.Publish(fooTopic, NT_BOOLEAN, "boolean", {}, {}); auto val = Value::MakeBoolean(true, 5); - EXPECT_CALL(network, SetValue(Handle{pub}.GetIndex(), val)); + EXPECT_CALL(network, ClientSetValue(Handle{pub}.GetIndex(), val)); storage.SetEntryValue(pub, val); EXPECT_EQ(storage.GetTopicType(fooTopic), NT_BOOLEAN); @@ -183,7 +188,7 @@ TEST_F(LocalStorageTest, SubscribeNoTypeLocalPubPost) { EXPECT_EQ(vals[0].time, 5); val = Value::MakeBoolean(false, 6); - EXPECT_CALL(network, SetValue(Handle{pub}.GetIndex(), val)); + EXPECT_CALL(network, ClientSetValue(Handle{pub}.GetIndex(), val)); storage.SetEntryValue(pub, val); auto vals2 = storage.ReadQueue(sub); // mismatched type @@ -191,17 +196,18 @@ TEST_F(LocalStorageTest, SubscribeNoTypeLocalPubPost) { } TEST_F(LocalStorageTest, SubscribeNoTypeLocalPubPre) { - EXPECT_CALL(network, - Publish(_, std::string_view{"foo"}, std::string_view{"boolean"}, - wpi::json::object(), IsDefaultPubSubOptions())); + EXPECT_CALL( + network, + ClientPublish(_, std::string_view{"foo"}, std::string_view{"boolean"}, + wpi::json::object(), IsDefaultPubSubOptions())); auto pub = storage.Publish(fooTopic, NT_BOOLEAN, "boolean", {}, {}); auto val = Value::MakeBoolean(true, 5); - EXPECT_CALL(network, SetValue(Handle{pub}.GetIndex(), val)); + EXPECT_CALL(network, ClientSetValue(Handle{pub}.GetIndex(), val)); storage.SetEntryValue(pub, val); - EXPECT_CALL(network, Subscribe(_, wpi::SpanEq({std::string{"foo"}}), - IsDefaultPubSubOptions())); + EXPECT_CALL(network, ClientSubscribe(_, wpi::SpanEq({std::string{"foo"}}), + IsDefaultPubSubOptions())); auto sub = storage.Subscribe(fooTopic, NT_UNASSIGNED, "", {}); EXPECT_EQ(storage.GetTopicType(fooTopic), NT_BOOLEAN); @@ -215,16 +221,17 @@ TEST_F(LocalStorageTest, SubscribeNoTypeLocalPubPre) { } TEST_F(LocalStorageTest, EntryNoTypeLocalSet) { - EXPECT_CALL(network, Subscribe(_, wpi::SpanEq({std::string{"foo"}}), - IsDefaultPubSubOptions())); + EXPECT_CALL(network, ClientSubscribe(_, wpi::SpanEq({std::string{"foo"}}), + IsDefaultPubSubOptions())); auto entry = storage.GetEntry(fooTopic, NT_UNASSIGNED, "", {}); // results in a publish and value set auto val = Value::MakeBoolean(true, 5); - EXPECT_CALL(network, - Publish(_, std::string_view{"foo"}, std::string_view{"boolean"}, - wpi::json::object(), IsDefaultPubSubOptions())); - EXPECT_CALL(network, SetValue(_, val)); + EXPECT_CALL( + network, + ClientPublish(_, std::string_view{"foo"}, std::string_view{"boolean"}, + wpi::json::object(), IsDefaultPubSubOptions())); + EXPECT_CALL(network, ClientSetValue(_, val)); EXPECT_TRUE(storage.SetEntryValue(entry, val)); EXPECT_EQ(storage.GetTopicType(fooTopic), NT_BOOLEAN); @@ -243,7 +250,7 @@ TEST_F(LocalStorageTest, EntryNoTypeLocalSet) { // normal set with same type val = Value::MakeBoolean(false, 6); - EXPECT_CALL(network, SetValue(_, val)); + EXPECT_CALL(network, ClientSetValue(_, val)); EXPECT_TRUE(storage.SetEntryValue(entry, val)); auto vals2 = storage.ReadQueue(entry); // mismatched type @@ -261,13 +268,14 @@ TEST_F(LocalStorageTest, EntryNoTypeLocalSet) { } TEST_F(LocalStorageTest, PubUnpubPub) { - EXPECT_CALL(network, Subscribe(_, wpi::SpanEq({std::string{"foo"}}), - IsDefaultPubSubOptions())); + EXPECT_CALL(network, ClientSubscribe(_, wpi::SpanEq({std::string{"foo"}}), + IsDefaultPubSubOptions())); auto sub = storage.Subscribe(fooTopic, NT_INTEGER, "int", {}); - EXPECT_CALL(network, - Publish(_, std::string_view{"foo"}, std::string_view{"boolean"}, - wpi::json::object(), IsDefaultPubSubOptions())); + EXPECT_CALL( + network, + ClientPublish(_, std::string_view{"foo"}, std::string_view{"boolean"}, + wpi::json::object(), IsDefaultPubSubOptions())); EXPECT_CALL(logger, Call(NT_LOG_INFO, _, _, std::string_view{ @@ -276,7 +284,7 @@ TEST_F(LocalStorageTest, PubUnpubPub) { auto pub = storage.Publish(fooTopic, NT_BOOLEAN, "boolean", {}, {}); auto val = Value::MakeBoolean(true, 5); - EXPECT_CALL(network, SetValue(Handle{pub}.GetIndex(), val)); + EXPECT_CALL(network, ClientSetValue(Handle{pub}.GetIndex(), val)); EXPECT_TRUE(storage.SetEntryValue(pub, val)); EXPECT_EQ(storage.GetTopicType(fooTopic), NT_BOOLEAN); @@ -285,7 +293,7 @@ TEST_F(LocalStorageTest, PubUnpubPub) { EXPECT_TRUE(storage.ReadQueue(sub).empty()); - EXPECT_CALL(network, Unpublish(Handle{pub}.GetIndex())); + EXPECT_CALL(network, ClientUnpublish(Handle{pub}.GetIndex())); storage.Unpublish(pub); EXPECT_EQ(storage.GetTopicType(fooTopic), NT_UNASSIGNED); @@ -293,12 +301,12 @@ TEST_F(LocalStorageTest, PubUnpubPub) { EXPECT_FALSE(storage.GetTopicExists(fooTopic)); EXPECT_CALL(network, - Publish(_, std::string_view{"foo"}, std::string_view{"int"}, - wpi::json::object(), IsDefaultPubSubOptions())); + ClientPublish(_, std::string_view{"foo"}, std::string_view{"int"}, + wpi::json::object(), IsDefaultPubSubOptions())); pub = storage.Publish(fooTopic, NT_INTEGER, "int", {}, {}); val = Value::MakeInteger(3, 5); - EXPECT_CALL(network, SetValue(Handle{pub}.GetIndex(), val)); + EXPECT_CALL(network, ClientSetValue(Handle{pub}.GetIndex(), val)); EXPECT_TRUE(storage.SetEntryValue(pub, val)); EXPECT_EQ(storage.GetTopicType(fooTopic), NT_INTEGER); @@ -309,9 +317,10 @@ TEST_F(LocalStorageTest, PubUnpubPub) { } TEST_F(LocalStorageTest, LocalPubConflict) { - EXPECT_CALL(network, - Publish(_, std::string_view{"foo"}, std::string_view{"boolean"}, - wpi::json::object(), IsDefaultPubSubOptions())); + EXPECT_CALL( + network, + ClientPublish(_, std::string_view{"foo"}, std::string_view{"boolean"}, + wpi::json::object(), IsDefaultPubSubOptions())); auto pub1 = storage.Publish(fooTopic, NT_BOOLEAN, "boolean", {}, {}); EXPECT_CALL( @@ -325,36 +334,37 @@ TEST_F(LocalStorageTest, LocalPubConflict) { EXPECT_EQ(storage.GetTopicTypeString(fooTopic), "boolean"); EXPECT_TRUE(storage.GetTopicExists(fooTopic)); - EXPECT_CALL(network, SetValue(Handle{pub1}.GetIndex(), _)); + EXPECT_CALL(network, ClientSetValue(Handle{pub1}.GetIndex(), _)); EXPECT_TRUE(storage.SetEntryValue(pub1, Value::MakeBoolean(true, 5))); EXPECT_FALSE(storage.SetEntryValue(pub2, Value::MakeInteger(3, 5))); // unpublishing pub1 will publish pub2 to the network - EXPECT_CALL(network, Unpublish(Handle{pub1}.GetIndex())); + EXPECT_CALL(network, ClientUnpublish(Handle{pub1}.GetIndex())); EXPECT_CALL(network, - Publish(_, std::string_view{"foo"}, std::string_view{"int"}, - wpi::json::object(), IsDefaultPubSubOptions())); + ClientPublish(_, std::string_view{"foo"}, std::string_view{"int"}, + wpi::json::object(), IsDefaultPubSubOptions())); storage.Unpublish(pub1); EXPECT_EQ(storage.GetTopicType(fooTopic), NT_INTEGER); EXPECT_EQ(storage.GetTopicTypeString(fooTopic), "int"); EXPECT_TRUE(storage.GetTopicExists(fooTopic)); - EXPECT_CALL(network, SetValue(Handle{pub2}.GetIndex(), _)); + EXPECT_CALL(network, ClientSetValue(Handle{pub2}.GetIndex(), _)); EXPECT_FALSE(storage.SetEntryValue(pub1, Value::MakeBoolean(true, 5))); EXPECT_TRUE(storage.SetEntryValue(pub2, Value::MakeInteger(3, 5))); } TEST_F(LocalStorageTest, LocalSubConflict) { - EXPECT_CALL(network, - Publish(_, std::string_view{"foo"}, std::string_view{"boolean"}, - wpi::json::object(), IsDefaultPubSubOptions())); + EXPECT_CALL( + network, + ClientPublish(_, std::string_view{"foo"}, std::string_view{"boolean"}, + wpi::json::object(), IsDefaultPubSubOptions())); storage.Publish(fooTopic, NT_BOOLEAN, "boolean", {}, {}); - EXPECT_CALL(network, Subscribe(_, wpi::SpanEq({std::string{"foo"}}), - IsDefaultPubSubOptions())); + EXPECT_CALL(network, ClientSubscribe(_, wpi::SpanEq({std::string{"foo"}}), + IsDefaultPubSubOptions())); EXPECT_CALL(logger, Call(NT_LOG_INFO, _, _, std::string_view{ @@ -364,9 +374,10 @@ TEST_F(LocalStorageTest, LocalSubConflict) { } TEST_F(LocalStorageTest, RemotePubConflict) { - EXPECT_CALL(network, - Publish(_, std::string_view{"foo"}, std::string_view{"boolean"}, - wpi::json::object(), IsDefaultPubSubOptions())); + EXPECT_CALL( + network, + ClientPublish(_, std::string_view{"foo"}, std::string_view{"boolean"}, + wpi::json::object(), IsDefaultPubSubOptions())); storage.Publish(fooTopic, NT_BOOLEAN, "boolean", {}, {}); @@ -376,18 +387,20 @@ TEST_F(LocalStorageTest, RemotePubConflict) { "network announce of 'foo' overriding local publish " "(was 'boolean', now 'int')"})); - storage.NetworkAnnounce("foo", "int", wpi::json::object(), std::nullopt); + auto id = storage.ServerAnnounce("foo", 0, "int", wpi::json::object(), + std::nullopt); // network overrides local EXPECT_EQ(storage.GetTopicType(fooTopic), NT_INTEGER); EXPECT_EQ(storage.GetTopicTypeString(fooTopic), "int"); EXPECT_TRUE(storage.GetTopicExists(fooTopic)); - EXPECT_CALL(network, - Publish(_, std::string_view{"foo"}, std::string_view{"boolean"}, - wpi::json::object(), IsDefaultPubSubOptions())); + EXPECT_CALL( + network, + ClientPublish(_, std::string_view{"foo"}, std::string_view{"boolean"}, + wpi::json::object(), IsDefaultPubSubOptions())); - storage.NetworkUnannounce("foo"); + storage.ServerUnannounce("foo", id); EXPECT_EQ(storage.GetTopicType(fooTopic), NT_BOOLEAN); EXPECT_EQ(storage.GetTopicTypeString(fooTopic), "boolean"); @@ -396,15 +409,15 @@ TEST_F(LocalStorageTest, RemotePubConflict) { TEST_F(LocalStorageTest, SubNonExist) { // makes sure no warning is emitted - EXPECT_CALL(network, Subscribe(_, wpi::SpanEq({std::string{"foo"}}), - IsDefaultPubSubOptions())); + EXPECT_CALL(network, ClientSubscribe(_, wpi::SpanEq({std::string{"foo"}}), + IsDefaultPubSubOptions())); storage.Subscribe(fooTopic, NT_BOOLEAN, "boolean", {}); } TEST_F(LocalStorageTest, SetDefaultSubscribe) { // no publish, no value on wire, this is just handled locally - EXPECT_CALL(network, Subscribe(_, wpi::SpanEq({std::string{"foo"}}), - IsDefaultPubSubOptions())); + EXPECT_CALL(network, ClientSubscribe(_, wpi::SpanEq({std::string{"foo"}}), + IsDefaultPubSubOptions())); auto sub = storage.Subscribe(fooTopic, NT_BOOLEAN, "boolean", {}); EXPECT_TRUE(storage.SetDefaultEntryValue(sub, Value::MakeBoolean(true))); auto val = storage.GetEntryValue(sub); @@ -414,17 +427,18 @@ TEST_F(LocalStorageTest, SetDefaultSubscribe) { } TEST_F(LocalStorageTest, SetDefaultPublish) { - EXPECT_CALL(network, - Publish(_, std::string_view{"foo"}, std::string_view{"boolean"}, - wpi::json::object(), IsDefaultPubSubOptions())); + EXPECT_CALL( + network, + ClientPublish(_, std::string_view{"foo"}, std::string_view{"boolean"}, + wpi::json::object(), IsDefaultPubSubOptions())); auto pub = storage.Publish(fooTopic, NT_BOOLEAN, "boolean", {}, {}); // expect a value across the wire auto expectVal = Value::MakeBoolean(true, 0); - EXPECT_CALL(network, SetValue(Handle{pub}.GetIndex(), expectVal)); + EXPECT_CALL(network, ClientSetValue(Handle{pub}.GetIndex(), expectVal)); EXPECT_TRUE(storage.SetDefaultEntryValue(pub, Value::MakeBoolean(true))); - EXPECT_CALL(network, Subscribe(_, _, IsDefaultPubSubOptions())); + EXPECT_CALL(network, ClientSubscribe(_, _, IsDefaultPubSubOptions())); auto sub = storage.Subscribe(fooTopic, NT_BOOLEAN, "boolean", {}); auto val = storage.GetEntryValue(sub); ASSERT_TRUE(val.IsBoolean()); @@ -433,16 +447,17 @@ TEST_F(LocalStorageTest, SetDefaultPublish) { } TEST_F(LocalStorageTest, SetDefaultEntry) { - EXPECT_CALL(network, Subscribe(_, wpi::SpanEq({std::string{"foo"}}), - IsDefaultPubSubOptions())); + EXPECT_CALL(network, ClientSubscribe(_, wpi::SpanEq({std::string{"foo"}}), + IsDefaultPubSubOptions())); auto entry = storage.GetEntry(fooTopic, NT_BOOLEAN, "boolean", {}); // expect a publish and value - EXPECT_CALL(network, - Publish(_, std::string_view{"foo"}, std::string_view{"boolean"}, - wpi::json::object(), IsDefaultPubSubOptions())); + EXPECT_CALL( + network, + ClientPublish(_, std::string_view{"foo"}, std::string_view{"boolean"}, + wpi::json::object(), IsDefaultPubSubOptions())); auto expectVal = Value::MakeBoolean(true, 0); - EXPECT_CALL(network, SetValue(_, expectVal)); + EXPECT_CALL(network, ClientSetValue(_, expectVal)); EXPECT_TRUE(storage.SetDefaultEntryValue(entry, Value::MakeBoolean(true))); auto val = storage.GetEntryValue(entry); @@ -452,16 +467,17 @@ TEST_F(LocalStorageTest, SetDefaultEntry) { } TEST_F(LocalStorageTest, SetDefaultEntryUnassigned) { - EXPECT_CALL(network, Subscribe(_, wpi::SpanEq({std::string{"foo"}}), - IsDefaultPubSubOptions())); + EXPECT_CALL(network, ClientSubscribe(_, wpi::SpanEq({std::string{"foo"}}), + IsDefaultPubSubOptions())); auto entry = storage.GetEntry(fooTopic, NT_UNASSIGNED, "", {}); // expect a publish and value - EXPECT_CALL(network, - Publish(_, std::string_view{"foo"}, std::string_view{"boolean"}, - wpi::json::object(), IsDefaultPubSubOptions())); + EXPECT_CALL( + network, + ClientPublish(_, std::string_view{"foo"}, std::string_view{"boolean"}, + wpi::json::object(), IsDefaultPubSubOptions())); auto expectVal = Value::MakeBoolean(true, 0); - EXPECT_CALL(network, SetValue(_, expectVal)); + EXPECT_CALL(network, ClientSetValue(_, expectVal)); EXPECT_TRUE(storage.SetDefaultEntryValue(entry, Value::MakeBoolean(true))); ASSERT_EQ(storage.GetTopicType(fooTopic), NT_BOOLEAN); @@ -472,9 +488,10 @@ TEST_F(LocalStorageTest, SetDefaultEntryUnassigned) { } TEST_F(LocalStorageTest, SetDefaultEntryDiffType) { - EXPECT_CALL(network, - Publish(_, std::string_view{"foo"}, std::string_view{"string"}, - wpi::json::object(), IsDefaultPubSubOptions())); + EXPECT_CALL( + network, + ClientPublish(_, std::string_view{"foo"}, std::string_view{"string"}, + wpi::json::object(), IsDefaultPubSubOptions())); auto pub = storage.Publish(fooTopic, NT_STRING, "string", {}, {}); EXPECT_FALSE(storage.SetDefaultEntryValue(pub, Value::MakeBoolean(true))); @@ -482,17 +499,18 @@ TEST_F(LocalStorageTest, SetDefaultEntryDiffType) { } TEST_F(LocalStorageTest, SetValueEmptyValue) { - EXPECT_CALL(network, - Publish(_, std::string_view{"foo"}, std::string_view{"string"}, - wpi::json::object(), IsDefaultPubSubOptions())); + EXPECT_CALL( + network, + ClientPublish(_, std::string_view{"foo"}, std::string_view{"string"}, + wpi::json::object(), IsDefaultPubSubOptions())); auto pub = storage.Publish(fooTopic, NT_STRING, "string", {}, {}); EXPECT_FALSE(storage.SetEntryValue(pub, {})); } TEST_F(LocalStorageTest, SetValueEmptyUntypedEntry) { - EXPECT_CALL(network, Subscribe(_, wpi::SpanEq({std::string{"foo"}}), - IsDefaultPubSubOptions())); + EXPECT_CALL(network, ClientSubscribe(_, wpi::SpanEq({std::string{"foo"}}), + IsDefaultPubSubOptions())); auto entry = storage.GetEntry(fooTopic, NT_UNASSIGNED, "", {}); EXPECT_FALSE(storage.SetEntryValue(entry, {})); } @@ -525,17 +543,18 @@ class LocalStorageDuplicatesTest : public LocalStorageTest { void LocalStorageDuplicatesTest::SetupPubSub(bool keepPub, bool keepSub) { PubSubOptionsImpl pubOptions; pubOptions.keepDuplicates = keepPub; - EXPECT_CALL(network, - Publish(_, std::string_view{"foo"}, std::string_view{"double"}, - wpi::json::object(), IsPubSubOptions(pubOptions))); + EXPECT_CALL( + network, + ClientPublish(_, std::string_view{"foo"}, std::string_view{"double"}, + wpi::json::object(), IsPubSubOptions(pubOptions))); pub = storage.Publish(fooTopic, NT_DOUBLE, "double", {}, {.keepDuplicates = keepPub}); PubSubOptionsImpl subOptions; subOptions.pollStorage = 10; subOptions.keepDuplicates = keepSub; - EXPECT_CALL(network, Subscribe(_, wpi::SpanEq({std::string{"foo"}}), - IsPubSubOptions(subOptions))); + EXPECT_CALL(network, ClientSubscribe(_, wpi::SpanEq({std::string{"foo"}}), + IsPubSubOptions(subOptions))); sub = storage.Subscribe(fooTopic, NT_DOUBLE, "double", {.pollStorage = 10, .keepDuplicates = keepSub}); } @@ -552,8 +571,8 @@ void LocalStorageDuplicatesTest::SetValues(bool expectDuplicates) { TEST_F(LocalStorageDuplicatesTest, Defaults) { SetupPubSub(false, false); - EXPECT_CALL(network, SetValue(Handle{pub}.GetIndex(), val1)); - EXPECT_CALL(network, SetValue(Handle{pub}.GetIndex(), val3)); + EXPECT_CALL(network, ClientSetValue(Handle{pub}.GetIndex(), val1)); + EXPECT_CALL(network, ClientSetValue(Handle{pub}.GetIndex(), val3)); SetValues(false); // verify 2nd update was dropped locally @@ -568,9 +587,9 @@ TEST_F(LocalStorageDuplicatesTest, Defaults) { TEST_F(LocalStorageDuplicatesTest, KeepPub) { SetupPubSub(true, false); - EXPECT_CALL(network, SetValue(Handle{pub}.GetIndex(), val1)).Times(2); + EXPECT_CALL(network, ClientSetValue(Handle{pub}.GetIndex(), val1)).Times(2); // EXPECT_CALL(network, SetValue(pub, val2)); - EXPECT_CALL(network, SetValue(Handle{pub}.GetIndex(), val3)); + EXPECT_CALL(network, ClientSetValue(Handle{pub}.GetIndex(), val3)); SetValues(true); // verify only 2 updates were received locally @@ -582,8 +601,8 @@ TEST_F(LocalStorageDuplicatesTest, KeepSub) { SetupPubSub(false, true); // second update should NOT go to the network - EXPECT_CALL(network, SetValue(Handle{pub}.GetIndex(), val1)); - EXPECT_CALL(network, SetValue(Handle{pub}.GetIndex(), val3)); + EXPECT_CALL(network, ClientSetValue(Handle{pub}.GetIndex(), val1)); + EXPECT_CALL(network, ClientSetValue(Handle{pub}.GetIndex(), val3)); SetValues(false); // verify 2 updates were received locally @@ -595,9 +614,9 @@ TEST_F(LocalStorageDuplicatesTest, KeepPubSub) { SetupPubSub(true, true); // second update SHOULD go to the network - EXPECT_CALL(network, SetValue(Handle{pub}.GetIndex(), val1)).Times(2); + EXPECT_CALL(network, ClientSetValue(Handle{pub}.GetIndex(), val1)).Times(2); // EXPECT_CALL(network, SetValue(pub, val2)); - EXPECT_CALL(network, SetValue(Handle{pub}.GetIndex(), val3)); + EXPECT_CALL(network, ClientSetValue(Handle{pub}.GetIndex(), val3)); SetValues(true); // verify all 3 updates were received locally @@ -609,12 +628,12 @@ TEST_F(LocalStorageDuplicatesTest, FromNetworkDefault) { SetupPubSub(false, false); // incoming from the network are treated like a normal local publish - auto topic = storage.NetworkAnnounce("foo", "double", {{}}, std::nullopt); - storage.NetworkSetValue(topic, val1); - storage.NetworkSetValue(topic, val2); + auto topic = storage.ServerAnnounce("foo", 0, "double", {{}}, std::nullopt); + storage.ServerSetValue(topic, val1); + storage.ServerSetValue(topic, val2); // verify the timestamp was updated EXPECT_EQ(storage.GetEntryLastChange(sub), val2.time()); - storage.NetworkSetValue(topic, val3); + storage.ServerSetValue(topic, val3); // verify 2nd update was dropped for local subscriber auto values = storage.ReadQueue(sub); @@ -629,12 +648,12 @@ TEST_F(LocalStorageDuplicatesTest, FromNetworkKeepPub) { SetupPubSub(true, false); // incoming from the network are treated like a normal local publish - auto topic = storage.NetworkAnnounce("foo", "double", {{}}, std::nullopt); - storage.NetworkSetValue(topic, val1); - storage.NetworkSetValue(topic, val2); + auto topic = storage.ServerAnnounce("foo", 0, "double", {{}}, std::nullopt); + storage.ServerSetValue(topic, val1); + storage.ServerSetValue(topic, val2); // verify the timestamp was updated EXPECT_EQ(storage.GetEntryLastChange(sub), val2.time()); - storage.NetworkSetValue(topic, val3); + storage.ServerSetValue(topic, val3); // verify 2nd update was dropped for local subscriber auto values = storage.ReadQueue(sub); @@ -648,12 +667,12 @@ TEST_F(LocalStorageDuplicatesTest, FromNetworkKeepSub) { SetupPubSub(false, true); // incoming from the network are treated like a normal local publish - auto topic = storage.NetworkAnnounce("foo", "double", {{}}, std::nullopt); - storage.NetworkSetValue(topic, val1); - storage.NetworkSetValue(topic, val2); + auto topic = storage.ServerAnnounce("foo", 0, "double", {{}}, std::nullopt); + storage.ServerSetValue(topic, val1); + storage.ServerSetValue(topic, val2); // verify the timestamp was updated EXPECT_EQ(storage.GetEntryLastChange(sub), val2.time()); - storage.NetworkSetValue(topic, val3); + storage.ServerSetValue(topic, val3); // verify 2nd update was received by local subscriber auto values = storage.ReadQueue(sub); @@ -670,12 +689,12 @@ TEST_F(LocalStorageDuplicatesTest, FromNetworkKeepPubSub) { SetupPubSub(true, true); // incoming from the network are treated like a normal local publish - auto topic = storage.NetworkAnnounce("foo", "double", {{}}, std::nullopt); - storage.NetworkSetValue(topic, val1); - storage.NetworkSetValue(topic, val2); + auto topic = storage.ServerAnnounce("foo", 0, "double", {{}}, std::nullopt); + storage.ServerSetValue(topic, val1); + storage.ServerSetValue(topic, val2); // verify the timestamp was updated EXPECT_EQ(storage.GetEntryLastChange(sub), val2.time()); - storage.NetworkSetValue(topic, val3); + storage.ServerSetValue(topic, val3); // verify 2nd update was received by local subscriber auto values = storage.ReadQueue(sub); @@ -745,9 +764,9 @@ void LocalStorageNumberVariantsTest::CreateSubscribersArray() { } TEST_F(LocalStorageNumberVariantsTest, GetEntryPubAfter) { - EXPECT_CALL(network, Subscribe(_, _, _)).Times(5); - EXPECT_CALL(network, Publish(_, _, _, _, _)).Times(1); - EXPECT_CALL(network, SetValue(_, _)).Times(1); + EXPECT_CALL(network, ClientSubscribe(_, _, _)).Times(5); + EXPECT_CALL(network, ClientPublish(_, _, _, _, _)).Times(1); + EXPECT_CALL(network, ClientSetValue(_, _)).Times(1); CreateSubscribers(); auto pub = storage.Publish(fooTopic, NT_DOUBLE, "double", {}, {}); storage.SetEntryValue(pub, Value::MakeDouble(1.0, 50)); @@ -767,9 +786,9 @@ TEST_F(LocalStorageNumberVariantsTest, GetEntryPubAfter) { } TEST_F(LocalStorageNumberVariantsTest, GetEntryPubBefore) { - EXPECT_CALL(network, Subscribe(_, _, _)).Times(5); - EXPECT_CALL(network, Publish(_, _, _, _, _)).Times(1); - EXPECT_CALL(network, SetValue(_, _)).Times(1); + EXPECT_CALL(network, ClientSubscribe(_, _, _)).Times(5); + EXPECT_CALL(network, ClientPublish(_, _, _, _, _)).Times(1); + EXPECT_CALL(network, ClientSetValue(_, _)).Times(1); auto pub = storage.Publish(fooTopic, NT_DOUBLE, "double", {}, {}); CreateSubscribers(); storage.SetEntryValue(pub, Value::MakeDouble(1.0, 50)); @@ -794,9 +813,9 @@ template } TEST_F(LocalStorageNumberVariantsTest, GetAtomic) { - EXPECT_CALL(network, Subscribe(_, _, _)).Times(5); - EXPECT_CALL(network, Publish(_, _, _, _, _)).Times(1); - EXPECT_CALL(network, SetValue(_, _)).Times(1); + EXPECT_CALL(network, ClientSubscribe(_, _, _)).Times(5); + EXPECT_CALL(network, ClientPublish(_, _, _, _, _)).Times(1); + EXPECT_CALL(network, ClientSetValue(_, _)).Times(1); auto pub = storage.Publish(fooTopic, NT_DOUBLE, "double", {}, {}); CreateSubscribers(); storage.SetEntryValue(pub, Value::MakeDouble(1.0, 50)); @@ -822,9 +841,9 @@ template } TEST_F(LocalStorageNumberVariantsTest, GetAtomicArray) { - EXPECT_CALL(network, Subscribe(_, _, _)).Times(5); - EXPECT_CALL(network, Publish(_, _, _, _, _)).Times(1); - EXPECT_CALL(network, SetValue(_, _)).Times(1); + EXPECT_CALL(network, ClientSubscribe(_, _, _)).Times(5); + EXPECT_CALL(network, ClientPublish(_, _, _, _, _)).Times(1); + EXPECT_CALL(network, ClientSetValue(_, _)).Times(1); auto pub = storage.Publish(fooTopic, NT_DOUBLE_ARRAY, "double[]", {}, {}); CreateSubscribersArray(); storage.SetEntryValue(pub, Value::MakeDoubleArray({1.0}, 50)); @@ -846,9 +865,9 @@ TEST_F(LocalStorageNumberVariantsTest, GetAtomicArray) { } TEST_F(LocalStorageNumberVariantsTest, ReadQueue) { - EXPECT_CALL(network, Subscribe(_, _, _)).Times(5); - EXPECT_CALL(network, Publish(_, _, _, _, _)).Times(1); - EXPECT_CALL(network, SetValue(_, _)).Times(4); + EXPECT_CALL(network, ClientSubscribe(_, _, _)).Times(5); + EXPECT_CALL(network, ClientPublish(_, _, _, _, _)).Times(1); + EXPECT_CALL(network, ClientSetValue(_, _)).Times(4); auto pub = storage.Publish(fooTopic, NT_DOUBLE, "double", {}, {}); CreateSubscribers(); @@ -893,9 +912,9 @@ TEST_F(LocalStorageNumberVariantsTest, ReadQueue) { } TEST_F(LocalStorageTest, MultiSubSpecial) { - EXPECT_CALL(network, Subscribe(_, _, _)).Times(2); - EXPECT_CALL(network, Publish(_, _, _, _, _)).Times(2); - EXPECT_CALL(network, SetValue(_, _)).Times(2); + EXPECT_CALL(network, ClientSubscribe(_, _, _)).Times(2); + EXPECT_CALL(network, ClientPublish(_, _, _, _, _)).Times(2); + EXPECT_CALL(network, ClientSetValue(_, _)).Times(2); EXPECT_CALL(listenerStorage, Activate(_, _, _)).Times(2); auto subnormal = storage.SubscribeMultiple({{""}}, {}); @@ -921,26 +940,26 @@ TEST_F(LocalStorageTest, MultiSubSpecial) { } TEST_F(LocalStorageTest, NetworkDuplicateDetect) { - EXPECT_CALL(network, Publish(_, _, _, _, _)); + EXPECT_CALL(network, ClientPublish(_, _, _, _, _)); auto pub = storage.Publish(fooTopic, NT_DOUBLE, "double", {}, {}); - auto remoteTopic = storage.NetworkAnnounce("foo", "double", - wpi::json::object(), std::nullopt); + auto remoteTopic = storage.ServerAnnounce("foo", 0, "double", + wpi::json::object(), std::nullopt); // local set - EXPECT_CALL(network, SetValue(_, _)); + EXPECT_CALL(network, ClientSetValue(_, _)); storage.SetEntryValue(pub, Value::MakeDouble(1.0, 50)); // 2nd local set with same value - no SetValue call to network storage.SetEntryValue(pub, Value::MakeDouble(1.0, 60)); // network set with different value - storage.NetworkSetValue(remoteTopic, Value::MakeDouble(2.0, 70)); + storage.ServerSetValue(remoteTopic, Value::MakeDouble(2.0, 70)); // 3rd local set with same value generates a SetValue call to network - EXPECT_CALL(network, SetValue(_, _)); + EXPECT_CALL(network, ClientSetValue(_, _)); storage.SetEntryValue(pub, Value::MakeDouble(1.0, 80)); } TEST_F(LocalStorageTest, ReadQueueLocalRemote) { - EXPECT_CALL(network, Subscribe(_, _, _)).Times(3); - EXPECT_CALL(network, Publish(_, _, _, _, _)).Times(1); + EXPECT_CALL(network, ClientSubscribe(_, _, _)).Times(3); + EXPECT_CALL(network, ClientPublish(_, _, _, _, _)).Times(1); auto subBoth = storage.Subscribe(fooTopic, NT_DOUBLE, "double", kDefaultPubSubOptions); @@ -949,11 +968,11 @@ TEST_F(LocalStorageTest, ReadQueueLocalRemote) { auto subRemote = storage.Subscribe(fooTopic, NT_DOUBLE, "double", {.disableLocal = true}); auto pub = storage.Publish(fooTopic, NT_DOUBLE, "double", {}, {}); - auto remoteTopic = storage.NetworkAnnounce("foo", "double", - wpi::json::object(), std::nullopt); + auto remoteTopic = storage.ServerAnnounce("foo", 0, "double", + wpi::json::object(), std::nullopt); // local set - EXPECT_CALL(network, SetValue(_, _)); + EXPECT_CALL(network, ClientSetValue(_, _)); storage.SetEntryValue(pub, Value::MakeDouble(1.0, 50)); EXPECT_THAT(storage.ReadQueue(subBoth), ElementsAre(TSEq(1.0, 50))); @@ -962,7 +981,7 @@ TEST_F(LocalStorageTest, ReadQueueLocalRemote) { EXPECT_THAT(storage.ReadQueue(subRemote), IsEmpty()); // network set - storage.NetworkSetValue(remoteTopic, Value::MakeDouble(2.0, 60)); + storage.ServerSetValue(remoteTopic, Value::MakeDouble(2.0, 60)); EXPECT_THAT(storage.ReadQueue(subBoth), ElementsAre(TSEq(2.0, 60))); EXPECT_THAT(storage.ReadQueue(subRemote), @@ -971,25 +990,25 @@ TEST_F(LocalStorageTest, ReadQueueLocalRemote) { } TEST_F(LocalStorageTest, SubExcludePub) { - EXPECT_CALL(network, Subscribe(_, _, _)).Times(2); - EXPECT_CALL(network, Publish(_, _, _, _, _)).Times(1); + EXPECT_CALL(network, ClientSubscribe(_, _, _)).Times(2); + EXPECT_CALL(network, ClientPublish(_, _, _, _, _)).Times(1); auto pub = storage.Publish(fooTopic, NT_DOUBLE, "double", {}, {}); auto subActive = storage.Subscribe(fooTopic, NT_DOUBLE, "double", {}); auto subExclude = storage.Subscribe(fooTopic, NT_DOUBLE, "double", {.excludePublisher = pub}); - auto remoteTopic = storage.NetworkAnnounce("foo", "double", - wpi::json::object(), std::nullopt); + auto remoteTopic = storage.ServerAnnounce("foo", 0, "double", + wpi::json::object(), std::nullopt); // local set - EXPECT_CALL(network, SetValue(_, _)); + EXPECT_CALL(network, ClientSetValue(_, _)); storage.SetEntryValue(pub, Value::MakeDouble(1.0, 50)); EXPECT_THAT(storage.ReadQueue(subActive), ElementsAre(TSEq(1.0, 50))); EXPECT_THAT(storage.ReadQueue(subExclude), IsEmpty()); // network set - storage.NetworkSetValue(remoteTopic, Value::MakeDouble(2.0, 60)); + storage.ServerSetValue(remoteTopic, Value::MakeDouble(2.0, 60)); EXPECT_THAT(storage.ReadQueue(subActive), ElementsAre(TSEq(2.0, 60))); EXPECT_THAT(storage.ReadQueue(subExclude), @@ -997,29 +1016,29 @@ TEST_F(LocalStorageTest, SubExcludePub) { } TEST_F(LocalStorageTest, EntryExcludeSelf) { - EXPECT_CALL(network, Subscribe(_, _, _)); + EXPECT_CALL(network, ClientSubscribe(_, _, _)); auto entry = storage.GetEntry(fooTopic, NT_DOUBLE, "double", {.excludeSelf = true}); - auto remoteTopic = storage.NetworkAnnounce("foo", "double", - wpi::json::object(), std::nullopt); + auto remoteTopic = storage.ServerAnnounce("foo", 0, "double", + wpi::json::object(), std::nullopt); // local set - EXPECT_CALL(network, Publish(_, _, _, _, _)); - EXPECT_CALL(network, SetValue(_, _)); + EXPECT_CALL(network, ClientPublish(_, _, _, _, _)); + EXPECT_CALL(network, ClientSetValue(_, _)); storage.SetEntryValue(entry, Value::MakeDouble(1.0, 50)); EXPECT_THAT(storage.ReadQueue(entry), IsEmpty()); // network set - storage.NetworkSetValue(remoteTopic, Value::MakeDouble(2.0, 60)); + storage.ServerSetValue(remoteTopic, Value::MakeDouble(2.0, 60)); EXPECT_THAT(storage.ReadQueue(entry), ElementsAre(TSEq(2.0, 60))); } TEST_F(LocalStorageTest, ReadQueueInitialLocal) { - EXPECT_CALL(network, Publish(_, _, _, _, _)); - EXPECT_CALL(network, SetValue(_, _)); - EXPECT_CALL(network, Subscribe(_, _, _)).Times(3); + EXPECT_CALL(network, ClientPublish(_, _, _, _, _)); + EXPECT_CALL(network, ClientSetValue(_, _)); + EXPECT_CALL(network, ClientSubscribe(_, _, _)).Times(3); auto pub = storage.Publish(fooTopic, NT_DOUBLE, "double", {}, {}); storage.SetEntryValue(pub, Value::MakeDouble(1.0, 50)); @@ -1039,11 +1058,11 @@ TEST_F(LocalStorageTest, ReadQueueInitialLocal) { } TEST_F(LocalStorageTest, ReadQueueInitialRemote) { - EXPECT_CALL(network, Subscribe(_, _, _)).Times(3); + EXPECT_CALL(network, ClientSubscribe(_, _, _)).Times(3); - auto remoteTopic = storage.NetworkAnnounce("foo", "double", - wpi::json::object(), std::nullopt); - storage.NetworkSetValue(remoteTopic, Value::MakeDouble(2.0, 60)); + auto remoteTopic = storage.ServerAnnounce("foo", 0, "double", + wpi::json::object(), std::nullopt); + storage.ServerSetValue(remoteTopic, Value::MakeDouble(2.0, 60)); auto subBoth = storage.Subscribe(fooTopic, NT_DOUBLE, "double", kDefaultPubSubOptions); diff --git a/ntcore/src/test/native/cpp/net/MockMessageHandler.h b/ntcore/src/test/native/cpp/net/MockMessageHandler.h new file mode 100644 index 0000000000..1f26e5c692 --- /dev/null +++ b/ntcore/src/test/native/cpp/net/MockMessageHandler.h @@ -0,0 +1,50 @@ +// Copyright (c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#pragma once + +#include + +#include + +#include "PubSubOptions.h" +#include "gmock/gmock.h" +#include "net/MessageHandler.h" + +namespace nt::net { + +class MockClientMessageHandler : public net::ClientMessageHandler { + public: + MOCK_METHOD(void, ClientPublish, + (int pubuid, std::string_view name, std::string_view typeStr, + const wpi::json& properties, const PubSubOptionsImpl& options), + (override)); + MOCK_METHOD(void, ClientUnpublish, (int pubuid), (override)); + MOCK_METHOD(void, ClientSetProperties, + (std::string_view name, const wpi::json& update), (override)); + MOCK_METHOD(void, ClientSubscribe, + (int subuid, std::span prefixes, + const PubSubOptionsImpl& options), + (override)); + MOCK_METHOD(void, ClientUnsubscribe, (int subuid), (override)); + MOCK_METHOD(void, ClientSetValue, (int pubuid, const Value& value), + (override)); +}; + +class MockServerMessageHandler : public net::ServerMessageHandler { + public: + MOCK_METHOD(int, ServerAnnounce, + (std::string_view name, int id, std::string_view typeStr, + const wpi::json& properties, std::optional pubuid), + (override)); + MOCK_METHOD(void, ServerUnannounce, (std::string_view name, int id), + (override)); + MOCK_METHOD(void, ServerPropertiesUpdate, + (std::string_view name, const wpi::json& update, bool ack), + (override)); + MOCK_METHOD(void, ServerSetValue, (int topicuid, const Value& value), + (override)); +}; + +} // namespace nt::net diff --git a/ntcore/src/test/native/cpp/net/MockNetworkInterface.h b/ntcore/src/test/native/cpp/net/MockNetworkInterface.h index eaaa4f02f7..8bd77d21ab 100644 --- a/ntcore/src/test/native/cpp/net/MockNetworkInterface.h +++ b/ntcore/src/test/native/cpp/net/MockNetworkInterface.h @@ -4,60 +4,27 @@ #pragma once -#include - #include -#include "PubSubOptions.h" #include "gmock/gmock.h" #include "net/NetworkInterface.h" namespace nt::net { -class MockLocalInterface : public LocalInterface { - public: - MOCK_METHOD(NT_Topic, NetworkAnnounce, - (std::string_view name, std::string_view typeStr, - const wpi::json& properties, std::optional pubuid), - (override)); - MOCK_METHOD(void, NetworkUnannounce, (std::string_view name), (override)); - MOCK_METHOD(void, NetworkPropertiesUpdate, - (std::string_view name, const wpi::json& update, bool ack), - (override)); - MOCK_METHOD(void, NetworkSetValue, (NT_Topic topicHandle, const Value& value), - (override)); -}; - -class MockNetworkInterface : public NetworkInterface { - public: - MOCK_METHOD(void, Publish, - (int pubuid, std::string_view name, std::string_view typeStr, - const wpi::json& properties, const PubSubOptionsImpl& options), - (override)); - MOCK_METHOD(void, Unpublish, (int pubuid), (override)); - MOCK_METHOD(void, SetProperties, - (std::string_view name, const wpi::json& update), (override)); - MOCK_METHOD(void, Subscribe, - (int subuid, std::span prefixes, - const PubSubOptionsImpl& options), - (override)); - MOCK_METHOD(void, Unsubscribe, (int subuid), (override)); - MOCK_METHOD(void, SetValue, (int pubuid, const Value& value), (override)); -}; - class MockLocalStorage : public ILocalStorage { public: - MOCK_METHOD(NT_Topic, NetworkAnnounce, - (std::string_view name, std::string_view typeStr, + MOCK_METHOD(int, ServerAnnounce, + (std::string_view name, int id, std::string_view typeStr, const wpi::json& properties, std::optional pubuid), (override)); - MOCK_METHOD(void, NetworkUnannounce, (std::string_view name), (override)); - MOCK_METHOD(void, NetworkPropertiesUpdate, + MOCK_METHOD(void, ServerUnannounce, (std::string_view name, int id), + (override)); + MOCK_METHOD(void, ServerPropertiesUpdate, (std::string_view name, const wpi::json& update, bool ack), (override)); - MOCK_METHOD(void, NetworkSetValue, (NT_Topic topicHandle, const Value& value), + MOCK_METHOD(void, ServerSetValue, (int topicId, const Value& value), (override)); - MOCK_METHOD(void, StartNetwork, (NetworkInterface * network), (override)); + MOCK_METHOD(void, StartNetwork, (ClientMessageHandler * network), (override)); MOCK_METHOD(void, ClearNetwork, (), (override)); }; diff --git a/ntcore/src/test/native/cpp/net/ServerImplTest.cpp b/ntcore/src/test/native/cpp/net/ServerImplTest.cpp index f381b63f79..064508ec6f 100644 --- a/ntcore/src/test/native/cpp/net/ServerImplTest.cpp +++ b/ntcore/src/test/native/cpp/net/ServerImplTest.cpp @@ -18,7 +18,7 @@ #include "../TestPrinters.h" #include "../ValueMatcher.h" #include "Handle.h" -#include "MockNetworkInterface.h" +#include "MockMessageHandler.h" #include "MockWireConnection.h" #include "gmock/gmock.h" #include "net/Message.h" @@ -44,7 +44,7 @@ namespace nt { class ServerImplTest : public ::testing::Test { public: - ::testing::StrictMock local; + ::testing::StrictMock local; wpi::MockLogger logger; net::ServerImpl server{logger}; }; @@ -150,17 +150,18 @@ TEST_F(ServerImplTest, PublishLocal) { constexpr int pubuid3 = 3; { ::testing::InSequence seq; - EXPECT_CALL(local, NetworkAnnounce( - std::string_view{"test"}, std::string_view{"double"}, - wpi::json::object(), std::optional{pubuid})); EXPECT_CALL( local, - NetworkAnnounce(std::string_view{"test2"}, std::string_view{"double"}, - wpi::json::object(), std::optional{pubuid2})); + ServerAnnounce(std::string_view{"test"}, 0, std::string_view{"double"}, + wpi::json::object(), std::optional{pubuid})); EXPECT_CALL( local, - NetworkAnnounce(std::string_view{"test3"}, std::string_view{"double"}, - wpi::json::object(), std::optional{pubuid3})); + ServerAnnounce(std::string_view{"test2"}, 0, std::string_view{"double"}, + wpi::json::object(), std::optional{pubuid2})); + EXPECT_CALL( + local, + ServerAnnounce(std::string_view{"test3"}, 0, std::string_view{"double"}, + wpi::json::object(), std::optional{pubuid3})); } { @@ -235,9 +236,10 @@ TEST_F(ServerImplTest, ClientSubTopicOnlyThenValue) { // publish before client connect server.SetLocal(&local); constexpr int pubuid = 1; - EXPECT_CALL(local, NetworkAnnounce( - std::string_view{"test"}, std::string_view{"double"}, - wpi::json::object(), std::optional{pubuid})); + EXPECT_CALL( + local, + ServerAnnounce(std::string_view{"test"}, 0, std::string_view{"double"}, + wpi::json::object(), std::optional{pubuid})); { std::vector msgs; @@ -311,12 +313,13 @@ TEST_F(ServerImplTest, ClientDisconnectUnpublish) { ::testing::InSequence seq; EXPECT_CALL( local, - NetworkAnnounce(std::string_view{"test2"}, std::string_view{"double"}, - wpi::json::object(), std::optional{pubuidLocal})); - EXPECT_CALL(local, NetworkAnnounce( - std::string_view{"test"}, std::string_view{"double"}, - wpi::json::object(), std::optional{})); - EXPECT_CALL(local, NetworkUnannounce(std::string_view{"test"})); + ServerAnnounce(std::string_view{"test2"}, 0, std::string_view{"double"}, + wpi::json::object(), std::optional{pubuidLocal})); + EXPECT_CALL( + local, + ServerAnnounce(std::string_view{"test"}, 0, std::string_view{"double"}, + wpi::json::object(), std::optional{})); + EXPECT_CALL(local, ServerUnannounce(std::string_view{"test"}, 0)); } { @@ -381,12 +384,13 @@ TEST_F(ServerImplTest, ZeroTimestampNegativeTime) { Value value = Value::MakeDouble(5, -10); { ::testing::InSequence seq; - EXPECT_CALL(local, NetworkAnnounce( - std::string_view{"test"}, std::string_view{"double"}, - wpi::json::object(), std::optional{pubuid})) + EXPECT_CALL( + local, + ServerAnnounce(std::string_view{"test"}, 0, std::string_view{"double"}, + wpi::json::object(), std::optional{pubuid})) .WillOnce(Return(topicHandle)); - EXPECT_CALL(local, NetworkSetValue(topicHandle, defaultValue)); - EXPECT_CALL(local, NetworkSetValue(topicHandle, value)); + EXPECT_CALL(local, ServerSetValue(topicHandle, defaultValue)); + EXPECT_CALL(local, ServerSetValue(topicHandle, value)); } { diff --git a/ntcore/src/test/native/cpp/net/WireDecoderTest.cpp b/ntcore/src/test/native/cpp/net/WireDecoderTest.cpp index fc885ead18..6f8b6ca340 100644 --- a/ntcore/src/test/native/cpp/net/WireDecoderTest.cpp +++ b/ntcore/src/test/native/cpp/net/WireDecoderTest.cpp @@ -9,9 +9,12 @@ #include #include "../MockLogger.h" +#include "../PubSubOptionsMatcher.h" #include "../TestPrinters.h" +#include "MockMessageHandler.h" +#include "PubSubOptions.h" #include "gmock/gmock.h" -#include "net/Message.h" +#include "net/MessageHandler.h" #include "net/WireDecoder.h" #include "networktables/NetworkTableValue.h" @@ -22,39 +25,15 @@ using testing::StrictMock; namespace nt { -class MockClientMessageHandler : public net::ClientMessageHandler { - public: - MOCK_METHOD4(ClientPublish, - void(int pubuid, std::string_view name, std::string_view typeStr, - const wpi::json& properties)); - MOCK_METHOD1(ClientUnpublish, void(int pubuid)); - MOCK_METHOD2(ClientSetProperties, - void(std::string_view name, const wpi::json& update)); - MOCK_METHOD3(ClientSubscribe, - void(int subuid, std::span prefixes, - const PubSubOptionsImpl& options)); - MOCK_METHOD1(ClientUnsubscribe, void(int subuid)); -}; - -class MockServerMessageHandler : public net::ServerMessageHandler { - public: - MOCK_METHOD5(ServerAnnounce, - void(std::string_view name, int id, std::string_view typeStr, - const wpi::json& properties, std::optional pubuid)); - MOCK_METHOD2(ServerUnannounce, void(std::string_view name, int id)); - MOCK_METHOD3(ServerPropertiesUpdate, - void(std::string_view name, const wpi::json& update, bool ack)); -}; - class WireDecodeTextClientTest : public ::testing::Test { public: - StrictMock handler; + StrictMock handler; StrictMock logger; }; class WireDecodeTextServerTest : public ::testing::Test { public: - StrictMock handler; + StrictMock handler; StrictMock logger; }; @@ -129,17 +108,17 @@ TEST_F(WireDecodeTextClientTest, ErrorUnknownMethod) { } TEST_F(WireDecodeTextClientTest, PublishPropsEmpty) { - EXPECT_CALL(handler, - ClientPublish(5, std::string_view{"test"}, - std::string_view{"double"}, wpi::json::object())); + EXPECT_CALL(handler, ClientPublish(5, std::string_view{"test"}, + std::string_view{"double"}, + wpi::json::object(), PubSubOptionsEq({}))); net::WireDecodeText( "[{\"method\":\"publish\",\"params\":{" "\"name\":\"test\",\"properties\":{},\"pubuid\":5,\"type\":\"double\"}}]", handler, logger); - EXPECT_CALL(handler, - ClientPublish(5, std::string_view{"test"}, - std::string_view{"double"}, wpi::json::object())); + EXPECT_CALL(handler, ClientPublish(5, std::string_view{"test"}, + std::string_view{"double"}, + wpi::json::object(), PubSubOptionsEq({}))); net::WireDecodeText( "[{\"method\":\"publish\",\"params\":{" "\"name\":\"test\",\"pubuid\":5,\"type\":\"double\"}}]", @@ -149,7 +128,8 @@ TEST_F(WireDecodeTextClientTest, PublishPropsEmpty) { TEST_F(WireDecodeTextClientTest, PublishProps) { wpi::json props = {{"k", 6}}; EXPECT_CALL(handler, ClientPublish(5, std::string_view{"test"}, - std::string_view{"double"}, props)); + std::string_view{"double"}, props, + PubSubOptionsEq({}))); net::WireDecodeText( "[{\"method\":\"publish\",\"params\":{" "\"name\":\"test\",\"properties\":{\"k\":6},"