[ntcore] Various fixes and cleanups (#4544)

* NetworkTableInstance: set handle to 0 after destroy
* Fix multiple notifications of local values
* Detect mismatch between handles
* Server: fix setting min period when no topics
* Limit maximum number of subscribers/publishers/listeners
   This helps find resource leaks and prevents them from causing excessive
   slowdowns/crashes.  The limit on each is currently set to 512.
* Don't use std::swap in move operation
This commit is contained in:
Peter Johnson
2022-11-04 20:01:21 -07:00
committed by GitHub
parent 837415abfd
commit 4ba16db645
9 changed files with 215 additions and 42 deletions

View File

@@ -25,6 +25,12 @@
using namespace nt;
// maximum number of local publishers / subscribers to any given topic
static constexpr size_t kMaxPublishers = 512;
static constexpr size_t kMaxSubscribers = 512;
static constexpr size_t kMaxMultiSubscribers = 512;
static constexpr size_t kMaxListeners = 512;
namespace {
// Utility wrapper for making a set-like vector
@@ -495,15 +501,19 @@ void LSImpl::NotifyValue(TopicData* topic, unsigned int eventFlags) {
if (subscriber->active) {
subscriber->pollStorage.emplace_back(topic->lastValue);
subscriber->handle.Set();
m_listenerStorage.Notify(subscriber->valueListeners, eventFlags,
topic->handle, 0, topic->lastValue);
if (!subscriber->valueListeners.empty()) {
m_listenerStorage.Notify(subscriber->valueListeners, eventFlags,
topic->handle, 0, topic->lastValue);
}
}
}
for (auto&& subscriber : topic->multiSubscribers) {
subscriber->handle.Set();
m_listenerStorage.Notify(subscriber->valueListeners, eventFlags,
topic->handle, 0, topic->lastValue);
if (!subscriber->valueListeners.empty()) {
m_listenerStorage.Notify(subscriber->valueListeners, eventFlags,
topic->handle, 0, topic->lastValue);
}
}
}
@@ -889,6 +899,12 @@ std::unique_ptr<MultiSubscriberData> LSImpl::RemoveMultiSubscriber(
void LSImpl::AddListenerImpl(NT_Listener listenerHandle, TopicData* topic,
unsigned int eventMask) {
if (topic->localSubscribers.size() >= kMaxSubscribers) {
ERROR(
"reached maximum number of subscribers to '{}', ignoring listener add",
topic->name);
return;
}
// subscribe to make sure topic updates are received
PubSubConfig config;
config.topicsOnly = (eventMask & NT_EVENT_VALUE_ALL) == 0;
@@ -906,6 +922,12 @@ void LSImpl::AddListenerImpl(NT_Listener listenerHandle,
auto topic = subscriber->topic;
if ((eventMask & NT_EVENT_TOPIC) != 0) {
if (topic->listeners.size() >= kMaxListeners) {
ERROR("reached maximum number of listeners to '{}', not adding listener",
topic->name);
return;
}
m_listenerStorage.Activate(
listenerHandle, eventMask & (NT_EVENT_TOPIC | NT_EVENT_IMMEDIATE));
@@ -922,6 +944,11 @@ void LSImpl::AddListenerImpl(NT_Listener listenerHandle,
}
if ((eventMask & NT_EVENT_VALUE_ALL) != 0) {
if (subscriber->valueListeners.size() >= kMaxListeners) {
ERROR("reached maximum number of listeners to '{}', not adding listener",
topic->name);
return;
}
m_listenerStorage.Activate(
listenerHandle, eventMask & (NT_EVENT_VALUE_ALL | NT_EVENT_IMMEDIATE),
[subentryHandle](unsigned int mask, Event* event) {
@@ -968,6 +995,11 @@ void LSImpl::AddListenerImpl(NT_Listener listenerHandle,
}
if ((eventMask & NT_EVENT_TOPIC) != 0) {
if (m_topicPrefixListeners.size() >= kMaxListeners) {
ERROR("reached maximum number of listeners, not adding listener");
return;
}
m_listenerStorage.Activate(
listenerHandle, eventMask & (NT_EVENT_TOPIC | NT_EVENT_IMMEDIATE));
@@ -989,6 +1021,11 @@ void LSImpl::AddListenerImpl(NT_Listener listenerHandle,
}
if ((eventMask & NT_EVENT_VALUE_ALL) != 0) {
if (subscriber->valueListeners.size() >= kMaxListeners) {
ERROR("reached maximum number of listeners, not adding listener");
return;
}
m_listenerStorage.Activate(
listenerHandle, eventMask & (NT_EVENT_VALUE_ALL | NT_EVENT_IMMEDIATE),
[subentryHandle = subscriber->handle.GetHandle()](unsigned int mask,
@@ -1018,6 +1055,10 @@ void LSImpl::AddListenerImpl(NT_Listener listenerHandle,
void LSImpl::AddListener(NT_Listener listenerHandle,
std::span<const std::string_view> prefixes,
unsigned int eventMask) {
if (m_multiSubscribers.size() >= kMaxMultiSubscribers) {
ERROR("reached maximum number of multi-subscribers, not adding listener");
return;
}
// subscribe to make sure topic updates are received
PubSubOptions options;
options.topicsOnly = (eventMask & NT_EVENT_VALUE_ALL) == 0;
@@ -1548,6 +1589,13 @@ NT_Subscriber LocalStorage::Subscribe(NT_Topic topicHandle, NT_Type type,
return 0;
}
if (topic->localSubscribers.size() >= kMaxSubscribers) {
WPI_ERROR(m_impl->m_logger,
"reached maximum number of subscribers to '{}', not subscribing",
topic->name);
return 0;
}
// Create subscriber
return m_impl->AddLocalSubscriber(topic, PubSubConfig{type, typeStr, options})
->handle;
@@ -1562,6 +1610,13 @@ NT_MultiSubscriber LocalStorage::SubscribeMultiple(
std::span<const std::string_view> prefixes,
std::span<const PubSubOption> options) {
std::scoped_lock lock{m_mutex};
if (m_impl->m_multiSubscribers.size() >= kMaxMultiSubscribers) {
WPI_ERROR(m_impl->m_logger,
"reached maximum number of multi-subscribers, not subscribing");
return 0;
}
PubSubOptions opts{options};
opts.prefixMatch = true;
return m_impl->AddMultiSubscriber(prefixes, opts)->handle;
@@ -1594,6 +1649,13 @@ NT_Publisher LocalStorage::Publish(NT_Topic topicHandle, NT_Type type,
return 0;
}
if (topic->localPublishers.size() >= kMaxPublishers) {
WPI_ERROR(m_impl->m_logger,
"reached maximum number of publishers to '{}', not publishing",
topic->name);
return 0;
}
return m_impl
->AddLocalPublisher(topic, properties,
PubSubConfig{type, typeStr, options})
@@ -1627,6 +1689,14 @@ NT_Entry LocalStorage::GetEntry(NT_Topic topicHandle, NT_Type type,
return 0;
}
if (topic->localSubscribers.size() >= kMaxSubscribers) {
WPI_ERROR(
m_impl->m_logger,
"reached maximum number of subscribers to '{}', not creating entry",
topic->name);
return 0;
}
// Create subscriber
auto subscriber =
m_impl->AddLocalSubscriber(topic, PubSubConfig{type, typeStr, options});
@@ -2010,6 +2080,14 @@ NT_Entry LocalStorage::GetEntry(std::string_view name) {
auto* topic = m_impl->GetOrCreateTopic(name);
if (topic->entry == 0) {
if (topic->localSubscribers.size() >= kMaxSubscribers) {
WPI_ERROR(
m_impl->m_logger,
"reached maximum number of subscribers to '{}', not creating entry",
topic->name);
return 0;
}
// Create subscriber
auto* subscriber = m_impl->AddLocalSubscriber(topic, {});

View File

@@ -620,8 +620,20 @@ void ClientData4Base::ClientSubscribe(int64_t subuid,
sub->periodMs = kMinPeriodMs;
}
// update periodic sender (if not local)
if (!m_local) {
if (m_periodMs == UINT32_MAX) {
m_periodMs = sub->periodMs;
} else {
m_periodMs = std::gcd(m_periodMs, sub->periodMs);
}
if (m_periodMs < kMinPeriodMs) {
m_periodMs = kMinPeriodMs;
}
m_setPeriodic(m_periodMs);
}
// see if this immediately subscribes to any topics
bool updatedPeriodic = false;
for (auto&& topic : m_server.m_topics) {
bool removed = false;
if (replace) {
@@ -647,14 +659,6 @@ void ClientData4Base::ClientSubscribe(int64_t subuid,
m_server.UpdateMetaTopicSub(topic.get());
}
if (added || removed) {
// update periodic sender (if not local)
if (!m_local) {
m_periodMs = std::gcd(m_periodMs, sub->periodMs);
updatedPeriodic = true;
}
}
if (!wasSubscribed && added && !removed) {
// announce topic to client
DEBUG4("client {}: announce {}", m_id, topic->name);
@@ -667,12 +671,6 @@ void ClientData4Base::ClientSubscribe(int64_t subuid,
}
}
}
if (updatedPeriodic) {
if (m_periodMs < kMinPeriodMs) {
m_periodMs = kMinPeriodMs;
}
m_setPeriodic(m_periodMs);
}
// update meta data
UpdateMetaClientSub();

View File

@@ -101,9 +101,44 @@ void NetworkTableInstance::SetServer(std::span<const std::string_view> servers,
SetServer(serversArr);
}
NT_Listener NetworkTableInstance::AddListener(MultiSubscriber& subscriber,
int eventMask,
NT_Listener NetworkTableInstance::AddListener(Topic topic,
unsigned int eventMask,
ListenerCallback listener) {
if (::nt::GetInstanceFromHandle(topic.GetHandle()) != m_handle) {
fmt::print(stderr, "AddListener: topic is not from this instance\n");
return 0;
}
return ::nt::AddListener(topic.GetHandle(), eventMask, std::move(listener));
}
NT_Listener NetworkTableInstance::AddListener(Subscriber& subscriber,
unsigned int eventMask,
ListenerCallback listener) {
if (::nt::GetInstanceFromHandle(subscriber.GetHandle()) != m_handle) {
fmt::print(stderr, "AddListener: subscriber is not from this instance\n");
return 0;
}
return ::nt::AddListener(subscriber.GetHandle(), eventMask,
std::move(listener));
}
NT_Listener NetworkTableInstance::AddListener(NetworkTableEntry& entry,
int eventMask,
ListenerCallback listener) {
if (::nt::GetInstanceFromHandle(entry.GetHandle()) != m_handle) {
fmt::print(stderr, "AddListener: entry is not from this instance\n");
return 0;
}
return ::nt::AddListener(entry.GetHandle(), eventMask, std::move(listener));
}
NT_Listener NetworkTableInstance::AddListener(MultiSubscriber& subscriber,
int eventMask,
ListenerCallback listener) {
if (::nt::GetInstanceFromHandle(subscriber.GetHandle()) != m_handle) {
fmt::print(stderr, "AddListener: subscriber is not from this instance\n");
return 0;
}
return ::nt::AddListener(subscriber.GetHandle(), eventMask,
std::move(listener));
}

View File

@@ -497,6 +497,14 @@ NT_Listener AddPolledListener(NT_ListenerPoller poller,
NT_Listener AddPolledListener(NT_ListenerPoller poller, NT_Handle handle,
unsigned int mask) {
if (auto ii = InstanceImpl::GetTyped(poller, Handle::kListenerPoller)) {
if (Handle{handle}.GetInst() != Handle{poller}.GetInst()) {
WPI_ERROR(
ii->logger,
"AddPolledListener(): trying to listen to handle {} (instance {}) "
"with poller {} (instance {}), ignored due to different instance",
handle, Handle{handle}.GetInst(), poller, Handle{poller}.GetInst());
return {};
}
auto listener = ii->listenerStorage.AddListener(poller);
DoAddListener(*ii, listener, handle, mask);
return listener;