[ntcore] Implement keep duplicates pub/sub flag (#4666)

Also don't save duplicate NT sets to data log (unless publish keep duplicates flag is set).
This commit is contained in:
Peter Johnson
2022-11-18 23:50:08 -08:00
committed by GitHub
parent e962fd2916
commit 5ec067c1f8
2 changed files with 140 additions and 24 deletions

View File

@@ -260,8 +260,9 @@ struct LSImpl {
void CheckReset(TopicData* topic);
bool SetValue(TopicData* topic, const Value& value, unsigned int eventFlags);
void NotifyValue(TopicData* topic, unsigned int eventFlags);
bool SetValue(TopicData* topic, const Value& value, unsigned int eventFlags,
bool isDuplicate);
void NotifyValue(TopicData* topic, unsigned int eventFlags, bool isDuplicate);
void SetFlags(TopicData* topic, unsigned int flags);
void SetPersistent(TopicData* topic, bool value);
@@ -323,7 +324,8 @@ struct LSImpl {
PublisherData* PublishEntry(EntryData* entry, NT_Type type);
Value* GetSubEntryValue(NT_Handle subentryHandle);
bool PublishLocalValue(PublisherData* publisher, const Value& value);
bool PublishLocalValue(PublisherData* publisher, const Value& value,
bool force = false);
bool SetEntryValue(NT_Handle pubentryHandle, const Value& value);
bool SetDefaultEntryValue(NT_Handle pubsubentryHandle, const Value& value);
@@ -468,7 +470,7 @@ void LSImpl::CheckReset(TopicData* topic) {
}
bool LSImpl::SetValue(TopicData* topic, const Value& value,
unsigned int eventFlags) {
unsigned int eventFlags, bool isDuplicate) {
if (topic->type != NT_UNASSIGNED && topic->type != value.type()) {
return false;
}
@@ -479,9 +481,9 @@ bool LSImpl::SetValue(TopicData* topic, const Value& value,
topic->type = value.type();
topic->lastValue = value;
topic->lastValueNetwork = isNetwork;
NotifyValue(topic, eventFlags);
NotifyValue(topic, eventFlags, isDuplicate);
}
if (topic->datalogType == value.type()) {
if (!isDuplicate && topic->datalogType == value.type()) {
for (auto&& datalog : topic->datalogs) {
datalog.Append(value);
}
@@ -489,9 +491,11 @@ bool LSImpl::SetValue(TopicData* topic, const Value& value,
return true;
}
void LSImpl::NotifyValue(TopicData* topic, unsigned int eventFlags) {
void LSImpl::NotifyValue(TopicData* topic, unsigned int eventFlags,
bool isDuplicate) {
for (auto&& subscriber : topic->localSubscribers) {
if (subscriber->active) {
if (subscriber->active &&
(subscriber->config.keepDuplicates || !isDuplicate)) {
subscriber->pollStorage.emplace_back(topic->lastValue);
subscriber->handle.Set();
if (!subscriber->valueListeners.empty()) {
@@ -502,10 +506,12 @@ void LSImpl::NotifyValue(TopicData* topic, unsigned int eventFlags) {
}
for (auto&& subscriber : topic->multiSubscribers) {
subscriber->handle.Set();
if (!subscriber->valueListeners.empty()) {
m_listenerStorage.Notify(subscriber->valueListeners, eventFlags,
topic->handle, 0, topic->lastValue);
if (subscriber->options.keepDuplicates || !isDuplicate) {
subscriber->handle.Set();
if (!subscriber->valueListeners.empty()) {
m_listenerStorage.Notify(subscriber->valueListeners, eventFlags,
topic->handle, 0, topic->lastValue);
}
}
}
}
@@ -1198,7 +1204,8 @@ Value* LSImpl::GetSubEntryValue(NT_Handle subentryHandle) {
}
}
bool LSImpl::PublishLocalValue(PublisherData* publisher, const Value& value) {
bool LSImpl::PublishLocalValue(PublisherData* publisher, const Value& value,
bool force) {
if (!value) {
return false;
}
@@ -1211,10 +1218,16 @@ bool LSImpl::PublishLocalValue(PublisherData* publisher, const Value& value) {
return false;
}
if (publisher->active) {
if (m_network) {
bool isDuplicate;
if (force || publisher->config.keepDuplicates) {
isDuplicate = false;
} else {
isDuplicate = (publisher->topic->lastValue == value);
}
if (!isDuplicate && m_network) {
m_network->SetValue(publisher->handle, value);
}
return SetValue(publisher->topic, value, NT_EVENT_VALUE_LOCAL);
return SetValue(publisher->topic, value, NT_EVENT_VALUE_LOCAL, isDuplicate);
} else {
return false;
}
@@ -1269,7 +1282,7 @@ bool LSImpl::SetDefaultEntryValue(NT_Handle pubsubentryHandle,
topic->lastValue.SetTime(0);
topic->lastValue.SetServerTime(0);
if (publisher) {
PublishLocalValue(publisher, topic->lastValue);
PublishLocalValue(publisher, topic->lastValue, true);
}
return true;
}
@@ -1333,7 +1346,8 @@ void LocalStorage::NetworkPropertiesUpdate(std::string_view name,
void LocalStorage::NetworkSetValue(NT_Topic topicHandle, const Value& value) {
std::scoped_lock lock{m_mutex};
if (auto topic = m_impl->m_topics.Get(topicHandle)) {
m_impl->SetValue(topic, value, NT_EVENT_VALUE_REMOTE);
m_impl->SetValue(topic, value, NT_EVENT_VALUE_REMOTE,
value == topic->lastValue);
}
}