mirror of
https://github.com/wpilibsuite/allwpilib
synced 2026-06-21 01:01:43 +00:00
[ntcore] Fix special topic multi-subscriber handling (#4740)
It now matches server behavior, where "" doesn't match special topics. Also fixes duplicate notification that could occur in some cases.
This commit is contained in:
@@ -34,6 +34,15 @@ static constexpr size_t kMaxListeners = 512;
|
||||
|
||||
namespace {
|
||||
|
||||
static constexpr bool IsSpecial(std::string_view name) {
|
||||
return name.empty() ? false : name.front() == '$';
|
||||
}
|
||||
|
||||
static constexpr bool PrefixMatch(std::string_view name,
|
||||
std::string_view prefix, bool special) {
|
||||
return (!special || !prefix.empty()) && wpi::starts_with(name, prefix);
|
||||
}
|
||||
|
||||
// Utility wrapper for making a set-like vector
|
||||
template <typename T>
|
||||
class VectorSet : public std::vector<T> {
|
||||
@@ -66,7 +75,7 @@ struct TopicData {
|
||||
static constexpr auto kType = Handle::kTopic;
|
||||
|
||||
TopicData(NT_Topic handle, std::string_view name)
|
||||
: handle{handle}, name{name} {}
|
||||
: handle{handle}, name{name}, special{IsSpecial(name)} {}
|
||||
|
||||
bool Exists() const { return onNetwork || !localPublishers.empty(); }
|
||||
|
||||
@@ -75,6 +84,7 @@ struct TopicData {
|
||||
// invariants
|
||||
wpi::SignalObject<NT_Topic> handle;
|
||||
std::string name;
|
||||
bool special;
|
||||
|
||||
Value lastValue; // also stores timestamp
|
||||
Value lastValueNetwork;
|
||||
@@ -179,6 +189,8 @@ struct MultiSubscriberData {
|
||||
}
|
||||
}
|
||||
|
||||
bool Matches(std::string_view name, bool special);
|
||||
|
||||
// invariants
|
||||
wpi::SignalObject<NT_MultiSubscriber> handle;
|
||||
std::vector<std::string> prefixes;
|
||||
@@ -188,6 +200,15 @@ struct MultiSubscriberData {
|
||||
VectorSet<NT_Listener> valueListeners;
|
||||
};
|
||||
|
||||
bool MultiSubscriberData::Matches(std::string_view name, bool special) {
|
||||
for (auto&& prefix : prefixes) {
|
||||
if (PrefixMatch(name, prefix, special)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
struct ListenerData {
|
||||
ListenerData(NT_Listener handle, SubscriberData* subscriber,
|
||||
unsigned int eventMask, bool subscriberOwned)
|
||||
@@ -403,6 +424,7 @@ void SubscriberData::UpdateActive() {
|
||||
}
|
||||
|
||||
void LSImpl::NotifyTopic(TopicData* topic, unsigned int eventFlags) {
|
||||
DEBUG4("NotifyTopic({}, {})\n", topic->name, eventFlags);
|
||||
auto topicInfo = topic->GetTopicInfo();
|
||||
if (!topic->listeners.empty()) {
|
||||
m_listenerStorage.Notify(topic->listeners, eventFlags, topicInfo);
|
||||
@@ -410,12 +432,9 @@ void LSImpl::NotifyTopic(TopicData* topic, unsigned int eventFlags) {
|
||||
|
||||
wpi::SmallVector<NT_Listener, 32> listeners;
|
||||
for (auto listener : m_topicPrefixListeners) {
|
||||
if (listener->multiSubscriber) {
|
||||
for (auto&& prefix : listener->multiSubscriber->prefixes) {
|
||||
if (wpi::starts_with(topic->name, prefix)) {
|
||||
listeners.emplace_back(listener->handle);
|
||||
}
|
||||
}
|
||||
if (listener->multiSubscriber &&
|
||||
listener->multiSubscriber->Matches(topic->name, topic->special)) {
|
||||
listeners.emplace_back(listener->handle);
|
||||
}
|
||||
}
|
||||
if (!listeners.empty()) {
|
||||
@@ -874,7 +893,7 @@ MultiSubscriberData* LSImpl::AddMultiSubscriber(
|
||||
// subscribe to any already existing topics
|
||||
for (auto&& topic : m_topics) {
|
||||
for (auto&& prefix : prefixes) {
|
||||
if (wpi::starts_with(topic->name, prefix)) {
|
||||
if (PrefixMatch(topic->name, prefix, topic->special)) {
|
||||
topic->multiSubscribers.Add(subscriber);
|
||||
break;
|
||||
}
|
||||
@@ -995,10 +1014,8 @@ void LSImpl::AddListenerImpl(NT_Listener listenerHandle,
|
||||
if ((eventMask & NT_EVENT_IMMEDIATE) != 0 &&
|
||||
(eventMask & (NT_EVENT_PUBLISH | NT_EVENT_VALUE_ALL)) != 0) {
|
||||
for (auto&& topic : m_topics) {
|
||||
for (auto&& prefix : subscriber->prefixes) {
|
||||
if (wpi::starts_with(topic->name, prefix) && topic->Exists()) {
|
||||
topics.emplace_back(topic.get());
|
||||
}
|
||||
if (topic->Exists() && subscriber->Matches(topic->name, topic->special)) {
|
||||
topics.emplace_back(topic.get());
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1124,11 +1141,8 @@ TopicData* LSImpl::GetOrCreateTopic(std::string_view name) {
|
||||
topic = m_topics.Add(m_inst, name);
|
||||
// attach multi-subscribers
|
||||
for (auto&& sub : m_multiSubscribers) {
|
||||
for (auto&& prefix : sub->prefixes) {
|
||||
if (wpi::starts_with(name, prefix)) {
|
||||
topic->multiSubscribers.Add(sub.get());
|
||||
break;
|
||||
}
|
||||
if (sub->Matches(name, topic->special)) {
|
||||
topic->multiSubscribers.Add(sub.get());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user