diff --git a/ntcore/src/main/native/cpp/LocalStorage.cpp b/ntcore/src/main/native/cpp/LocalStorage.cpp index dd5d4e4a93..3ac16531cf 100644 --- a/ntcore/src/main/native/cpp/LocalStorage.cpp +++ b/ntcore/src/main/native/cpp/LocalStorage.cpp @@ -260,8 +260,9 @@ struct LSImpl { void CheckReset(TopicData* topic); - bool SetValue(TopicData* topic, const Value& value, unsigned int eventFlags); - void NotifyValue(TopicData* topic, unsigned int eventFlags); + bool SetValue(TopicData* topic, const Value& value, unsigned int eventFlags, + bool isDuplicate); + void NotifyValue(TopicData* topic, unsigned int eventFlags, bool isDuplicate); void SetFlags(TopicData* topic, unsigned int flags); void SetPersistent(TopicData* topic, bool value); @@ -323,7 +324,8 @@ struct LSImpl { PublisherData* PublishEntry(EntryData* entry, NT_Type type); Value* GetSubEntryValue(NT_Handle subentryHandle); - bool PublishLocalValue(PublisherData* publisher, const Value& value); + bool PublishLocalValue(PublisherData* publisher, const Value& value, + bool force = false); bool SetEntryValue(NT_Handle pubentryHandle, const Value& value); bool SetDefaultEntryValue(NT_Handle pubsubentryHandle, const Value& value); @@ -468,7 +470,7 @@ void LSImpl::CheckReset(TopicData* topic) { } bool LSImpl::SetValue(TopicData* topic, const Value& value, - unsigned int eventFlags) { + unsigned int eventFlags, bool isDuplicate) { if (topic->type != NT_UNASSIGNED && topic->type != value.type()) { return false; } @@ -479,9 +481,9 @@ bool LSImpl::SetValue(TopicData* topic, const Value& value, topic->type = value.type(); topic->lastValue = value; topic->lastValueNetwork = isNetwork; - NotifyValue(topic, eventFlags); + NotifyValue(topic, eventFlags, isDuplicate); } - if (topic->datalogType == value.type()) { + if (!isDuplicate && topic->datalogType == value.type()) { for (auto&& datalog : topic->datalogs) { datalog.Append(value); } @@ -489,9 +491,11 @@ bool LSImpl::SetValue(TopicData* topic, const Value& value, return true; } -void LSImpl::NotifyValue(TopicData* topic, unsigned int eventFlags) { +void LSImpl::NotifyValue(TopicData* topic, unsigned int eventFlags, + bool isDuplicate) { for (auto&& subscriber : topic->localSubscribers) { - if (subscriber->active) { + if (subscriber->active && + (subscriber->config.keepDuplicates || !isDuplicate)) { subscriber->pollStorage.emplace_back(topic->lastValue); subscriber->handle.Set(); if (!subscriber->valueListeners.empty()) { @@ -502,10 +506,12 @@ void LSImpl::NotifyValue(TopicData* topic, unsigned int eventFlags) { } for (auto&& subscriber : topic->multiSubscribers) { - subscriber->handle.Set(); - if (!subscriber->valueListeners.empty()) { - m_listenerStorage.Notify(subscriber->valueListeners, eventFlags, - topic->handle, 0, topic->lastValue); + if (subscriber->options.keepDuplicates || !isDuplicate) { + subscriber->handle.Set(); + if (!subscriber->valueListeners.empty()) { + m_listenerStorage.Notify(subscriber->valueListeners, eventFlags, + topic->handle, 0, topic->lastValue); + } } } } @@ -1198,7 +1204,8 @@ Value* LSImpl::GetSubEntryValue(NT_Handle subentryHandle) { } } -bool LSImpl::PublishLocalValue(PublisherData* publisher, const Value& value) { +bool LSImpl::PublishLocalValue(PublisherData* publisher, const Value& value, + bool force) { if (!value) { return false; } @@ -1211,10 +1218,16 @@ bool LSImpl::PublishLocalValue(PublisherData* publisher, const Value& value) { return false; } if (publisher->active) { - if (m_network) { + bool isDuplicate; + if (force || publisher->config.keepDuplicates) { + isDuplicate = false; + } else { + isDuplicate = (publisher->topic->lastValue == value); + } + if (!isDuplicate && m_network) { m_network->SetValue(publisher->handle, value); } - return SetValue(publisher->topic, value, NT_EVENT_VALUE_LOCAL); + return SetValue(publisher->topic, value, NT_EVENT_VALUE_LOCAL, isDuplicate); } else { return false; } @@ -1269,7 +1282,7 @@ bool LSImpl::SetDefaultEntryValue(NT_Handle pubsubentryHandle, topic->lastValue.SetTime(0); topic->lastValue.SetServerTime(0); if (publisher) { - PublishLocalValue(publisher, topic->lastValue); + PublishLocalValue(publisher, topic->lastValue, true); } return true; } @@ -1333,7 +1346,8 @@ void LocalStorage::NetworkPropertiesUpdate(std::string_view name, void LocalStorage::NetworkSetValue(NT_Topic topicHandle, const Value& value) { std::scoped_lock lock{m_mutex}; if (auto topic = m_impl->m_topics.Get(topicHandle)) { - m_impl->SetValue(topic, value, NT_EVENT_VALUE_REMOTE); + m_impl->SetValue(topic, value, NT_EVENT_VALUE_REMOTE, + value == topic->lastValue); } } diff --git a/ntcore/src/test/native/cpp/LocalStorageTest.cpp b/ntcore/src/test/native/cpp/LocalStorageTest.cpp index 2598a88346..7030d55992 100644 --- a/ntcore/src/test/native/cpp/LocalStorageTest.cpp +++ b/ntcore/src/test/native/cpp/LocalStorageTest.cpp @@ -13,6 +13,7 @@ #include "gtest/gtest.h" #include "net/MockNetworkInterface.h" #include "ntcore_c.h" +#include "ntcore_cpp.h" using ::testing::_; using ::testing::AllOf; @@ -162,7 +163,7 @@ TEST_F(LocalStorageTest, SubscribeNoTypeLocalPubPost) { EXPECT_EQ(vals[0].value, true); EXPECT_EQ(vals[0].time, 5); - val = Value::MakeBoolean(true, 6); + val = Value::MakeBoolean(false, 6); EXPECT_CALL(network, SetValue(pub, val)); storage.SetEntryValue(pub, val); @@ -225,7 +226,7 @@ TEST_F(LocalStorageTest, EntryNoTypeLocalSet) { EXPECT_EQ(vals[0].time, 5); // normal set with same type - val = Value::MakeBoolean(true, 6); + val = Value::MakeBoolean(false, 6); EXPECT_CALL(network, SetValue(_, val)); EXPECT_TRUE(storage.SetEntryValue(entry, val)); @@ -486,6 +487,107 @@ TEST_F(LocalStorageTest, SetValueInvalidHandle) { EXPECT_FALSE(storage.SetEntryValue(0u, {})); } +class LocalStorageDuplicatesTest : public LocalStorageTest { + public: + void SetupPubSub(bool keepPub, bool keepSub); + void SetValues(); + + NT_Publisher pub; + NT_Subscriber sub; + Value val1 = Value::MakeDouble(1.0, 10); + Value val2 = Value::MakeDouble(1.0, 20); // duplicate value + Value val3 = Value::MakeDouble(2.0, 30); +}; + +void LocalStorageDuplicatesTest::SetupPubSub(bool keepPub, bool keepSub) { + PubSubOptions pubOptions; + pubOptions.keepDuplicates = keepPub; + EXPECT_CALL(network, Publish(_, fooTopic, std::string_view{"foo"}, + std::string_view{"double"}, wpi::json::object(), + IsPubSubOptions(pubOptions))); + pub = storage.Publish(fooTopic, NT_DOUBLE, "double", {}, + {{PubSubOption::KeepDuplicates(keepPub)}}); + + PubSubOptions subOptions; + subOptions.pollStorageSize = 10; + subOptions.keepDuplicates = keepSub; + EXPECT_CALL(network, Subscribe(_, wpi::SpanEq({std::string{"foo"}}), + IsPubSubOptions(subOptions))); + sub = storage.Subscribe( + fooTopic, NT_DOUBLE, "double", + {{PubSubOption::KeepDuplicates(keepSub), PubSubOption::PollStorage(10)}}); +} + +void LocalStorageDuplicatesTest::SetValues() { + storage.SetEntryValue(pub, val1); + storage.SetEntryValue(pub, val2); + // verify the timestamp was updated + EXPECT_EQ(storage.GetEntryLastChange(sub), val2.time()); + storage.SetEntryValue(pub, val3); +} + +TEST_F(LocalStorageDuplicatesTest, Defaults) { + SetupPubSub(false, false); + + EXPECT_CALL(network, SetValue(pub, val1)); + EXPECT_CALL(network, SetValue(pub, val3)); + SetValues(); + + // verify 2nd update was dropped locally + auto values = storage.ReadQueueDouble(sub); + ASSERT_EQ(values.size(), 2u); + ASSERT_EQ(values[0].value, val1.GetDouble()); + ASSERT_EQ(values[0].time, val1.time()); + ASSERT_EQ(values[1].value, val3.GetDouble()); + ASSERT_EQ(values[1].time, val3.time()); +} + +TEST_F(LocalStorageDuplicatesTest, KeepPub) { + SetupPubSub(true, false); + + EXPECT_CALL(network, SetValue(pub, val1)).Times(2); + // EXPECT_CALL(network, SetValue(pub, val2)); + EXPECT_CALL(network, SetValue(pub, val3)); + SetValues(); + + // verify all 3 updates were received locally + auto values = storage.ReadQueueDouble(sub); + ASSERT_EQ(values.size(), 3u); +} + +TEST_F(LocalStorageDuplicatesTest, KeepSub) { + SetupPubSub(false, true); + + // second update should NOT go to the network + EXPECT_CALL(network, SetValue(pub, val1)); + EXPECT_CALL(network, SetValue(pub, val3)); + SetValues(); + + // verify all 3 updates were received locally + auto values = storage.ReadQueueDouble(sub); + ASSERT_EQ(values.size(), 3u); +} + +TEST_F(LocalStorageDuplicatesTest, FromNetwork) { + SetupPubSub(false, false); + + // incoming from the network are treated like a normal local publish + auto topic = storage.NetworkAnnounce("foo", "double", {{}}, 0); + storage.NetworkSetValue(topic, val1); + storage.NetworkSetValue(topic, val2); + // verify the timestamp was updated + EXPECT_EQ(storage.GetEntryLastChange(sub), val2.time()); + storage.NetworkSetValue(topic, val3); + + // verify 2nd update was dropped locally + auto values = storage.ReadQueueDouble(sub); + ASSERT_EQ(values.size(), 2u); + ASSERT_EQ(values[0].value, val1.GetDouble()); + ASSERT_EQ(values[0].time, val1.time()); + ASSERT_EQ(values[1].value, val3.GetDouble()); + ASSERT_EQ(values[1].time, val3.time()); +} + class LocalStorageNumberVariantsTest : public LocalStorageTest { public: void CreateSubscriber(NT_Handle* handle, std::string_view name, NT_Type type, @@ -658,29 +760,29 @@ TEST_F(LocalStorageNumberVariantsTest, ReadQueue) { } } - storage.SetEntryValue(pub, Value::MakeDouble(1.0, 50)); + storage.SetEntryValue(pub, Value::MakeDouble(2.0, 50)); for (auto&& subentry : subentries) { SCOPED_TRACE(subentry.name); if (subentry.type == NT_BOOLEAN) { EXPECT_THAT(storage.ReadQueueInteger(subentry.subentry), IsEmpty()); } else { EXPECT_THAT(storage.ReadQueueInteger(subentry.subentry), - ElementsAre(TSEq(1, 50))); + ElementsAre(TSEq(2, 50))); } } - storage.SetEntryValue(pub, Value::MakeDouble(1.0, 50)); + storage.SetEntryValue(pub, Value::MakeDouble(3.0, 50)); for (auto&& subentry : subentries) { SCOPED_TRACE(subentry.name); if (subentry.type == NT_BOOLEAN) { EXPECT_THAT(storage.ReadQueueFloat(subentry.subentry), IsEmpty()); } else { EXPECT_THAT(storage.ReadQueueFloat(subentry.subentry), - ElementsAre(TSEq(1.0, 50))); + ElementsAre(TSEq(3.0, 50))); } } - storage.SetEntryValue(pub, Value::MakeDouble(1.0, 50)); + storage.SetEntryValue(pub, Value::MakeDouble(4.0, 50)); for (auto&& subentry : subentries) { SCOPED_TRACE(subentry.name); EXPECT_THAT(storage.ReadQueueBoolean(subentry.subentry), IsEmpty());