diff --git a/ntcore/src/main/native/cpp/LocalStorage.cpp b/ntcore/src/main/native/cpp/LocalStorage.cpp index d0cb335bc9..db8e065d47 100644 --- a/ntcore/src/main/native/cpp/LocalStorage.cpp +++ b/ntcore/src/main/native/cpp/LocalStorage.cpp @@ -427,7 +427,7 @@ void SubscriberData::UpdateActive() { } void LSImpl::NotifyTopic(TopicData* topic, unsigned int eventFlags) { - DEBUG4("NotifyTopic({}, {})\n", topic->name, eventFlags); + DEBUG4("NotifyTopic({}, {})", topic->name, eventFlags); auto topicInfo = topic->GetTopicInfo(); if (!topic->listeners.empty()) { m_listenerStorage.Notify(topic->listeners, eventFlags, topicInfo); @@ -1380,36 +1380,37 @@ void LocalStorage::NetworkSetValue(NT_Topic topicHandle, const Value& value) { } } -void LocalStorage::StartNetwork(net::NetworkStartupInterface& startup, - net::NetworkInterface* network) { +void LocalStorage::StartNetwork(net::NetworkInterface* network) { + WPI_DEBUG4(m_impl->m_logger, "StartNetwork()"); std::scoped_lock lock{m_mutex}; + m_impl->m_network = network; // publish all active publishers to the network and send last values // only send value once per topic for (auto&& topic : m_impl->m_topics) { PublisherData* anyPublisher = nullptr; for (auto&& publisher : topic->localPublishers) { if (publisher->active) { - startup.Publish(publisher->handle, topic->handle, topic->name, - topic->typeStr, topic->properties, publisher->config); + network->Publish(publisher->handle, topic->handle, topic->name, + topic->typeStr, topic->properties, publisher->config); anyPublisher = publisher; } } if (anyPublisher && topic->lastValue) { - startup.SetValue(anyPublisher->handle, topic->lastValue); + network->SetValue(anyPublisher->handle, topic->lastValue); } } for (auto&& subscriber : m_impl->m_subscribers) { - startup.Subscribe(subscriber->handle, {{subscriber->topic->name}}, - subscriber->config); + network->Subscribe(subscriber->handle, {{subscriber->topic->name}}, + subscriber->config); } for (auto&& subscriber : m_impl->m_multiSubscribers) { - startup.Subscribe(subscriber->handle, subscriber->prefixes, - subscriber->options); + network->Subscribe(subscriber->handle, subscriber->prefixes, + subscriber->options); } - m_impl->m_network = network; } void LocalStorage::ClearNetwork() { + WPI_DEBUG4(m_impl->m_logger, "ClearNetwork()"); std::scoped_lock lock{m_mutex}; m_impl->m_network = nullptr; // treat as an unannounce all from the network side diff --git a/ntcore/src/main/native/cpp/LocalStorage.h b/ntcore/src/main/native/cpp/LocalStorage.h index 58644908f9..a93adb0c5e 100644 --- a/ntcore/src/main/native/cpp/LocalStorage.h +++ b/ntcore/src/main/native/cpp/LocalStorage.h @@ -44,8 +44,7 @@ class LocalStorage final : public net::ILocalStorage { bool ack) final; void NetworkSetValue(NT_Topic topicHandle, const Value& value) final; - void StartNetwork(net::NetworkStartupInterface& startup, - net::NetworkInterface* network) final; + void StartNetwork(net::NetworkInterface* network) final; void ClearNetwork() final; // User functions. These are the actual implementations of the corresponding diff --git a/ntcore/src/main/native/cpp/NetworkClient.cpp b/ntcore/src/main/native/cpp/NetworkClient.cpp index f1067d7f33..b227e10e9f 100644 --- a/ntcore/src/main/native/cpp/NetworkClient.cpp +++ b/ntcore/src/main/native/cpp/NetworkClient.cpp @@ -304,11 +304,9 @@ void NCImpl3::TcpConnected(uv::Tcp& tcp) { } }); - { - net3::ClientStartup3 startup{*m_clientImpl}; - m_localStorage.StartNetwork(startup, &m_localQueue); - } m_clientImpl->SetLocal(&m_localStorage); + m_localStorage.StartNetwork(&m_localQueue); + HandleLocal(); }); tcp.SetData(clientImpl); @@ -429,11 +427,10 @@ void NCImpl4::WsConnected(wpi::WebSocket& ws, uv::Tcp& tcp) { m_sendValuesTimer->Start(uv::Timer::Time{repeatMs}, uv::Timer::Time{repeatMs}); }); - { - net::ClientStartup startup{*m_clientImpl}; - m_localStorage.StartNetwork(startup, &m_localQueue); - } m_clientImpl->SetLocal(&m_localStorage); + m_localStorage.StartNetwork(&m_localQueue); + HandleLocal(); + m_clientImpl->SendInitial(); ws.closed.connect([this, &ws](uint16_t, std::string_view reason) { if (!ws.GetStream().IsLoopClosing()) { Disconnect(reason); diff --git a/ntcore/src/main/native/cpp/NetworkServer.cpp b/ntcore/src/main/native/cpp/NetworkServer.cpp index c0f781e760..971a1fdc78 100644 --- a/ntcore/src/main/native/cpp/NetworkServer.cpp +++ b/ntcore/src/main/native/cpp/NetworkServer.cpp @@ -334,11 +334,9 @@ NSImpl::NSImpl(std::string_view persistentFilename, m_localMsgs.reserve(net::NetworkLoopQueue::kInitialQueueSize); m_loopRunner.ExecAsync([=, this](uv::Loop& loop) { // connect local storage to server - { - net::ServerStartup startup{m_serverImpl}; - m_localStorage.StartNetwork(startup, &m_localQueue); - } m_serverImpl.SetLocal(&m_localStorage); + m_localStorage.StartNetwork(&m_localQueue); + HandleLocal(); // load persistent file first, then initialize uv::QueueWork( diff --git a/ntcore/src/main/native/cpp/net/ClientImpl.cpp b/ntcore/src/main/native/cpp/net/ClientImpl.cpp index b606b3827b..87fc6ac7d2 100644 --- a/ntcore/src/main/native/cpp/net/ClientImpl.cpp +++ b/ntcore/src/main/native/cpp/net/ClientImpl.cpp @@ -55,6 +55,7 @@ class CImpl : public ServerMessageHandler { void HandleLocal(std::vector&& msgs); bool SendControl(uint64_t curTimeMs); void SendValues(uint64_t curTimeMs); + void SendInitialValues(); bool CheckNetworkReady(); // ServerMessageHandler interface @@ -229,7 +230,7 @@ bool CImpl::SendControl(uint64_t curTimeMs) { } void CImpl::SendValues(uint64_t curTimeMs) { - DEBUG4("SendPeriodic({})", curTimeMs); + DEBUG4("SendValues({})", curTimeMs); // can't send value updates until we have a RTT if (!m_haveTimeOffset) { @@ -268,6 +269,36 @@ void CImpl::SendValues(uint64_t curTimeMs) { } } +void CImpl::SendInitialValues() { + DEBUG4("SendInitialValues()"); + + // ensure all control messages are sent ahead of value updates + if (!SendControl(0)) { + return; + } + + // only send time=0 values (as we don't have a RTT yet) + auto writer = m_wire.SendBinary(); + for (auto&& pub : m_publishers) { + if (pub && !pub->outValues.empty()) { + bool sent = false; + for (auto&& val : pub->outValues) { + if (val.server_time() == 0) { + DEBUG4("Sending {} value time={} server_time={}", pub->handle, + val.time(), val.server_time()); + WireEncodeBinary(writer.Add(), Handle{pub->handle}.GetIndex(), 0, + val); + sent = true; + } + } + if (sent) { + std::erase_if(pub->outValues, + [](const auto& v) { return v.server_time() == 0; }); + } + } + } +} + bool CImpl::CheckNetworkReady() { if (!m_wire.Ready()) { ++m_notReadyCount; @@ -434,46 +465,7 @@ void ClientImpl::SetLocal(LocalInterface* local) { m_impl->m_local = local; } -ClientStartup::ClientStartup(ClientImpl& client) - : m_client{client}, - m_binaryWriter{client.m_impl->m_wire.SendBinary()}, - m_textWriter{client.m_impl->m_wire.SendText()} {} - -ClientStartup::~ClientStartup() { - m_client.m_impl->m_wire.Flush(); -} - -void ClientStartup::Publish(NT_Publisher pubHandle, NT_Topic topicHandle, - std::string_view name, std::string_view typeStr, - const wpi::json& properties, - const PubSubOptionsImpl& options) { - WPI_DEBUG4(m_client.m_impl->m_logger, "StartupPublish({}, {}, {}, {})", - pubHandle, topicHandle, name, typeStr); - m_client.m_impl->Publish(pubHandle, topicHandle, name, typeStr, properties, - options); - WireEncodePublish(m_textWriter.Add(), Handle{pubHandle}.GetIndex(), name, - typeStr, properties); -} - -void ClientStartup::Subscribe(NT_Subscriber subHandle, - std::span prefixes, - const PubSubOptionsImpl& options) { - WPI_DEBUG4(m_client.m_impl->m_logger, "StartupSubscribe({})", subHandle); - WireEncodeSubscribe(m_textWriter.Add(), Handle{subHandle}.GetIndex(), - prefixes, options); -} - -void ClientStartup::SetValue(NT_Publisher pubHandle, const Value& value) { - WPI_DEBUG4(m_client.m_impl->m_logger, "StartupSetValue({})", pubHandle); - // Similar to Client::SetValue(), except always set lastValue and send - unsigned int index = Handle{pubHandle}.GetIndex(); - assert(index < m_client.m_impl->m_publishers.size() && - m_client.m_impl->m_publishers[index]); - auto& publisher = *m_client.m_impl->m_publishers[index]; - // only send time 0 values until we have a RTT - if (value.server_time() == 0) { - WireEncodeBinary(m_binaryWriter.Add(), index, 0, value); - } else { - publisher.outValues.emplace_back(value); - } +void ClientImpl::SendInitial() { + m_impl->SendInitialValues(); + m_impl->m_wire.Flush(); } diff --git a/ntcore/src/main/native/cpp/net/ClientImpl.h b/ntcore/src/main/native/cpp/net/ClientImpl.h index ae9a2f978b..4116a54643 100644 --- a/ntcore/src/main/native/cpp/net/ClientImpl.h +++ b/ntcore/src/main/native/cpp/net/ClientImpl.h @@ -28,12 +28,9 @@ class Value; namespace nt::net { struct ClientMessage; -class ClientStartup; class WireConnection; class ClientImpl { - friend class ClientStartup; - public: ClientImpl(uint64_t curTimeMs, int inst, WireConnection& wire, wpi::Logger& logger, @@ -48,32 +45,11 @@ class ClientImpl { void SendValues(uint64_t curTimeMs); void SetLocal(LocalInterface* local); + void SendInitial(); private: class Impl; std::unique_ptr m_impl; }; -class ClientStartup final : public NetworkStartupInterface { - public: - explicit ClientStartup(ClientImpl& client); - ~ClientStartup() final; - ClientStartup(const ClientStartup&) = delete; - ClientStartup& operator=(const ClientStartup&) = delete; - - // NetworkStartupInterface interface - void Publish(NT_Publisher pubHandle, NT_Topic topicHandle, - std::string_view name, std::string_view typeStr, - const wpi::json& properties, - const PubSubOptionsImpl& options) final; - void Subscribe(NT_Subscriber subHandle, std::span prefixes, - const PubSubOptionsImpl& options) final; - void SetValue(NT_Publisher pubHandle, const Value& value) final; - - private: - ClientImpl& m_client; - BinaryWriter m_binaryWriter; - TextWriter m_textWriter; -}; - } // namespace nt::net diff --git a/ntcore/src/main/native/cpp/net/NetworkInterface.h b/ntcore/src/main/native/cpp/net/NetworkInterface.h index 0ef60335d7..3b2e7dd812 100644 --- a/ntcore/src/main/native/cpp/net/NetworkInterface.h +++ b/ntcore/src/main/native/cpp/net/NetworkInterface.h @@ -35,32 +35,27 @@ class LocalInterface { virtual void NetworkSetValue(NT_Topic topicHandle, const Value& value) = 0; }; -class NetworkStartupInterface { +class NetworkInterface { public: - virtual ~NetworkStartupInterface() = default; + virtual ~NetworkInterface() = default; virtual void Publish(NT_Publisher pubHandle, NT_Topic topicHandle, std::string_view name, std::string_view typeStr, const wpi::json& properties, const PubSubOptionsImpl& options) = 0; - virtual void Subscribe(NT_Subscriber subHandle, - std::span topicNames, - const PubSubOptionsImpl& options) = 0; - virtual void SetValue(NT_Publisher pubHandle, const Value& value) = 0; -}; - -class NetworkInterface : public NetworkStartupInterface { - public: virtual void Unpublish(NT_Publisher pubHandle, NT_Topic topicHandle) = 0; virtual void SetProperties(NT_Topic topicHandle, std::string_view name, const wpi::json& update) = 0; + virtual void Subscribe(NT_Subscriber subHandle, + std::span topicNames, + const PubSubOptionsImpl& options) = 0; virtual void Unsubscribe(NT_Subscriber subHandle) = 0; + virtual void SetValue(NT_Publisher pubHandle, const Value& value) = 0; }; class ILocalStorage : public LocalInterface { public: - virtual void StartNetwork(NetworkStartupInterface& startup, - NetworkInterface* network) = 0; + virtual void StartNetwork(NetworkInterface* network) = 0; virtual void ClearNetwork() = 0; }; diff --git a/ntcore/src/main/native/cpp/net/ServerImpl.cpp b/ntcore/src/main/native/cpp/net/ServerImpl.cpp index 60c7d4f700..d9b1c0c5fc 100644 --- a/ntcore/src/main/native/cpp/net/ServerImpl.cpp +++ b/ntcore/src/main/native/cpp/net/ServerImpl.cpp @@ -168,8 +168,6 @@ class ClientData4Base : public ClientData, protected ClientMessageHandler { }; class ClientDataLocal final : public ClientData4Base { - friend class net::ServerStartup; - public: ClientDataLocal(SImpl& server, int id, wpi::Logger& logger) : ClientData4Base{"", "", "", true, [](uint32_t) {}, server, id, logger} { @@ -2300,6 +2298,7 @@ void ServerImpl::HandleLocal(std::span msgs) { } void ServerImpl::SetLocal(LocalInterface* local) { + WPI_DEBUG4(m_impl->m_logger, "SetLocal()"); m_impl->m_local = local; // create server meta topics @@ -2362,22 +2361,3 @@ std::string ServerImpl::DumpPersistent() { std::string ServerImpl::LoadPersistent(std::string_view in) { return m_impl->LoadPersistent(in); } - -void ServerStartup::Publish(NT_Publisher pubHandle, NT_Topic topicHandle, - std::string_view name, std::string_view typeStr, - const wpi::json& properties, - const PubSubOptionsImpl& options) { - m_server.m_impl->m_localClient->ClientPublish(pubHandle, name, typeStr, - properties); -} - -void ServerStartup::Subscribe(NT_Subscriber subHandle, - std::span topicNames, - const PubSubOptionsImpl& options) { - m_server.m_impl->m_localClient->ClientSubscribe(subHandle, topicNames, - options); -} - -void ServerStartup::SetValue(NT_Publisher pubHandle, const Value& value) { - m_server.m_impl->m_localClient->ClientSetValue(pubHandle, value); -} diff --git a/ntcore/src/main/native/cpp/net/ServerImpl.h b/ntcore/src/main/native/cpp/net/ServerImpl.h index d99c6098e3..86607e9bd9 100644 --- a/ntcore/src/main/native/cpp/net/ServerImpl.h +++ b/ntcore/src/main/native/cpp/net/ServerImpl.h @@ -29,12 +29,9 @@ namespace nt::net { struct ClientMessage; class LocalInterface; -class ServerStartup; class WireConnection; class ServerImpl final { - friend class ServerStartup; - public: using SetPeriodicFunc = std::function; using Connected3Func = @@ -76,22 +73,4 @@ class ServerImpl final { std::unique_ptr m_impl; }; -class ServerStartup final : public NetworkStartupInterface { - public: - explicit ServerStartup(ServerImpl& server) : m_server{server} {} - - // NetworkStartupInterface interface - void Publish(NT_Publisher pubHandle, NT_Topic topicHandle, - std::string_view name, std::string_view typeStr, - const wpi::json& properties, - const PubSubOptionsImpl& options) final; - void Subscribe(NT_Subscriber subHandle, - std::span topicNames, - const PubSubOptionsImpl& options) final; - void SetValue(NT_Publisher pubHandle, const Value& value) final; - - private: - ServerImpl& m_server; -}; - } // namespace nt::net diff --git a/ntcore/src/main/native/cpp/net3/ClientImpl3.cpp b/ntcore/src/main/native/cpp/net3/ClientImpl3.cpp index 53325a8fb9..07838659d7 100644 --- a/ntcore/src/main/native/cpp/net3/ClientImpl3.cpp +++ b/ntcore/src/main/native/cpp/net3/ClientImpl3.cpp @@ -640,34 +640,3 @@ void ClientImpl3::SendPeriodic(uint64_t curTimeMs) { void ClientImpl3::SetLocal(net::LocalInterface* local) { m_impl->m_local = local; } - -ClientStartup3::ClientStartup3(ClientImpl3& client) : m_client{client} {} - -ClientStartup3::~ClientStartup3() = default; - -void ClientStartup3::Publish(NT_Publisher pubHandle, NT_Topic topicHandle, - std::string_view name, std::string_view typeStr, - const wpi::json& properties, - const PubSubOptionsImpl& options) { - WPI_DEBUG4(m_client.m_impl->m_logger, "StartupPublish({}, {}, {}, {})", - pubHandle, topicHandle, name, typeStr); - m_client.m_impl->Publish(pubHandle, topicHandle, name, typeStr, properties, - options); -} - -void ClientStartup3::Subscribe(NT_Subscriber subHandle, - std::span prefixes, - const PubSubOptionsImpl& options) { - // NT3 ignores subscribes, so no action required -} - -void ClientStartup3::SetValue(NT_Publisher pubHandle, const Value& value) { - WPI_DEBUG4(m_client.m_impl->m_logger, "StartupSetValue({})", pubHandle); - // Similar to Client::SetValue(), except always set value and queue - unsigned int index = Handle{pubHandle}.GetIndex(); - assert(index < m_client.m_impl->m_publishers.size() && - m_client.m_impl->m_publishers[index]); - auto& publisher = *m_client.m_impl->m_publishers[index]; - publisher.entry->value = value; - publisher.outValues.emplace_back(value); -} diff --git a/ntcore/src/main/native/cpp/net3/ClientImpl3.h b/ntcore/src/main/native/cpp/net3/ClientImpl3.h index 9214800fcf..484ea3d0f5 100644 --- a/ntcore/src/main/native/cpp/net3/ClientImpl3.h +++ b/ntcore/src/main/native/cpp/net3/ClientImpl3.h @@ -25,12 +25,9 @@ class LocalInterface; namespace nt::net3 { -class ClientStartup3; class WireConnection3; class ClientImpl3 { - friend class ClientStartup3; - public: explicit ClientImpl3(uint64_t curTimeMs, int inst, WireConnection3& wire, wpi::Logger& logger, @@ -50,24 +47,4 @@ class ClientImpl3 { std::unique_ptr m_impl; }; -class ClientStartup3 final : public net::NetworkStartupInterface { - public: - explicit ClientStartup3(ClientImpl3& client); - ~ClientStartup3() final; - ClientStartup3(const ClientStartup3&) = delete; - ClientStartup3& operator=(const ClientStartup3&) = delete; - - // NetworkStartupInterface interface - void Publish(NT_Publisher pubHandle, NT_Topic topicHandle, - std::string_view name, std::string_view typeStr, - const wpi::json& properties, - const PubSubOptionsImpl& options) final; - void Subscribe(NT_Subscriber subHandle, std::span prefixes, - const PubSubOptionsImpl& options) final; - void SetValue(NT_Publisher pubHandle, const Value& value) final; - - private: - ClientImpl3& m_client; -}; - } // namespace nt::net3 diff --git a/ntcore/src/test/native/cpp/LocalStorageTest.cpp b/ntcore/src/test/native/cpp/LocalStorageTest.cpp index 0df7c1fe6b..5734284cc3 100644 --- a/ntcore/src/test/native/cpp/LocalStorageTest.cpp +++ b/ntcore/src/test/native/cpp/LocalStorageTest.cpp @@ -42,9 +42,8 @@ namespace nt { class LocalStorageTest : public ::testing::Test { public: - LocalStorageTest() { storage.StartNetwork(startup, &network); } + LocalStorageTest() { storage.StartNetwork(&network); } - ::testing::StrictMock startup; ::testing::StrictMock network; wpi::MockLogger logger; MockListenerStorage listenerStorage; diff --git a/ntcore/src/test/native/cpp/net/MockNetworkInterface.h b/ntcore/src/test/native/cpp/net/MockNetworkInterface.h index bdf3e3ecbf..b0334362a3 100644 --- a/ntcore/src/test/native/cpp/net/MockNetworkInterface.h +++ b/ntcore/src/test/native/cpp/net/MockNetworkInterface.h @@ -28,21 +28,6 @@ class MockLocalInterface : public LocalInterface { (override)); }; -class MockNetworkStartupInterface : public NetworkStartupInterface { - public: - MOCK_METHOD(void, Publish, - (NT_Publisher pubHandle, NT_Topic topicHandle, - std::string_view name, std::string_view typeStr, - const wpi::json& properties, const PubSubOptionsImpl& options), - (override)); - MOCK_METHOD(void, Subscribe, - (NT_Subscriber subHandle, std::span prefixes, - const PubSubOptionsImpl& options), - (override)); - MOCK_METHOD(void, SetValue, (NT_Publisher pubHandle, const Value& value), - (override)); -}; - class MockNetworkInterface : public NetworkInterface { public: MOCK_METHOD(void, Publish, @@ -77,9 +62,7 @@ class MockLocalStorage : public ILocalStorage { (override)); MOCK_METHOD(void, NetworkSetValue, (NT_Topic topicHandle, const Value& value), (override)); - MOCK_METHOD(void, StartNetwork, - (NetworkStartupInterface & startup, NetworkInterface* network), - (override)); + MOCK_METHOD(void, StartNetwork, (NetworkInterface * network), (override)); MOCK_METHOD(void, ClearNetwork, (), (override)); };