diff --git a/ntcore/src/main/native/cpp/LocalStorage.cpp b/ntcore/src/main/native/cpp/LocalStorage.cpp index ceaf011e6e..ede8cc1d57 100644 --- a/ntcore/src/main/native/cpp/LocalStorage.cpp +++ b/ntcore/src/main/native/cpp/LocalStorage.cpp @@ -909,6 +909,7 @@ std::unique_ptr LSImpl::RemoveEntry(NT_Entry entryHandle) { MultiSubscriberData* LSImpl::AddMultiSubscriber( std::span prefixes, const PubSubOptions& options) { + DEBUG4("AddMultiSubscriber({})", fmt::join(prefixes, ",")); auto subscriber = m_multiSubscribers.Add(m_inst, prefixes, options); // subscribe to any already existing topics for (auto&& topic : m_topics) { @@ -920,6 +921,7 @@ MultiSubscriberData* LSImpl::AddMultiSubscriber( } } if (m_network) { + DEBUG4("-> NetworkSubscribe"); m_network->Subscribe(subscriber->handle, subscriber->prefixes, subscriber->options); } diff --git a/ntcore/src/main/native/cpp/net/ServerImpl.cpp b/ntcore/src/main/native/cpp/net/ServerImpl.cpp index d9b1c0c5fc..8c0976a188 100644 --- a/ntcore/src/main/native/cpp/net/ServerImpl.cpp +++ b/ntcore/src/main/native/cpp/net/ServerImpl.cpp @@ -656,10 +656,13 @@ void ClientData4Base::ClientSubscribe(int64_t subuid, // is client already subscribed? bool wasSubscribed = false; + bool wasSubscribedValue = false; for (auto subscriber : topic->subscribers) { if (subscriber->client == this) { wasSubscribed = true; - break; + if (!subscriber->options.topicsOnly) { + wasSubscribedValue = true; + } } } @@ -673,16 +676,17 @@ void ClientData4Base::ClientSubscribe(int64_t subuid, m_server.UpdateMetaTopicSub(topic.get()); } - if (!wasSubscribed && added && !removed) { - // announce topic to client + // announce topic to client if not previously announced + if (added && !removed && !wasSubscribed) { DEBUG4("client {}: announce {}", m_id, topic->name); SendAnnounce(topic.get(), std::nullopt); + } - // send last value - if (!sub->options.topicsOnly && topic->lastValue) { - DEBUG4("send last value for {} to client {}", topic->name, m_id); - SendValue(topic.get(), topic->lastValue, kSendAll); - } + // send last value + if (added && !sub->options.topicsOnly && !wasSubscribedValue && + topic->lastValue) { + DEBUG4("send last value for {} to client {}", topic->name, m_id); + SendValue(topic.get(), topic->lastValue, kSendAll); } } diff --git a/ntcore/src/test/native/cpp/SpanMatcher.h b/ntcore/src/test/native/cpp/SpanMatcher.h index 9973c036cb..0d6c6b5065 100644 --- a/ntcore/src/test/native/cpp/SpanMatcher.h +++ b/ntcore/src/test/native/cpp/SpanMatcher.h @@ -33,8 +33,10 @@ class SpanMatcher : public ::testing::MatcherInterface> { }; template -inline ::testing::Matcher> SpanEq(std::span good) { - return ::testing::MakeMatcher(new SpanMatcher(good)); +inline ::testing::Matcher> SpanEq( + const T& good) { + return ::testing::MakeMatcher( + new SpanMatcher(std::span(good))); } template diff --git a/ntcore/src/test/native/cpp/net/ServerImplTest.cpp b/ntcore/src/test/native/cpp/net/ServerImplTest.cpp new file mode 100644 index 0000000000..69ff30abaf --- /dev/null +++ b/ntcore/src/test/native/cpp/net/ServerImplTest.cpp @@ -0,0 +1,269 @@ +// Copyright (c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#include + +#include +#include +#include + +#include "../MockLogger.h" +#include "../PubSubOptionsMatcher.h" +#include "../SpanMatcher.h" +#include "../TestPrinters.h" +#include "../ValueMatcher.h" +#include "Handle.h" +#include "MockNetworkInterface.h" +#include "MockWireConnection.h" +#include "gmock/gmock.h" +#include "gtest/gtest.h" +#include "net/Message.h" +#include "net/ServerImpl.h" +#include "net/WireEncoder.h" +#include "ntcore_c.h" +#include "ntcore_cpp.h" + +using ::testing::_; +using ::testing::AllOf; +using ::testing::ElementsAre; +using ::testing::Field; +using ::testing::IsEmpty; +using ::testing::Property; +using ::testing::Return; + +using MockSetPeriodicFunc = ::testing::MockFunction; +using MockConnected3Func = + ::testing::MockFunction; + +namespace nt { + +class ServerImplTest : public ::testing::Test { + public: + ::testing::StrictMock local; + wpi::MockLogger logger; + net::ServerImpl server{logger}; +}; + +TEST_F(ServerImplTest, AddClient) { + ::testing::StrictMock wire; + EXPECT_CALL(wire, Flush()); + MockSetPeriodicFunc setPeriodic; + auto [name, id] = server.AddClient("test", "connInfo", false, wire, + setPeriodic.AsStdFunction()); + EXPECT_EQ(name, "test"); + EXPECT_NE(id, -1); +} + +TEST_F(ServerImplTest, AddDuplicateClient) { + ::testing::StrictMock wire1; + ::testing::StrictMock wire2; + MockSetPeriodicFunc setPeriodic1; + MockSetPeriodicFunc setPeriodic2; + EXPECT_CALL(wire1, Flush()); + EXPECT_CALL(wire2, Flush()); + + auto [name1, id1] = server.AddClient("test", "connInfo", false, wire1, + setPeriodic1.AsStdFunction()); + auto [name2, id2] = server.AddClient("test", "connInfo2", false, wire2, + setPeriodic2.AsStdFunction()); + EXPECT_EQ(name1, "test"); + EXPECT_NE(id1, -1); + EXPECT_EQ(name2, "test@1"); + EXPECT_NE(id1, id2); + EXPECT_NE(id2, -1); +} + +TEST_F(ServerImplTest, AddClient3) {} + +template +static std::string EncodeText(const T& msgs) { + std::string data; + wpi::raw_string_ostream os{data}; + bool first = true; + for (auto&& msg : msgs) { + if (first) { + os << '['; + first = false; + } else { + os << ','; + } + net::WireEncodeText(os, msg); + } + os << ']'; + return data; +} + +template +static std::vector EncodeBinary(const T& msgs) { + std::vector data; + wpi::raw_uvector_ostream os{data}; + for (auto&& msg : msgs) { + if (auto m = std::get_if(&msg.contents)) { + net::WireEncodeBinary(os, m->topic, m->value.time(), m->value); + } + } + return data; +} + +TEST_F(ServerImplTest, PublishLocal) { + // publish before client connect + server.SetLocal(&local); + NT_Publisher pubHandle = nt::Handle{0, 1, nt::Handle::kPublisher}; + NT_Topic topicHandle = nt::Handle{0, 1, nt::Handle::kTopic}; + NT_Publisher pubHandle2 = nt::Handle{0, 2, nt::Handle::kPublisher}; + NT_Topic topicHandle2 = nt::Handle{0, 2, nt::Handle::kTopic}; + NT_Publisher pubHandle3 = nt::Handle{0, 3, nt::Handle::kPublisher}; + NT_Topic topicHandle3 = nt::Handle{0, 3, nt::Handle::kTopic}; + { + ::testing::InSequence seq; + EXPECT_CALL(local, NetworkAnnounce("test", "double", wpi::json::object(), + pubHandle)); + EXPECT_CALL(local, NetworkAnnounce("test2", "double", wpi::json::object(), + pubHandle2)); + EXPECT_CALL(local, NetworkAnnounce("test3", "double", wpi::json::object(), + pubHandle3)); + } + + { + std::vector msgs; + msgs.emplace_back(net::ClientMessage{net::PublishMsg{ + pubHandle, topicHandle, "test", "double", wpi::json::object(), {}}}); + server.HandleLocal(msgs); + } + + // client connect; it should get already-published topic as soon as it + // subscribes + ::testing::StrictMock wire; + MockSetPeriodicFunc setPeriodic; + { + ::testing::InSequence seq; + EXPECT_CALL(wire, Flush()); // AddClient() + EXPECT_CALL(setPeriodic, Call(100)); // ClientSubscribe() + EXPECT_CALL(wire, Flush()); // ClientSubscribe() + EXPECT_CALL(wire, Ready()).WillOnce(Return(true)); // SendControl() + { + std::vector smsgs; + smsgs.emplace_back(net::ServerMessage{net::AnnounceMsg{ + "test", 3, "double", std::nullopt, wpi::json::object()}}); + smsgs.emplace_back(net::ServerMessage{net::AnnounceMsg{ + "test2", 8, "double", std::nullopt, wpi::json::object()}}); + EXPECT_CALL(wire, Text(EncodeText(smsgs))); // SendControl() + } + EXPECT_CALL(wire, Flush()); // SendControl() + EXPECT_CALL(wire, Ready()).WillOnce(Return(true)); // SendControl() + { + std::vector smsgs; + smsgs.emplace_back(net::ServerMessage{net::AnnounceMsg{ + "test3", 11, "double", std::nullopt, wpi::json::object()}}); + EXPECT_CALL(wire, Text(EncodeText(smsgs))); // SendControl() + } + EXPECT_CALL(wire, Flush()); // SendControl() + } + auto [name, id] = server.AddClient("test", "connInfo", false, wire, + setPeriodic.AsStdFunction()); + + { + NT_Subscriber subHandle = nt::Handle{0, 1, nt::Handle::kSubscriber}; + std::vector msgs; + msgs.emplace_back(net::ClientMessage{net::SubscribeMsg{ + subHandle, {{""}}, PubSubOptions{.prefixMatch = true}}}); + server.ProcessIncomingText(id, EncodeText(msgs)); + } + + // publish before send control + { + std::vector msgs; + msgs.emplace_back(net::ClientMessage{net::PublishMsg{ + pubHandle2, topicHandle2, "test2", "double", wpi::json::object(), {}}}); + server.HandleLocal(msgs); + } + + server.SendControl(100); + + // publish after send control + { + std::vector msgs; + msgs.emplace_back(net::ClientMessage{net::PublishMsg{ + pubHandle3, topicHandle3, "test3", "double", wpi::json::object(), {}}}); + server.HandleLocal(msgs); + } + + server.SendControl(200); +} + +TEST_F(ServerImplTest, ClientSubTopicOnlyThenValue) { + // publish before client connect + server.SetLocal(&local); + NT_Publisher pubHandle = nt::Handle{0, 1, nt::Handle::kPublisher}; + NT_Topic topicHandle = nt::Handle{0, 1, nt::Handle::kTopic}; + EXPECT_CALL( + local, NetworkAnnounce("test", "double", wpi::json::object(), pubHandle)); + + { + std::vector msgs; + msgs.emplace_back(net::ClientMessage{net::PublishMsg{ + pubHandle, topicHandle, "test", "double", wpi::json::object(), {}}}); + msgs.emplace_back(net::ClientMessage{ + net::ClientValueMsg{pubHandle, Value::MakeDouble(1.0, 10)}}); + server.HandleLocal(msgs); + } + + ::testing::StrictMock wire; + MockSetPeriodicFunc setPeriodic; + { + ::testing::InSequence seq; + EXPECT_CALL(wire, Flush()); // AddClient() + EXPECT_CALL(setPeriodic, Call(100)); // ClientSubscribe() + EXPECT_CALL(wire, Flush()); // ClientSubscribe() + EXPECT_CALL(wire, Ready()).WillOnce(Return(true)); // SendValues() + { + std::vector smsgs; + smsgs.emplace_back(net::ServerMessage{net::AnnounceMsg{ + "test", 3, "double", std::nullopt, wpi::json::object()}}); + EXPECT_CALL(wire, Text(EncodeText(smsgs))); // SendValues() + } + EXPECT_CALL(wire, Flush()); // SendValues() + EXPECT_CALL(setPeriodic, Call(100)); // ClientSubscribe() + EXPECT_CALL(wire, Flush()); // ClientSubscribe() + EXPECT_CALL(wire, Ready()).WillOnce(Return(true)); // SendValues() + { + std::vector smsgs; + smsgs.emplace_back(net::ServerMessage{ + net::ServerValueMsg{3, Value::MakeDouble(1.0, 10)}}); + EXPECT_CALL(wire, + Binary(wpi::SpanEq(EncodeBinary(smsgs)))); // SendValues() + } + EXPECT_CALL(wire, Flush()); // SendValues() + } + + // connect client + auto [name, id] = server.AddClient("test", "connInfo", false, wire, + setPeriodic.AsStdFunction()); + + // subscribe topics only; will not send value + { + NT_Subscriber subHandle = nt::Handle{0, 1, nt::Handle::kSubscriber}; + std::vector msgs; + msgs.emplace_back(net::ClientMessage{net::SubscribeMsg{ + subHandle, + {{""}}, + PubSubOptions{.topicsOnly = true, .prefixMatch = true}}}); + server.ProcessIncomingText(id, EncodeText(msgs)); + } + + server.SendValues(id, 100); + + // subscribe normal; will not resend announcement, but will send value + { + NT_Subscriber subHandle = nt::Handle{0, 2, nt::Handle::kSubscriber}; + std::vector msgs; + msgs.emplace_back(net::ClientMessage{ + net::SubscribeMsg{subHandle, {{"test"}}, PubSubOptions{}}}); + server.ProcessIncomingText(id, EncodeText(msgs)); + } + + server.SendValues(id, 200); +} + +} // namespace nt