[ntcore] Add subscriber option to exclude single publisher

This enables limited self-exclusion for entries seeing their own changes
or similarly with a pub/sub pair.
This commit is contained in:
Peter Johnson
2022-12-05 23:00:50 -08:00
parent b0e4053087
commit 342c375a71
7 changed files with 131 additions and 9 deletions

View File

@@ -282,8 +282,9 @@ struct LSImpl {
void CheckReset(TopicData* topic);
bool SetValue(TopicData* topic, const Value& value, unsigned int eventFlags,
bool isDuplicate);
void NotifyValue(TopicData* topic, unsigned int eventFlags, bool isDuplicate);
bool isDuplicate, const PublisherData* publisher);
void NotifyValue(TopicData* topic, unsigned int eventFlags, bool isDuplicate,
const PublisherData* publisher);
void SetFlags(TopicData* topic, unsigned int flags);
void SetPersistent(TopicData* topic, bool value);
@@ -489,7 +490,8 @@ void LSImpl::CheckReset(TopicData* topic) {
}
bool LSImpl::SetValue(TopicData* topic, const Value& value,
unsigned int eventFlags, bool isDuplicate) {
unsigned int eventFlags, bool isDuplicate,
const PublisherData* publisher) {
DEBUG4("SetValue({}, {}, {}, {})", topic->name, value.time(), eventFlags,
isDuplicate);
if (topic->type != NT_UNASSIGNED && topic->type != value.type()) {
@@ -499,7 +501,7 @@ bool LSImpl::SetValue(TopicData* topic, const Value& value,
// TODO: notify option even if older value
topic->type = value.type();
topic->lastValue = value;
NotifyValue(topic, eventFlags, isDuplicate);
NotifyValue(topic, eventFlags, isDuplicate, publisher);
}
if (!isDuplicate && topic->datalogType == value.type()) {
for (auto&& datalog : topic->datalogs) {
@@ -510,13 +512,15 @@ bool LSImpl::SetValue(TopicData* topic, const Value& value,
}
void LSImpl::NotifyValue(TopicData* topic, unsigned int eventFlags,
bool isDuplicate) {
bool isDuplicate, const PublisherData* publisher) {
bool isNetwork = (eventFlags & NT_EVENT_VALUE_REMOTE) != 0;
for (auto&& subscriber : topic->localSubscribers) {
if (subscriber->active &&
(subscriber->config.keepDuplicates || !isDuplicate) &&
((isNetwork && subscriber->config.fromRemote) ||
(!isNetwork && subscriber->config.fromLocal))) {
(!isNetwork && subscriber->config.fromLocal)) &&
(!publisher ||
(publisher && (subscriber->config.excludePub != publisher->handle)))) {
subscriber->pollStorage.emplace_back(topic->lastValue);
subscriber->handle.Set();
if (!subscriber->valueListeners.empty()) {
@@ -1246,7 +1250,8 @@ bool LSImpl::PublishLocalValue(PublisherData* publisher, const Value& value,
publisher->topic->lastValueNetwork = value;
m_network->SetValue(publisher->handle, value);
}
return SetValue(publisher->topic, value, NT_EVENT_VALUE_LOCAL, isDuplicate);
return SetValue(publisher->topic, value, NT_EVENT_VALUE_LOCAL, isDuplicate,
publisher);
} else {
return false;
}
@@ -1260,6 +1265,9 @@ bool LSImpl::SetEntryValue(NT_Handle pubentryHandle, const Value& value) {
if (!publisher) {
if (auto entry = m_entries.Get(pubentryHandle)) {
publisher = PublishEntry(entry, value.type());
if (entry->subscriber->config.excludeSelf) {
entry->subscriber->config.excludePub = publisher->handle;
}
}
if (!publisher) {
return false;
@@ -1366,7 +1374,7 @@ void LocalStorage::NetworkSetValue(NT_Topic topicHandle, const Value& value) {
std::scoped_lock lock{m_mutex};
if (auto topic = m_impl->m_topics.Get(topicHandle)) {
if (m_impl->SetValue(topic, value, NT_EVENT_VALUE_REMOTE,
value == topic->lastValue)) {
value == topic->lastValue, nullptr)) {
topic->lastValueNetwork = value;
}
}

View File

@@ -4,6 +4,7 @@
#include "PubSubOptions.h"
#include "ntcore_c.h"
#include "ntcore_cpp.h"
using namespace nt;
@@ -48,6 +49,12 @@ nt::PubSubOptions::PubSubOptions(std::span<const PubSubOption> options) {
break;
}
break;
case NT_PUBSUB_EXCLUDEPUB:
excludePub = option.value;
break;
case NT_PUBSUB_EXCLUDESELF:
excludeSelf = option.value != 0;
break;
default:
break;
}

View File

@@ -26,6 +26,8 @@ class PubSubOptions {
bool keepDuplicates = false;
bool fromRemote = true;
bool fromLocal = true;
unsigned int excludePub = 0;
bool excludeSelf = false;
};
} // namespace nt