diff --git a/ntcore/src/main/java/edu/wpi/first/networktables/PubSubOption.java b/ntcore/src/main/java/edu/wpi/first/networktables/PubSubOption.java index 371f564a83..00fa508d04 100644 --- a/ntcore/src/main/java/edu/wpi/first/networktables/PubSubOption.java +++ b/ntcore/src/main/java/edu/wpi/first/networktables/PubSubOption.java @@ -12,6 +12,8 @@ public class PubSubOption { private static final int kPollStorage = 4; private static final int kKeepDuplicates = 5; private static final int kLocalRemote = 6; + private static final int kExcludePub = 7; + private static final int kExcludeSelf = 8; PubSubOption(int type, int value) { m_type = type; @@ -105,6 +107,39 @@ public class PubSubOption { return new PubSubOption(kLocalRemote, 0); } + /** + * Don't queue value updates for the given publisher. Only has an effect on subscriptions. Only + * one exclusion may be set. + * + * @param publisher publisher handle to exclude + * @return option + */ + public static PubSubOption excludePublisher(int publisher) { + return new PubSubOption(kExcludePub, publisher); + } + + /** + * Don't queue value updates for the given publisher. Only has an effect on subscriptions. Only + * one exclusion may be set. + * + * @param publisher publisher to exclude + * @return option + */ + public static PubSubOption excludePublisher(Publisher publisher) { + return new PubSubOption(kExcludePub, publisher != null ? publisher.getHandle() : 0); + } + + /** + * Don't queue value updates for the internal publisher for an entry. Only has an effect on + * entries. + * + * @param enabled True to enable, false to disable + * @return option + */ + public static PubSubOption excludeSelf(boolean enabled) { + return new PubSubOption(kExcludeSelf, enabled ? 1 : 0); + } + final int m_type; final int m_value; } diff --git a/ntcore/src/main/native/cpp/LocalStorage.cpp b/ntcore/src/main/native/cpp/LocalStorage.cpp index 9ec17d6240..6325900c15 100644 --- a/ntcore/src/main/native/cpp/LocalStorage.cpp +++ b/ntcore/src/main/native/cpp/LocalStorage.cpp @@ -282,8 +282,9 @@ struct LSImpl { void CheckReset(TopicData* topic); bool SetValue(TopicData* topic, const Value& value, unsigned int eventFlags, - bool isDuplicate); - void NotifyValue(TopicData* topic, unsigned int eventFlags, bool isDuplicate); + bool isDuplicate, const PublisherData* publisher); + void NotifyValue(TopicData* topic, unsigned int eventFlags, bool isDuplicate, + const PublisherData* publisher); void SetFlags(TopicData* topic, unsigned int flags); void SetPersistent(TopicData* topic, bool value); @@ -489,7 +490,8 @@ void LSImpl::CheckReset(TopicData* topic) { } bool LSImpl::SetValue(TopicData* topic, const Value& value, - unsigned int eventFlags, bool isDuplicate) { + unsigned int eventFlags, bool isDuplicate, + const PublisherData* publisher) { DEBUG4("SetValue({}, {}, {}, {})", topic->name, value.time(), eventFlags, isDuplicate); if (topic->type != NT_UNASSIGNED && topic->type != value.type()) { @@ -499,7 +501,7 @@ bool LSImpl::SetValue(TopicData* topic, const Value& value, // TODO: notify option even if older value topic->type = value.type(); topic->lastValue = value; - NotifyValue(topic, eventFlags, isDuplicate); + NotifyValue(topic, eventFlags, isDuplicate, publisher); } if (!isDuplicate && topic->datalogType == value.type()) { for (auto&& datalog : topic->datalogs) { @@ -510,13 +512,15 @@ bool LSImpl::SetValue(TopicData* topic, const Value& value, } void LSImpl::NotifyValue(TopicData* topic, unsigned int eventFlags, - bool isDuplicate) { + bool isDuplicate, const PublisherData* publisher) { bool isNetwork = (eventFlags & NT_EVENT_VALUE_REMOTE) != 0; for (auto&& subscriber : topic->localSubscribers) { if (subscriber->active && (subscriber->config.keepDuplicates || !isDuplicate) && ((isNetwork && subscriber->config.fromRemote) || - (!isNetwork && subscriber->config.fromLocal))) { + (!isNetwork && subscriber->config.fromLocal)) && + (!publisher || + (publisher && (subscriber->config.excludePub != publisher->handle)))) { subscriber->pollStorage.emplace_back(topic->lastValue); subscriber->handle.Set(); if (!subscriber->valueListeners.empty()) { @@ -1246,7 +1250,8 @@ bool LSImpl::PublishLocalValue(PublisherData* publisher, const Value& value, publisher->topic->lastValueNetwork = value; m_network->SetValue(publisher->handle, value); } - return SetValue(publisher->topic, value, NT_EVENT_VALUE_LOCAL, isDuplicate); + return SetValue(publisher->topic, value, NT_EVENT_VALUE_LOCAL, isDuplicate, + publisher); } else { return false; } @@ -1260,6 +1265,9 @@ bool LSImpl::SetEntryValue(NT_Handle pubentryHandle, const Value& value) { if (!publisher) { if (auto entry = m_entries.Get(pubentryHandle)) { publisher = PublishEntry(entry, value.type()); + if (entry->subscriber->config.excludeSelf) { + entry->subscriber->config.excludePub = publisher->handle; + } } if (!publisher) { return false; @@ -1366,7 +1374,7 @@ void LocalStorage::NetworkSetValue(NT_Topic topicHandle, const Value& value) { std::scoped_lock lock{m_mutex}; if (auto topic = m_impl->m_topics.Get(topicHandle)) { if (m_impl->SetValue(topic, value, NT_EVENT_VALUE_REMOTE, - value == topic->lastValue)) { + value == topic->lastValue, nullptr)) { topic->lastValueNetwork = value; } } diff --git a/ntcore/src/main/native/cpp/PubSubOptions.cpp b/ntcore/src/main/native/cpp/PubSubOptions.cpp index 538bc501b3..92be2c26c8 100644 --- a/ntcore/src/main/native/cpp/PubSubOptions.cpp +++ b/ntcore/src/main/native/cpp/PubSubOptions.cpp @@ -4,6 +4,7 @@ #include "PubSubOptions.h" +#include "ntcore_c.h" #include "ntcore_cpp.h" using namespace nt; @@ -48,6 +49,12 @@ nt::PubSubOptions::PubSubOptions(std::span options) { break; } break; + case NT_PUBSUB_EXCLUDEPUB: + excludePub = option.value; + break; + case NT_PUBSUB_EXCLUDESELF: + excludeSelf = option.value != 0; + break; default: break; } diff --git a/ntcore/src/main/native/cpp/PubSubOptions.h b/ntcore/src/main/native/cpp/PubSubOptions.h index ba57e0e11a..35e97a4087 100644 --- a/ntcore/src/main/native/cpp/PubSubOptions.h +++ b/ntcore/src/main/native/cpp/PubSubOptions.h @@ -26,6 +26,8 @@ class PubSubOptions { bool keepDuplicates = false; bool fromRemote = true; bool fromLocal = true; + unsigned int excludePub = 0; + bool excludeSelf = false; }; } // namespace nt diff --git a/ntcore/src/main/native/include/ntcore_c.h b/ntcore/src/main/native/include/ntcore_c.h index e773abd2e1..3d2d17e33e 100644 --- a/ntcore/src/main/native/include/ntcore_c.h +++ b/ntcore/src/main/native/include/ntcore_c.h @@ -96,6 +96,8 @@ enum NT_PubSubOptionType { NT_PUBSUB_POLLSTORAGE, /* polling storage for subscription */ NT_PUBSUB_KEEPDUPLICATES, /* preserve duplicate values */ NT_PUBSUB_LOCALREMOTE, /* local, remote, or any value changes */ + NT_PUBSUB_EXCLUDEPUB, /* exclude value changes made by given publisher */ + NT_PUBSUB_EXCLUDESELF, /* exclude value changes made by entry publisher */ }; /** Event notification flags. */ @@ -290,7 +292,7 @@ struct NT_PubSubOption { * Option value. 1 (true) or 0 (false) for immediate and logging options, * time between updates, in milliseconds, for periodic option. For * local/remote option, 1=local only, 2=remote only, 0 or 3=both local and - * remote. + * remote. For exclude publisher, publisher handle. */ unsigned int value; }; diff --git a/ntcore/src/main/native/include/ntcore_cpp.h b/ntcore/src/main/native/include/ntcore_cpp.h index bcdf168135..ef65c84909 100644 --- a/ntcore/src/main/native/include/ntcore_cpp.h +++ b/ntcore/src/main/native/include/ntcore_cpp.h @@ -358,6 +358,28 @@ class PubSubOption { return PubSubOption{NT_PUBSUB_LOCALREMOTE, 0u}; } + /** + * Don't queue value updates for the given publisher. Only has an effect on + * subscriptions. Only one exclusion may be set. + * + * @param publisher publisher handle + * @return option + */ + static constexpr PubSubOption ExcludePublisher(NT_Publisher publisher) { + return PubSubOption{NT_PUBSUB_EXCLUDEPUB, publisher}; + } + + /** + * Don't queue value updates for the internal publisher for an entry. Only has + * an effect on entries. + * + * @param enabled True to enable, false to disable + * @return option + */ + static constexpr PubSubOption ExcludeSelf(bool enabled) { + return PubSubOption{NT_PUBSUB_EXCLUDESELF, enabled ? 1u : 0u}; + } + NT_PubSubOptionType type; unsigned int value; }; diff --git a/ntcore/src/test/native/cpp/LocalStorageTest.cpp b/ntcore/src/test/native/cpp/LocalStorageTest.cpp index cc30b0b69d..83155cda96 100644 --- a/ntcore/src/test/native/cpp/LocalStorageTest.cpp +++ b/ntcore/src/test/native/cpp/LocalStorageTest.cpp @@ -867,4 +867,50 @@ TEST_F(LocalStorageTest, ReadQueueLocalRemote) { EXPECT_THAT(storage.ReadQueueDouble(subLocal), IsEmpty()); } +TEST_F(LocalStorageTest, SubExcludePub) { + EXPECT_CALL(network, Subscribe(_, _, _)).Times(2); + EXPECT_CALL(network, Publish(_, _, _, _, _, _)).Times(1); + + auto pub = storage.Publish(fooTopic, NT_DOUBLE, "double", {}, {}); + auto subActive = storage.Subscribe(fooTopic, NT_DOUBLE, "double", {}); + auto subExclude = storage.Subscribe(fooTopic, NT_DOUBLE, "double", + {{PubSubOption::ExcludePublisher(pub)}}); + auto remoteTopic = + storage.NetworkAnnounce("foo", "double", wpi::json::object(), 0); + + // local set + EXPECT_CALL(network, SetValue(_, _)); + storage.SetEntryValue(pub, Value::MakeDouble(1.0, 50)); + EXPECT_THAT(storage.ReadQueueDouble(subActive), + ElementsAre(TSEq(1.0, 50))); + EXPECT_THAT(storage.ReadQueueDouble(subExclude), IsEmpty()); + + // network set + storage.NetworkSetValue(remoteTopic, Value::MakeDouble(2.0, 60)); + EXPECT_THAT(storage.ReadQueueDouble(subActive), + ElementsAre(TSEq(2.0, 60))); + EXPECT_THAT(storage.ReadQueueDouble(subExclude), + ElementsAre(TSEq(2.0, 60))); +} + +TEST_F(LocalStorageTest, EntryExcludeSelf) { + EXPECT_CALL(network, Subscribe(_, _, _)); + + auto entry = storage.GetEntry(fooTopic, NT_DOUBLE, "double", + {{PubSubOption::ExcludeSelf(true)}}); + auto remoteTopic = + storage.NetworkAnnounce("foo", "double", wpi::json::object(), 0); + + // local set + EXPECT_CALL(network, Publish(_, _, _, _, _, _)); + EXPECT_CALL(network, SetValue(_, _)); + storage.SetEntryValue(entry, Value::MakeDouble(1.0, 50)); + EXPECT_THAT(storage.ReadQueueDouble(entry), IsEmpty()); + + // network set + storage.NetworkSetValue(remoteTopic, Value::MakeDouble(2.0, 60)); + EXPECT_THAT(storage.ReadQueueDouble(entry), + ElementsAre(TSEq(2.0, 60))); +} + } // namespace nt