From 4a43ddbacf053c0939bad985ad5fa377488f3704 Mon Sep 17 00:00:00 2001 From: Peter Johnson Date: Fri, 18 Oct 2024 23:08:52 -0700 Subject: [PATCH] [ntcore] Split LocalStorage implementation into separate files --- ntcore/CMakeLists.txt | 1 + ntcore/src/main/native/cpp/LocalStorage.cpp | 1540 +---------------- ntcore/src/main/native/cpp/LocalStorage.h | 556 ++---- .../main/native/cpp/local/LocalDataLogger.cpp | 24 + .../main/native/cpp/local/LocalDataLogger.h | 39 + .../native/cpp/local/LocalDataLoggerEntry.cpp | 64 + .../native/cpp/local/LocalDataLoggerEntry.h | 45 + ntcore/src/main/native/cpp/local/LocalEntry.h | 31 + .../src/main/native/cpp/local/LocalListener.h | 35 + .../native/cpp/local/LocalMultiSubscriber.h | 59 + .../main/native/cpp/local/LocalPublisher.h | 36 + .../native/cpp/local/LocalStorageImpl.cpp | 1100 ++++++++++++ .../main/native/cpp/local/LocalStorageImpl.h | 320 ++++ .../main/native/cpp/local/LocalSubscriber.h | 53 + .../src/main/native/cpp/local/LocalTopic.cpp | 226 +++ ntcore/src/main/native/cpp/local/LocalTopic.h | 111 ++ .../src/main/native/cpp/local/PubSubConfig.h | 27 + 17 files changed, 2360 insertions(+), 1907 deletions(-) create mode 100644 ntcore/src/main/native/cpp/local/LocalDataLogger.cpp create mode 100644 ntcore/src/main/native/cpp/local/LocalDataLogger.h create mode 100644 ntcore/src/main/native/cpp/local/LocalDataLoggerEntry.cpp create mode 100644 ntcore/src/main/native/cpp/local/LocalDataLoggerEntry.h create mode 100644 ntcore/src/main/native/cpp/local/LocalEntry.h create mode 100644 ntcore/src/main/native/cpp/local/LocalListener.h create mode 100644 ntcore/src/main/native/cpp/local/LocalMultiSubscriber.h create mode 100644 ntcore/src/main/native/cpp/local/LocalPublisher.h create mode 100644 ntcore/src/main/native/cpp/local/LocalStorageImpl.cpp create mode 100644 ntcore/src/main/native/cpp/local/LocalStorageImpl.h create mode 100644 ntcore/src/main/native/cpp/local/LocalSubscriber.h create mode 100644 ntcore/src/main/native/cpp/local/LocalTopic.cpp create mode 100644 ntcore/src/main/native/cpp/local/LocalTopic.h create mode 100644 ntcore/src/main/native/cpp/local/PubSubConfig.h diff --git a/ntcore/CMakeLists.txt b/ntcore/CMakeLists.txt index c8fdca2262..f108a9133d 100644 --- a/ntcore/CMakeLists.txt +++ b/ntcore/CMakeLists.txt @@ -7,6 +7,7 @@ file( GLOB ntcore_native_src src/main/native/cpp/*.cpp src/generated/main/native/cpp/*.cpp + src/main/native/cpp/local/*.cpp src/main/native/cpp/net/*.cpp src/main/native/cpp/net3/*.cpp src/main/native/cpp/networktables/*.cpp diff --git a/ntcore/src/main/native/cpp/LocalStorage.cpp b/ntcore/src/main/native/cpp/LocalStorage.cpp index 494e145309..c8e35a229c 100644 --- a/ntcore/src/main/native/cpp/LocalStorage.cpp +++ b/ntcore/src/main/native/cpp/LocalStorage.cpp @@ -4,1214 +4,16 @@ #include "LocalStorage.h" -#include -#include -#include -#include #include -#include -#include -#include -#include -#include - -#include "IListenerStorage.h" -#include "Log.h" -#include "Types_internal.h" -#include "Value_internal.h" -#include "net/MessageHandler.h" -#include "networktables/NetworkTableValue.h" - using namespace nt; -// maximum number of local publishers / subscribers to any given topic -static constexpr size_t kMaxPublishers = 512; -static constexpr size_t kMaxSubscribers = 512; -static constexpr size_t kMaxMultiSubscribers = 512; -static constexpr size_t kMaxListeners = 512; - -static constexpr bool PrefixMatch(std::string_view name, - std::string_view prefix, bool special) { - return (!special || !prefix.empty()) && wpi::starts_with(name, prefix); -} - -std::string LocalStorage::DataLoggerEntry::MakeMetadata( - std::string_view properties) { - return fmt::format("{{\"properties\":{},\"source\":\"NT\"}}", properties); -} - -bool LocalStorage::MultiSubscriberData::Matches(std::string_view name, - bool special) { - for (auto&& prefix : prefixes) { - if (PrefixMatch(name, prefix, special)) { - return true; - } - } - return false; -} - -int LocalStorage::DataLoggerData::Start(TopicData* topic, int64_t time) { - std::string_view typeStr = topic->typeStr; - // NT and DataLog use different standard representations for int and int[] - if (typeStr == "int") { - typeStr = "int64"; - } else if (typeStr == "int[]") { - typeStr = "int64[]"; - } - return log.Start( - fmt::format( - "{}{}", logPrefix, - wpi::remove_prefix(topic->name, prefix).value_or(topic->name)), - typeStr, DataLoggerEntry::MakeMetadata(topic->propertiesStr), time); -} - -void LocalStorage::DataLoggerEntry::Append(const Value& v) { - auto time = v.time(); - switch (v.type()) { - case NT_BOOLEAN: - log->AppendBoolean(entry, v.GetBoolean(), time); - break; - case NT_INTEGER: - log->AppendInteger(entry, v.GetInteger(), time); - break; - case NT_FLOAT: - log->AppendFloat(entry, v.GetFloat(), time); - break; - case NT_DOUBLE: - log->AppendDouble(entry, v.GetDouble(), time); - break; - case NT_STRING: - log->AppendString(entry, v.GetString(), time); - break; - case NT_RAW: { - auto val = v.GetRaw(); - log->AppendRaw(entry, - {reinterpret_cast(val.data()), val.size()}, - time); - break; - } - case NT_BOOLEAN_ARRAY: - log->AppendBooleanArray(entry, v.GetBooleanArray(), time); - break; - case NT_INTEGER_ARRAY: - log->AppendIntegerArray(entry, v.GetIntegerArray(), time); - break; - case NT_FLOAT_ARRAY: - log->AppendFloatArray(entry, v.GetFloatArray(), time); - break; - case NT_DOUBLE_ARRAY: - log->AppendDoubleArray(entry, v.GetDoubleArray(), time); - break; - case NT_STRING_ARRAY: - log->AppendStringArray(entry, v.GetStringArray(), time); - break; - default: - break; - } -} - -TopicInfo LocalStorage::TopicData::GetTopicInfo() const { - TopicInfo info; - info.topic = handle; - info.name = name; - info.type = type; - info.type_str = typeStr; - info.properties = propertiesStr; - return info; -} - -void LocalStorage::Impl::NotifyTopic(TopicData* topic, - unsigned int eventFlags) { - DEBUG4("NotifyTopic({}, {})", topic->name, eventFlags); - auto topicInfo = topic->GetTopicInfo(); - if (!topic->listeners.empty()) { - m_listenerStorage.Notify(topic->listeners, eventFlags, topicInfo); - } - - wpi::SmallVector listeners; - for (auto listener : m_topicPrefixListeners) { - if (listener->multiSubscriber && - listener->multiSubscriber->Matches(topic->name, topic->special)) { - listeners.emplace_back(listener->handle); - } - } - if (!listeners.empty()) { - m_listenerStorage.Notify(listeners, eventFlags, topicInfo); - } - - if ((eventFlags & (NT_EVENT_PUBLISH | NT_EVENT_UNPUBLISH)) != 0) { - if (!m_dataloggers.empty()) { - auto now = Now(); - for (auto&& datalogger : m_dataloggers) { - if (PrefixMatch(topic->name, datalogger->prefix, topic->special)) { - auto it = std::find_if(topic->datalogs.begin(), topic->datalogs.end(), - [&](const auto& elem) { - return elem.logger == datalogger->handle; - }); - if ((eventFlags & NT_EVENT_PUBLISH) != 0 && - it == topic->datalogs.end()) { - topic->datalogs.emplace_back(datalogger->log, - datalogger->Start(topic, now), - datalogger->handle); - topic->datalogType = topic->type; - } else if ((eventFlags & NT_EVENT_UNPUBLISH) != 0 && - it != topic->datalogs.end()) { - it->log->Finish(it->entry, now); - topic->datalogType = NT_UNASSIGNED; - topic->datalogs.erase(it); - } - } - } - } - } else if ((eventFlags & NT_EVENT_PROPERTIES) != 0) { - if (!topic->datalogs.empty()) { - auto metadata = DataLoggerEntry::MakeMetadata(topic->propertiesStr); - for (auto&& datalog : topic->datalogs) { - datalog.log->SetMetadata(datalog.entry, metadata); - } - } - } -} - -void LocalStorage::Impl::CheckReset(TopicData* topic) { - if (topic->Exists()) { - return; - } - topic->lastValue = {}; - topic->lastValueNetwork = {}; - topic->lastValueFromNetwork = false; - topic->type = NT_UNASSIGNED; - topic->typeStr.clear(); - topic->flags = 0; - topic->properties = wpi::json::object(); - topic->propertiesStr = "{}"; -} - -bool LocalStorage::Impl::SetValue(TopicData* topic, const Value& value, - unsigned int eventFlags, - bool suppressIfDuplicate, - const PublisherData* publisher) { - const bool isDuplicate = topic->IsCached() && topic->lastValue == value; - DEBUG4("SetValue({}, {}, {}, {})", topic->name, value.time(), eventFlags, - isDuplicate); - if (topic->type != NT_UNASSIGNED && topic->type != value.type()) { - return false; - } - // Make sure value isn't older than last value - if (!topic->lastValue || topic->lastValue.time() == 0 || - value.time() >= topic->lastValue.time()) { - // TODO: notify option even if older value - if (!(suppressIfDuplicate && isDuplicate)) { - topic->type = value.type(); - if (topic->IsCached()) { - topic->lastValue = value; - topic->lastValueFromNetwork = false; - } - NotifyValue(topic, value, eventFlags, isDuplicate, publisher); - if (topic->datalogType == value.type()) { - for (auto&& datalog : topic->datalogs) { - datalog.Append(value); - } - } - } - } - - return true; -} - -void LocalStorage::Impl::NotifyValue(TopicData* topic, const Value& value, - unsigned int eventFlags, bool isDuplicate, - const PublisherData* publisher) { - bool isNetwork = (eventFlags & NT_EVENT_VALUE_REMOTE) != 0; - for (auto&& subscriber : topic->localSubscribers) { - if (subscriber->active && - (subscriber->config.keepDuplicates || !isDuplicate) && - ((isNetwork && !subscriber->config.disableRemote) || - (!isNetwork && !subscriber->config.disableLocal)) && - (!publisher || (publisher && (subscriber->config.excludePublisher != - publisher->handle)))) { - subscriber->pollStorage.emplace_back(value); - subscriber->handle.Set(); - if (!subscriber->valueListeners.empty()) { - m_listenerStorage.Notify(subscriber->valueListeners, eventFlags, - topic->handle, 0, value); - } - } - } - - for (auto&& subscriber : topic->multiSubscribers) { - if (subscriber->options.keepDuplicates || !isDuplicate) { - subscriber->handle.Set(); - if (!subscriber->valueListeners.empty()) { - m_listenerStorage.Notify(subscriber->valueListeners, eventFlags, - topic->handle, 0, value); - } - } - } -} - -void LocalStorage::Impl::SetFlags(TopicData* topic, unsigned int flags) { - wpi::json update = wpi::json::object(); - if ((flags & NT_PERSISTENT) != 0) { - topic->properties["persistent"] = true; - update["persistent"] = true; - } else { - topic->properties.erase("persistent"); - update["persistent"] = wpi::json(); - } - if ((flags & NT_RETAINED) != 0) { - topic->properties["retained"] = true; - update["retained"] = true; - } else { - topic->properties.erase("retained"); - update["retained"] = wpi::json(); - } - if ((flags & NT_UNCACHED) != 0) { - topic->properties["cached"] = false; - update["cached"] = false; - } else { - topic->properties.erase("cached"); - update["cached"] = wpi::json(); - } - if ((flags & NT_UNCACHED) != 0) { - topic->lastValue = {}; - topic->lastValueNetwork = {}; - topic->lastValueFromNetwork = false; - } - if ((flags & NT_UNCACHED) != 0 && (flags & NT_PERSISTENT) != 0) { - WARN("topic {}: disabling cached property disables persistent storage", - topic->name); - } - topic->flags = flags; - if (!update.empty()) { - PropertiesUpdated(topic, update, NT_EVENT_NONE, true, false); - } -} - -void LocalStorage::Impl::SetPersistent(TopicData* topic, bool value) { - wpi::json update = wpi::json::object(); - if (value) { - topic->flags |= NT_PERSISTENT; - topic->properties["persistent"] = true; - update["persistent"] = true; - } else { - topic->flags &= ~NT_PERSISTENT; - topic->properties.erase("persistent"); - update["persistent"] = wpi::json(); - } - PropertiesUpdated(topic, update, NT_EVENT_NONE, true, false); -} - -void LocalStorage::Impl::SetRetained(TopicData* topic, bool value) { - wpi::json update = wpi::json::object(); - if (value) { - topic->flags |= NT_RETAINED; - topic->properties["retained"] = true; - update["retained"] = true; - } else { - topic->flags &= ~NT_RETAINED; - topic->properties.erase("retained"); - update["retained"] = wpi::json(); - } - PropertiesUpdated(topic, update, NT_EVENT_NONE, true, false); -} - -void LocalStorage::Impl::SetCached(TopicData* topic, bool value) { - wpi::json update = wpi::json::object(); - if (value) { - topic->flags &= ~NT_UNCACHED; - topic->properties.erase("cached"); - update["cached"] = wpi::json(); - } else { - topic->flags |= NT_UNCACHED; - topic->properties["cached"] = false; - update["cached"] = false; - } - PropertiesUpdated(topic, update, NT_EVENT_NONE, true, false); -} - -void LocalStorage::Impl::SetProperties(TopicData* topic, - const wpi::json& update, - bool sendNetwork) { - if (!update.is_object()) { - return; - } - DEBUG4("SetProperties({},{})", topic->name, sendNetwork); - for (auto&& change : update.items()) { - if (change.value().is_null()) { - topic->properties.erase(change.key()); - } else { - topic->properties[change.key()] = change.value(); - } - } - PropertiesUpdated(topic, update, NT_EVENT_NONE, sendNetwork); -} - -void LocalStorage::Impl::PropertiesUpdated(TopicData* topic, - const wpi::json& update, - unsigned int eventFlags, - bool sendNetwork, bool updateFlags) { - DEBUG4("PropertiesUpdated({}, {}, {}, {}, {})", topic->name, update.dump(), - eventFlags, sendNetwork, updateFlags); - if (updateFlags) { - // set flags from properties - auto it = topic->properties.find("persistent"); - if (it != topic->properties.end()) { - if (auto val = it->get_ptr()) { - if (*val) { - topic->flags |= NT_PERSISTENT; - } else { - topic->flags &= ~NT_PERSISTENT; - } - } - } - it = topic->properties.find("retained"); - if (it != topic->properties.end()) { - if (auto val = it->get_ptr()) { - if (*val) { - topic->flags |= NT_RETAINED; - } else { - topic->flags &= ~NT_RETAINED; - } - } - } - it = topic->properties.find("cached"); - if (it != topic->properties.end()) { - if (auto val = it->get_ptr()) { - if (*val) { - topic->flags &= ~NT_UNCACHED; - } else { - topic->flags |= NT_UNCACHED; - } - } - } - - if ((topic->flags & NT_UNCACHED) != 0) { - topic->lastValue = {}; - topic->lastValueNetwork = {}; - topic->lastValueFromNetwork = false; - } - - if ((topic->flags & NT_UNCACHED) != 0 && - (topic->flags & NT_PERSISTENT) != 0) { - WARN("topic {}: disabling cached property disables persistent storage", - topic->name); - } - } - - topic->propertiesStr = topic->properties.dump(); - NotifyTopic(topic, eventFlags | NT_EVENT_PROPERTIES); - // check local flag so we don't echo back received properties changes - if (m_network && sendNetwork) { - m_network->ClientSetProperties(topic->name, update); - } -} - -void LocalStorage::Impl::RefreshPubSubActive(TopicData* topic, - bool warnOnSubMismatch) { - for (auto&& publisher : topic->localPublishers) { - publisher->UpdateActive(); - } - for (auto&& subscriber : topic->localSubscribers) { - subscriber->UpdateActive(); - if (warnOnSubMismatch && topic->Exists() && !subscriber->active) { - // warn on type mismatch - INFO( - "local subscribe to '{}' disabled due to type mismatch (wanted '{}', " - "published as '{}')", - topic->name, subscriber->config.typeStr, topic->typeStr); - } - } -} - -void LocalStorage::Impl::NetworkAnnounce(TopicData* topic, - std::string_view typeStr, - const wpi::json& properties, - std::optional pubuid) { - DEBUG4("LS NetworkAnnounce({}, {}, {}, {})", topic->name, typeStr, - properties.dump(), pubuid.value_or(-1)); - if (pubuid.has_value()) { - return; // ack of our publish; ignore - } - - unsigned int event = NT_EVENT_NONE; - // fresh non-local publish; the network publish always sets the type even - // if it was locally published, but output a diagnostic for this case - bool didExist = topic->Exists(); - topic->onNetwork = true; - NT_Type type = StringToType(typeStr); - if (topic->type != type || topic->typeStr != typeStr) { - if (didExist) { - INFO( - "network announce of '{}' overriding local publish (was '{}', now " - "'{}')", - topic->name, topic->typeStr, typeStr); - } - topic->type = type; - topic->typeStr = typeStr; - RefreshPubSubActive(topic, true); - } - if (!didExist) { - event |= NT_EVENT_PUBLISH; - } - - // may be properties update, but need to compare to see if it actually - // changed to determine whether to update string / send event - wpi::json update = wpi::json::object(); - // added/changed - for (auto&& prop : properties.items()) { - auto it = topic->properties.find(prop.key()); - if (it == topic->properties.end() || *it != prop.value()) { - update[prop.key()] = prop.value(); - } - } - // removed - for (auto&& prop : topic->properties.items()) { - if (properties.find(prop.key()) == properties.end()) { - update[prop.key()] = wpi::json(); - } - } - if (!update.empty()) { - topic->properties = properties; - PropertiesUpdated(topic, update, event, false); - } else if (event != NT_EVENT_NONE) { - NotifyTopic(topic, event); - } -} - -void LocalStorage::Impl::RemoveNetworkPublisher(TopicData* topic) { - DEBUG4("LS RemoveNetworkPublisher({}, {})", topic->handle.GetHandle(), - topic->name); - // this acts as an unpublish - bool didExist = topic->Exists(); - topic->onNetwork = false; - if (didExist && !topic->Exists()) { - DEBUG4("Unpublished {}", topic->name); - CheckReset(topic); - NotifyTopic(topic, NT_EVENT_UNPUBLISH); - } - - if (!topic->localPublishers.empty()) { - // some other publisher still exists; if it has a different type, refresh - // and publish it over the network - auto& nextPub = topic->localPublishers.front(); - if (nextPub->config.type != topic->type || - nextPub->config.typeStr != topic->typeStr) { - topic->type = nextPub->config.type; - topic->typeStr = nextPub->config.typeStr; - RefreshPubSubActive(topic, false); - // this may result in a duplicate publish warning on the server side, - // but send one anyway in this case just to be sure - if (nextPub->active && m_network) { - m_network->ClientPublish(Handle{nextPub->handle}.GetIndex(), - topic->name, topic->typeStr, topic->properties, - nextPub->config); - } - } - } -} - -void LocalStorage::Impl::NetworkPropertiesUpdate(TopicData* topic, - const wpi::json& update, - bool ack) { - DEBUG4("NetworkPropertiesUpdate({},{})", topic->name, ack); - if (ack) { - return; // ignore acks - } - SetProperties(topic, update, false); -} - -LocalStorage::PublisherData* LocalStorage::Impl::AddLocalPublisher( - TopicData* topic, const wpi::json& properties, const PubSubConfig& config) { - bool didExist = topic->Exists(); - auto publisher = m_publishers.Add(m_inst, topic, config); - topic->localPublishers.Add(publisher); - - if (!didExist) { - DEBUG4("AddLocalPublisher: setting {} type {} typestr {}", topic->name, - static_cast(config.type), config.typeStr); - // set the type to the published type - topic->type = config.type; - topic->typeStr = config.typeStr; - RefreshPubSubActive(topic, true); - - if (properties.is_null()) { - topic->properties = wpi::json::object(); - } else if (properties.is_object()) { - topic->properties = properties; - } else { - WARN("ignoring non-object properties when publishing '{}'", topic->name); - topic->properties = wpi::json::object(); - } - - if (topic->properties.empty()) { - NotifyTopic(topic, NT_EVENT_PUBLISH); - } else { - PropertiesUpdated(topic, topic->properties, NT_EVENT_PUBLISH, false); - } - } else { - // only need to update just this publisher - publisher->UpdateActive(); - if (!publisher->active) { - // warn on type mismatch - INFO( - "local publish to '{}' disabled due to type mismatch (wanted '{}', " - "currently '{}')", - topic->name, config.typeStr, topic->typeStr); - } - } - - if (publisher->active && m_network) { - m_network->ClientPublish(Handle{publisher->handle}.GetIndex(), topic->name, - topic->typeStr, topic->properties, config); - } - return publisher; -} - -std::unique_ptr -LocalStorage::Impl::RemoveLocalPublisher(NT_Publisher pubHandle) { - auto publisher = m_publishers.Remove(pubHandle); - if (publisher) { - auto topic = publisher->topic; - bool didExist = topic->Exists(); - topic->localPublishers.Remove(publisher.get()); - if (didExist && !topic->Exists()) { - CheckReset(topic); - NotifyTopic(topic, NT_EVENT_UNPUBLISH); - } - - if (publisher->active && m_network) { - m_network->ClientUnpublish(Handle{publisher->handle}.GetIndex()); - } - - if (publisher->active && !topic->localPublishers.empty()) { - // some other publisher still exists; if it has a different type, refresh - // and publish it over the network - auto& nextPub = topic->localPublishers.front(); - if (nextPub->config.type != topic->type || - nextPub->config.typeStr != topic->typeStr) { - topic->type = nextPub->config.type; - topic->typeStr = nextPub->config.typeStr; - RefreshPubSubActive(topic, false); - if (nextPub->active && m_network) { - m_network->ClientPublish(Handle{nextPub->handle}.GetIndex(), - topic->name, topic->typeStr, - topic->properties, nextPub->config); - } - } - } - } - return publisher; -} - -LocalStorage::SubscriberData* LocalStorage::Impl::AddLocalSubscriber( - TopicData* topic, const PubSubConfig& config) { - DEBUG4("AddLocalSubscriber({})", topic->name); - auto subscriber = m_subscribers.Add(m_inst, topic, config); - topic->localSubscribers.Add(subscriber); - // set subscriber to active if the type matches - subscriber->UpdateActive(); - if (topic->Exists() && !subscriber->active) { - // warn on type mismatch - INFO( - "local subscribe to '{}' disabled due to type mismatch (wanted '{}', " - "published as '{}')", - topic->name, config.typeStr, topic->typeStr); - } - if (m_network && !subscriber->config.hidden) { - DEBUG4("-> NetworkSubscribe({})", topic->name); - m_network->ClientSubscribe(1 + Handle{subscriber->handle}.GetIndex(), - {{topic->name}}, config); - } - - // queue current value - if (subscriber->active) { - if (!topic->lastValueFromNetwork && !config.disableLocal) { - subscriber->pollStorage.emplace_back(topic->lastValue); - subscriber->handle.Set(); - } else if (topic->lastValueFromNetwork && !config.disableRemote) { - subscriber->pollStorage.emplace_back(topic->lastValueNetwork); - subscriber->handle.Set(); - } - } - return subscriber; -} - -std::unique_ptr -LocalStorage::Impl::RemoveLocalSubscriber(NT_Subscriber subHandle) { - auto subscriber = m_subscribers.Remove(subHandle); - if (subscriber) { - auto topic = subscriber->topic; - topic->localSubscribers.Remove(subscriber.get()); - for (auto&& listener : m_listeners) { - if (listener.getSecond()->subscriber == subscriber.get()) { - listener.getSecond()->subscriber = nullptr; - } - } - if (m_network && !subscriber->config.hidden) { - m_network->ClientUnsubscribe(1 + Handle{subscriber->handle}.GetIndex()); - } - } - return subscriber; -} - -LocalStorage::EntryData* LocalStorage::Impl::AddEntry( - SubscriberData* subscriber) { - auto entry = m_entries.Add(m_inst, subscriber); - subscriber->topic->entries.Add(entry); - return entry; -} - -std::unique_ptr LocalStorage::Impl::RemoveEntry( - NT_Entry entryHandle) { - auto entry = m_entries.Remove(entryHandle); - if (entry) { - entry->topic->entries.Remove(entry.get()); - } - return entry; -} - -LocalStorage::MultiSubscriberData* LocalStorage::Impl::AddMultiSubscriber( - std::span prefixes, const PubSubOptions& options) { - DEBUG4("AddMultiSubscriber({})", fmt::join(prefixes, ",")); - auto subscriber = m_multiSubscribers.Add(m_inst, prefixes, options); - // subscribe to any already existing topics - for (auto&& topic : m_topics) { - for (auto&& prefix : prefixes) { - if (PrefixMatch(topic->name, prefix, topic->special)) { - topic->multiSubscribers.Add(subscriber); - break; - } - } - } - if (m_network && !subscriber->options.hidden) { - DEBUG4("-> NetworkSubscribe"); - m_network->ClientSubscribe(-1 - Handle{subscriber->handle}.GetIndex(), - subscriber->prefixes, subscriber->options); - } - return subscriber; -} - -std::unique_ptr -LocalStorage::Impl::RemoveMultiSubscriber(NT_MultiSubscriber subHandle) { - auto subscriber = m_multiSubscribers.Remove(subHandle); - if (subscriber) { - for (auto&& topic : m_topics) { - topic->multiSubscribers.Remove(subscriber.get()); - } - for (auto&& listener : m_listeners) { - if (listener.getSecond()->multiSubscriber == subscriber.get()) { - listener.getSecond()->multiSubscriber = nullptr; - } - } - if (m_network && !subscriber->options.hidden) { - m_network->ClientUnsubscribe(-1 - Handle{subscriber->handle}.GetIndex()); - } - } - return subscriber; -} - -void LocalStorage::Impl::AddListenerImpl(NT_Listener listenerHandle, - TopicData* topic, - unsigned int eventMask) { - if (topic->localSubscribers.size() >= kMaxSubscribers) { - ERR("reached maximum number of subscribers to '{}', ignoring listener add", - topic->name); - return; - } - // subscribe to make sure topic updates are received - PubSubConfig config; - config.topicsOnly = (eventMask & NT_EVENT_VALUE_ALL) == 0; - auto sub = AddLocalSubscriber(topic, config); - AddListenerImpl(listenerHandle, sub, eventMask, sub->handle, true); -} - -void LocalStorage::Impl::AddListenerImpl(NT_Listener listenerHandle, - SubscriberData* subscriber, - unsigned int eventMask, - NT_Handle subentryHandle, - bool subscriberOwned) { - m_listeners.try_emplace(listenerHandle, std::make_unique( - listenerHandle, subscriber, - eventMask, subscriberOwned)); - - auto topic = subscriber->topic; - - if ((eventMask & NT_EVENT_TOPIC) != 0) { - if (topic->listeners.size() >= kMaxListeners) { - ERR("reached maximum number of listeners to '{}', not adding listener", - topic->name); - return; - } - - m_listenerStorage.Activate( - listenerHandle, eventMask & (NT_EVENT_TOPIC | NT_EVENT_IMMEDIATE)); - - topic->listeners.Add(listenerHandle); - - // handle immediate publish - if ((eventMask & (NT_EVENT_PUBLISH | NT_EVENT_IMMEDIATE)) == - (NT_EVENT_PUBLISH | NT_EVENT_IMMEDIATE) && - topic->Exists()) { - m_listenerStorage.Notify({&listenerHandle, 1}, - NT_EVENT_PUBLISH | NT_EVENT_IMMEDIATE, - topic->GetTopicInfo()); - } - } - - if ((eventMask & NT_EVENT_VALUE_ALL) != 0) { - if (subscriber->valueListeners.size() >= kMaxListeners) { - ERR("reached maximum number of listeners to '{}', not adding listener", - topic->name); - return; - } - m_listenerStorage.Activate( - listenerHandle, eventMask & (NT_EVENT_VALUE_ALL | NT_EVENT_IMMEDIATE), - [subentryHandle](unsigned int mask, Event* event) { - if (auto valueData = event->GetValueEventData()) { - valueData->subentry = subentryHandle; - } - return true; - }); - - subscriber->valueListeners.Add(listenerHandle); - - // handle immediate value - if ((eventMask & NT_EVENT_VALUE_ALL) != 0 && - (eventMask & NT_EVENT_IMMEDIATE) != 0 && topic->lastValue) { - m_listenerStorage.Notify({&listenerHandle, 1}, - NT_EVENT_IMMEDIATE | NT_EVENT_VALUE_ALL, - topic->handle, subentryHandle, topic->lastValue); - } - } -} - -void LocalStorage::Impl::AddListenerImpl(NT_Listener listenerHandle, - MultiSubscriberData* subscriber, - unsigned int eventMask, - bool subscriberOwned) { - auto listener = - m_listeners - .try_emplace(listenerHandle, std::make_unique( - listenerHandle, subscriber, - eventMask, subscriberOwned)) - .first->getSecond() - .get(); - - // if we're doing anything immediate, get the list of matching topics - wpi::SmallVector topics; - if ((eventMask & NT_EVENT_IMMEDIATE) != 0 && - (eventMask & (NT_EVENT_PUBLISH | NT_EVENT_VALUE_ALL)) != 0) { - for (auto&& topic : m_topics) { - if (topic->Exists() && subscriber->Matches(topic->name, topic->special)) { - topics.emplace_back(topic.get()); - } - } - } - - if ((eventMask & NT_EVENT_TOPIC) != 0) { - if (m_topicPrefixListeners.size() >= kMaxListeners) { - ERR("reached maximum number of listeners, not adding listener"); - return; - } - - m_listenerStorage.Activate( - listenerHandle, eventMask & (NT_EVENT_TOPIC | NT_EVENT_IMMEDIATE)); - - m_topicPrefixListeners.Add(listener); - - // handle immediate publish - if ((eventMask & (NT_EVENT_PUBLISH | NT_EVENT_IMMEDIATE)) == - (NT_EVENT_PUBLISH | NT_EVENT_IMMEDIATE)) { - std::vector topicInfos; - for (auto&& topic : topics) { - topicInfos.emplace_back(topic->GetTopicInfo()); - } - if (!topicInfos.empty()) { - m_listenerStorage.Notify({&listenerHandle, 1}, - NT_EVENT_PUBLISH | NT_EVENT_IMMEDIATE, - topicInfos); - } - } - } - - if ((eventMask & NT_EVENT_VALUE_ALL) != 0) { - if (subscriber->valueListeners.size() >= kMaxListeners) { - ERR("reached maximum number of listeners, not adding listener"); - return; - } - - m_listenerStorage.Activate( - listenerHandle, eventMask & (NT_EVENT_VALUE_ALL | NT_EVENT_IMMEDIATE), - [subentryHandle = subscriber->handle.GetHandle()](unsigned int mask, - Event* event) { - if (auto valueData = event->GetValueEventData()) { - valueData->subentry = subentryHandle; - } - return true; - }); - - subscriber->valueListeners.Add(listenerHandle); - - // handle immediate value - if ((eventMask & NT_EVENT_VALUE_ALL) != 0 && - (eventMask & NT_EVENT_IMMEDIATE) != 0) { - for (auto&& topic : topics) { - if (topic->lastValue) { - m_listenerStorage.Notify( - {&listenerHandle, 1}, NT_EVENT_VALUE_ALL | NT_EVENT_IMMEDIATE, - topic->handle, subscriber->handle, topic->lastValue); - } - } - } - } -} - -LocalStorage::TopicData* LocalStorage::Impl::GetOrCreateTopic( - std::string_view name) { - auto& topic = m_nameTopics[name]; - // create if it does not already exist - if (!topic) { - topic = m_topics.Add(m_inst, name); - // attach multi-subscribers - for (auto&& sub : m_multiSubscribers) { - if (sub->Matches(name, topic->special)) { - topic->multiSubscribers.Add(sub.get()); - } - } - } - return topic; -} - -LocalStorage::TopicData* LocalStorage::Impl::GetTopic(NT_Handle handle) { - switch (Handle{handle}.GetType()) { - case Handle::kEntry: { - if (auto entry = m_entries.Get(handle)) { - return entry->topic; - } - break; - } - case Handle::kSubscriber: { - if (auto subscriber = m_subscribers.Get(handle)) { - return subscriber->topic; - } - break; - } - case Handle::kPublisher: { - if (auto publisher = m_publishers.Get(handle)) { - return publisher->topic; - } - break; - } - case Handle::kTopic: - return m_topics.Get(handle); - default: - break; - } - return {}; -} - -LocalStorage::SubscriberData* LocalStorage::Impl::GetSubEntry( - NT_Handle subentryHandle) { - Handle h{subentryHandle}; - if (h.IsType(Handle::kSubscriber)) { - return m_subscribers.Get(subentryHandle); - } else if (h.IsType(Handle::kEntry)) { - auto entry = m_entries.Get(subentryHandle); - return entry ? entry->subscriber : nullptr; - } else { - return nullptr; - } -} - -LocalStorage::PublisherData* LocalStorage::Impl::PublishEntry(EntryData* entry, - NT_Type type) { - if (entry->publisher) { - return entry->publisher; - } - if (entry->subscriber->config.type == NT_UNASSIGNED) { - auto typeStr = TypeToString(type); - entry->subscriber->config.type = type; - entry->subscriber->config.typeStr = typeStr; - } else if (entry->subscriber->config.type != type) { - if (!IsNumericCompatible(type, entry->subscriber->config.type)) { - // don't allow dynamically changing the type of an entry - auto typeStr = TypeToString(type); - ERR("cannot publish entry {} as type {}, previously subscribed as {}", - entry->topic->name, typeStr, entry->subscriber->config.typeStr); - return nullptr; - } - } - // create publisher - entry->publisher = AddLocalPublisher(entry->topic, wpi::json::object(), - entry->subscriber->config); - // exclude publisher if requested - if (entry->subscriber->config.excludeSelf) { - entry->subscriber->config.excludePublisher = entry->publisher->handle; - } - return entry->publisher; -} - -bool LocalStorage::Impl::PublishLocalValue(PublisherData* publisher, - const Value& value, bool force) { - if (!value) { - return false; - } - if (publisher->topic->type != NT_UNASSIGNED && - publisher->topic->type != value.type()) { - if (IsNumericCompatible(publisher->topic->type, value.type())) { - return PublishLocalValue( - publisher, ConvertNumericValue(value, publisher->topic->type)); - } - return false; - } - if (publisher->active) { - bool isNetworkDuplicate, suppressDuplicates; - if (force || publisher->config.keepDuplicates) { - suppressDuplicates = false; - isNetworkDuplicate = false; - } else { - suppressDuplicates = true; - isNetworkDuplicate = publisher->topic->IsCached() && - (publisher->topic->lastValueNetwork == value); - } - if (!isNetworkDuplicate && m_network) { - if (publisher->topic->IsCached()) { - publisher->topic->lastValueNetwork = value; - } - m_network->ClientSetValue(Handle{publisher->handle}.GetIndex(), value); - } - return SetValue(publisher->topic, value, NT_EVENT_VALUE_LOCAL, - suppressDuplicates, publisher); - } else { - return false; - } -} - -bool LocalStorage::Impl::SetEntryValue(NT_Handle pubentryHandle, - const Value& value) { - if (!value) { - return false; - } - auto publisher = m_publishers.Get(pubentryHandle); - if (!publisher) { - if (auto entry = m_entries.Get(pubentryHandle)) { - publisher = PublishEntry(entry, value.type()); - } - if (!publisher) { - return false; - } - } - return PublishLocalValue(publisher, value); -} - -bool LocalStorage::Impl::SetDefaultEntryValue(NT_Handle pubsubentryHandle, - const Value& value) { - DEBUG4("SetDefaultEntryValue({}, {})", pubsubentryHandle, - static_cast(value.type())); - if (!value) { - return false; - } - if (auto topic = GetTopic(pubsubentryHandle)) { - if (!topic->IsCached()) { - WARN("ignoring default value on non-cached topic '{}'", topic->name); - return false; - } - if (!topic->lastValue && - (topic->type == NT_UNASSIGNED || topic->type == value.type() || - IsNumericCompatible(topic->type, value.type()))) { - // publish if we haven't yet - auto publisher = m_publishers.Get(pubsubentryHandle); - if (!publisher) { - if (auto entry = m_entries.Get(pubsubentryHandle)) { - publisher = PublishEntry(entry, value.type()); - } - } - - // force value timestamps to 0 - if (topic->type == NT_UNASSIGNED) { - topic->type = value.type(); - } - Value newValue; - if (topic->type == value.type()) { - newValue = value; - } else if (IsNumericCompatible(topic->type, value.type())) { - newValue = ConvertNumericValue(value, topic->type); - } else { - return true; - } - newValue.SetTime(0); - newValue.SetServerTime(0); - if (publisher) { - PublishLocalValue(publisher, newValue, true); - } else { - topic->lastValue = newValue; - } - return true; - } - } - return false; -} - -void LocalStorage::Impl::RemoveSubEntry(NT_Handle subentryHandle) { - Handle h{subentryHandle}; - if (h.IsType(Handle::kSubscriber)) { - RemoveLocalSubscriber(subentryHandle); - } else if (h.IsType(Handle::kMultiSubscriber)) { - RemoveMultiSubscriber(subentryHandle); - } else if (h.IsType(Handle::kEntry)) { - if (auto entry = RemoveEntry(subentryHandle)) { - RemoveLocalSubscriber(entry->subscriber->handle); - if (entry->publisher) { - RemoveLocalPublisher(entry->publisher->handle); - } - } - } -} - -LocalStorage::Impl::Impl(int inst, IListenerStorage& listenerStorage, - wpi::Logger& logger) - : m_inst{inst}, m_listenerStorage{listenerStorage}, m_logger{logger} {} - -LocalStorage::~LocalStorage() = default; - -int LocalStorage::ServerAnnounce(std::string_view name, int id, - std::string_view typeStr, - const wpi::json& properties, - std::optional pubuid) { - std::scoped_lock lock{m_mutex}; - auto topic = m_impl.GetOrCreateTopic(name); - m_impl.NetworkAnnounce(topic, typeStr, properties, pubuid); - return Handle{topic->handle}.GetIndex(); -} - -void LocalStorage::ServerUnannounce(std::string_view name, int id) { - std::scoped_lock lock{m_mutex}; - auto topic = m_impl.GetOrCreateTopic(name); - m_impl.RemoveNetworkPublisher(topic); -} - -void LocalStorage::ServerPropertiesUpdate(std::string_view name, - const wpi::json& update, bool ack) { - std::scoped_lock lock{m_mutex}; - auto it = m_impl.m_nameTopics.find(name); - if (it != m_impl.m_nameTopics.end()) { - m_impl.NetworkPropertiesUpdate(it->second, update, ack); - } -} - -void LocalStorage::ServerSetValue(int topicId, const Value& value) { - std::scoped_lock lock{m_mutex}; - if (auto topic = - m_impl.m_topics.Get(Handle{m_impl.m_inst, topicId, Handle::kTopic})) { - if (m_impl.SetValue(topic, value, NT_EVENT_VALUE_REMOTE, false, nullptr)) { - if (topic->IsCached()) { - topic->lastValueNetwork = value; - topic->lastValueFromNetwork = true; - } - } - } -} - -void LocalStorage::StartNetwork(net::ClientMessageHandler* network) { - std::scoped_lock lock{m_mutex}; - m_impl.StartNetwork(network); -} - -void LocalStorage::Impl::StartNetwork(net::ClientMessageHandler* network) { - DEBUG4("StartNetwork()"); - m_network = network; - // publish all active publishers to the network and send last values - // only send value once per topic - for (auto&& topic : m_topics) { - PublisherData* anyPublisher = nullptr; - for (auto&& publisher : topic->localPublishers) { - if (publisher->active) { - network->ClientPublish(Handle{publisher->handle}.GetIndex(), - topic->name, topic->typeStr, topic->properties, - publisher->config); - anyPublisher = publisher; - } - } - if (anyPublisher && topic->lastValue) { - network->ClientSetValue(Handle{anyPublisher->handle}.GetIndex(), - topic->lastValue); - } - } - for (auto&& subscriber : m_subscribers) { - if (!subscriber->config.hidden) { - network->ClientSubscribe(1 + Handle{subscriber->handle}.GetIndex(), - {{subscriber->topic->name}}, subscriber->config); - } - } - for (auto&& subscriber : m_multiSubscribers) { - if (!subscriber->options.hidden) { - network->ClientSubscribe(-1 - Handle{subscriber->handle}.GetIndex(), - subscriber->prefixes, subscriber->options); - } - } -} - -void LocalStorage::ClearNetwork() { - WPI_DEBUG4(m_impl.m_logger, "ClearNetwork()"); - std::scoped_lock lock{m_mutex}; - m_impl.m_network = nullptr; - // treat as an unannounce all from the network side - for (auto&& topic : m_impl.m_topics) { - m_impl.RemoveNetworkPublisher(topic.get()); - } -} - -template -static void ForEachTopic(T& topics, std::string_view prefix, unsigned int types, - F func) { - for (auto&& topic : topics) { - if (!topic->Exists()) { - continue; - } - if (!wpi::starts_with(topic->name, prefix)) { - continue; - } - if (types != 0 && (types & topic->type) == 0) { - continue; - } - func(*topic); - } -} - -template -static void ForEachTopic(T& topics, std::string_view prefix, - std::span types, F func) { - for (auto&& topic : topics) { - if (!topic->Exists()) { - continue; - } - if (!wpi::starts_with(topic->name, prefix)) { - continue; - } - if (!types.empty()) { - bool match = false; - for (auto&& type : types) { - if (topic->typeStr == type) { - match = true; - break; - } - } - if (!match) { - continue; - } - } - func(*topic); - } -} - std::vector LocalStorage::GetTopics(std::string_view prefix, unsigned int types) { std::scoped_lock lock(m_mutex); std::vector rv; - ForEachTopic(m_impl.m_topics, prefix, types, - [&](TopicData& topic) { rv.push_back(topic.handle); }); + m_impl.ForEachTopic(prefix, types, + [&](auto& topic) { rv.push_back(topic.handle); }); return rv; } @@ -1219,8 +21,8 @@ std::vector LocalStorage::GetTopics( std::string_view prefix, std::span types) { std::scoped_lock lock(m_mutex); std::vector rv; - ForEachTopic(m_impl.m_topics, prefix, types, - [&](TopicData& topic) { rv.push_back(topic.handle); }); + m_impl.ForEachTopic(prefix, types, + [&](auto& topic) { rv.push_back(topic.handle); }); return rv; } @@ -1228,7 +30,7 @@ std::vector LocalStorage::GetTopicInfo(std::string_view prefix, unsigned int types) { std::scoped_lock lock(m_mutex); std::vector rv; - ForEachTopic(m_impl.m_topics, prefix, types, [&](TopicData& topic) { + m_impl.ForEachTopic(prefix, types, [&](auto& topic) { rv.emplace_back(topic.GetTopicInfo()); }); return rv; @@ -1238,166 +40,12 @@ std::vector LocalStorage::GetTopicInfo( std::string_view prefix, std::span types) { std::scoped_lock lock(m_mutex); std::vector rv; - ForEachTopic(m_impl.m_topics, prefix, types, [&](TopicData& topic) { + m_impl.ForEachTopic(prefix, types, [&](auto& topic) { rv.emplace_back(topic.GetTopicInfo()); }); return rv; } -void LocalStorage::SetTopicProperty(NT_Topic topicHandle, std::string_view name, - const wpi::json& value) { - std::scoped_lock lock{m_mutex}; - if (auto topic = m_impl.m_topics.Get(topicHandle)) { - if (value.is_null()) { - topic->properties.erase(name); - } else { - topic->properties[name] = value; - } - wpi::json update = wpi::json::object(); - update[name] = value; - m_impl.PropertiesUpdated(topic, update, NT_EVENT_NONE, true); - } -} - -void LocalStorage::DeleteTopicProperty(NT_Topic topicHandle, - std::string_view name) { - std::scoped_lock lock{m_mutex}; - if (auto topic = m_impl.m_topics.Get(topicHandle)) { - topic->properties.erase(name); - wpi::json update = wpi::json::object(); - update[name] = wpi::json(); - m_impl.PropertiesUpdated(topic, update, NT_EVENT_NONE, true); - } -} - -bool LocalStorage::SetTopicProperties(NT_Topic topicHandle, - const wpi::json& update) { - if (!update.is_object()) { - return false; - } - std::scoped_lock lock{m_mutex}; - if (auto topic = m_impl.m_topics.Get(topicHandle)) { - m_impl.SetProperties(topic, update, true); - return true; - } else { - return {}; - } -} - -NT_Subscriber LocalStorage::Subscribe(NT_Topic topicHandle, NT_Type type, - std::string_view typeStr, - const PubSubOptions& options) { - std::scoped_lock lock{m_mutex}; - - // Get the topic - auto* topic = m_impl.m_topics.Get(topicHandle); - if (!topic) { - return 0; - } - - if (topic->localSubscribers.size() >= kMaxSubscribers) { - WPI_ERROR(m_impl.m_logger, - "reached maximum number of subscribers to '{}', not subscribing", - topic->name); - return 0; - } - - // Create subscriber - return m_impl.AddLocalSubscriber(topic, PubSubConfig{type, typeStr, options}) - ->handle; -} - -NT_MultiSubscriber LocalStorage::SubscribeMultiple( - std::span prefixes, const PubSubOptions& options) { - std::scoped_lock lock{m_mutex}; - - if (m_impl.m_multiSubscribers.size() >= kMaxMultiSubscribers) { - WPI_ERROR(m_impl.m_logger, - "reached maximum number of multi-subscribers, not subscribing"); - return 0; - } - - return m_impl.AddMultiSubscriber(prefixes, options)->handle; -} - -NT_Publisher LocalStorage::Publish(NT_Topic topicHandle, NT_Type type, - std::string_view typeStr, - const wpi::json& properties, - const PubSubOptions& options) { - std::scoped_lock lock{m_mutex}; - - // Get the topic - auto* topic = m_impl.m_topics.Get(topicHandle); - if (!topic) { - WPI_ERROR(m_impl.m_logger, "trying to publish invalid topic handle ({})", - topicHandle); - return 0; - } - - if (type == NT_UNASSIGNED || typeStr.empty()) { - WPI_ERROR( - m_impl.m_logger, - "cannot publish '{}' with an unassigned type or empty type string", - topic->name); - return 0; - } - - if (topic->localPublishers.size() >= kMaxPublishers) { - WPI_ERROR(m_impl.m_logger, - "reached maximum number of publishers to '{}', not publishing", - topic->name); - return 0; - } - - return m_impl - .AddLocalPublisher(topic, properties, - PubSubConfig{type, typeStr, options}) - ->handle; -} - -void LocalStorage::Unpublish(NT_Handle pubentryHandle) { - std::scoped_lock lock{m_mutex}; - - if (Handle{pubentryHandle}.IsType(Handle::kPublisher)) { - m_impl.RemoveLocalPublisher(pubentryHandle); - } else if (auto entry = m_impl.m_entries.Get(pubentryHandle)) { - if (entry->publisher) { - m_impl.RemoveLocalPublisher(entry->publisher->handle); - entry->publisher = nullptr; - } - } else { - // TODO: report warning - return; - } -} - -NT_Entry LocalStorage::GetEntry(NT_Topic topicHandle, NT_Type type, - std::string_view typeStr, - const PubSubOptions& options) { - std::scoped_lock lock{m_mutex}; - - // Get the topic - auto* topic = m_impl.m_topics.Get(topicHandle); - if (!topic) { - return 0; - } - - if (topic->localSubscribers.size() >= kMaxSubscribers) { - WPI_ERROR( - m_impl.m_logger, - "reached maximum number of subscribers to '{}', not creating entry", - topic->name); - return 0; - } - - // Create subscriber - auto subscriber = - m_impl.AddLocalSubscriber(topic, PubSubConfig{type, typeStr, options}); - - // Create entry - return m_impl.AddEntry(subscriber)->handle; -} - void LocalStorage::Release(NT_Handle pubsubentryHandle) { switch (Handle{pubsubentryHandle}.GetType()) { case Handle::kEntry: @@ -1433,186 +81,18 @@ Value LocalStorage::GetEntryValue(NT_Handle subentryHandle) { return {}; } -NT_Entry LocalStorage::GetEntry(std::string_view name) { - if (name.empty()) { - return {}; - } - - std::scoped_lock lock{m_mutex}; - - // Get the topic data - auto* topic = m_impl.GetOrCreateTopic(name); - - if (topic->entry == 0) { - if (topic->localSubscribers.size() >= kMaxSubscribers) { - WPI_ERROR( - m_impl.m_logger, - "reached maximum number of subscribers to '{}', not creating entry", - topic->name); - return 0; - } - - // Create subscriber - auto* subscriber = m_impl.AddLocalSubscriber(topic, {}); - - // Create entry - topic->entry = m_impl.AddEntry(subscriber)->handle; - } - - return topic->entry; -} - -void LocalStorage::AddListener(NT_Listener listenerHandle, - std::span prefixes, - unsigned int mask) { - mask &= (NT_EVENT_TOPIC | NT_EVENT_VALUE_ALL | NT_EVENT_IMMEDIATE); - std::scoped_lock lock{m_mutex}; - if (m_impl.m_multiSubscribers.size() >= kMaxMultiSubscribers) { - WPI_ERROR( - m_impl.m_logger, - "reached maximum number of multi-subscribers, not adding listener"); - return; - } - // subscribe to make sure topic updates are received - auto sub = m_impl.AddMultiSubscriber( - prefixes, {.topicsOnly = (mask & NT_EVENT_VALUE_ALL) == 0}); - m_impl.AddListenerImpl(listenerHandle, sub, mask, true); -} - void LocalStorage::AddListener(NT_Listener listenerHandle, NT_Handle handle, unsigned int mask) { mask &= (NT_EVENT_TOPIC | NT_EVENT_VALUE_ALL | NT_EVENT_IMMEDIATE); std::scoped_lock lock{m_mutex}; - if (auto topic = m_impl.m_topics.Get(handle)) { + if (auto topic = m_impl.GetTopicByHandle(handle)) { m_impl.AddListenerImpl(listenerHandle, topic, mask); - } else if (auto sub = m_impl.m_multiSubscribers.Get(handle)) { + } else if (auto sub = m_impl.GetMultiSubscriberByHandle(handle)) { m_impl.AddListenerImpl(listenerHandle, sub, mask, false); - } else if (auto sub = m_impl.m_subscribers.Get(handle)) { + } else if (auto sub = m_impl.GetSubscriberByHandle(handle)) { m_impl.AddListenerImpl(listenerHandle, sub, mask, sub->handle, false); - } else if (auto entry = m_impl.m_entries.Get(handle)) { + } else if (auto entry = m_impl.GetEntryByHandle(handle)) { m_impl.AddListenerImpl(listenerHandle, entry->subscriber, mask, entry->handle, false); } } - -void LocalStorage::RemoveListener(NT_Listener listenerHandle, - unsigned int mask) { - std::scoped_lock lock{m_mutex}; - auto listenerIt = m_impl.m_listeners.find(listenerHandle); - if (listenerIt == m_impl.m_listeners.end()) { - return; - } - auto listener = std::move(listenerIt->getSecond()); - m_impl.m_listeners.erase(listenerIt); - if (!listener) { - return; - } - - m_impl.m_topicPrefixListeners.Remove(listener.get()); - if (listener->subscriber) { - listener->subscriber->valueListeners.Remove(listenerHandle); - listener->subscriber->topic->listeners.Remove(listenerHandle); - if (listener->subscriberOwned) { - m_impl.RemoveLocalSubscriber(listener->subscriber->handle); - } - } - if (listener->multiSubscriber) { - listener->multiSubscriber->valueListeners.Remove(listenerHandle); - if (listener->subscriberOwned) { - m_impl.RemoveMultiSubscriber(listener->multiSubscriber->handle); - } - } -} - -NT_DataLogger LocalStorage::StartDataLog(wpi::log::DataLog& log, - std::string_view prefix, - std::string_view logPrefix) { - std::scoped_lock lock{m_mutex}; - auto datalogger = - m_impl.m_dataloggers.Add(m_impl.m_inst, log, prefix, logPrefix); - - // start logging any matching topics - auto now = nt::Now(); - for (auto&& topic : m_impl.m_topics) { - if (!PrefixMatch(topic->name, prefix, topic->special) || - topic->type == NT_UNASSIGNED || topic->typeStr.empty()) { - continue; - } - topic->datalogs.emplace_back(log, datalogger->Start(topic.get(), now), - datalogger->handle); - topic->datalogType = topic->type; - - // log current value, if any - if (topic->lastValue) { - topic->datalogs.back().Append(topic->lastValue); - } - } - - return datalogger->handle; -} - -void LocalStorage::StopDataLog(NT_DataLogger logger) { - std::scoped_lock lock{m_mutex}; - if (auto datalogger = m_impl.m_dataloggers.Remove(logger)) { - // finish any active entries - auto now = Now(); - for (auto&& topic : m_impl.m_topics) { - auto it = - std::find_if(topic->datalogs.begin(), topic->datalogs.end(), - [&](const auto& elem) { return elem.logger == logger; }); - if (it != topic->datalogs.end()) { - it->log->Finish(it->entry, now); - topic->datalogs.erase(it); - } - } - } -} - -bool LocalStorage::HasSchema(std::string_view name) { - std::scoped_lock lock{m_mutex}; - wpi::SmallString<128> fullName{"/.schema/"}; - fullName += name; - auto it = m_impl.m_schemas.find(fullName); - return it != m_impl.m_schemas.end(); -} - -void LocalStorage::AddSchema(std::string_view name, std::string_view type, - std::span schema) { - std::scoped_lock lock{m_mutex}; - wpi::SmallString<128> fullName{"/.schema/"}; - fullName += name; - auto& pubHandle = m_impl.m_schemas[fullName]; - if (pubHandle != 0) { - return; - } - - auto topic = m_impl.GetOrCreateTopic(fullName); - - if (topic->localPublishers.size() >= kMaxPublishers) { - WPI_ERROR(m_impl.m_logger, - "reached maximum number of publishers to '{}', not publishing", - topic->name); - return; - } - - pubHandle = m_impl - .AddLocalPublisher(topic, {{"retained", true}}, - PubSubConfig{NT_RAW, type, {}}) - ->handle; - - m_impl.SetDefaultEntryValue(pubHandle, Value::MakeRaw(schema)); -} - -void LocalStorage::Reset() { - std::scoped_lock lock{m_mutex}; - m_impl.m_network = nullptr; - m_impl.m_topics.clear(); - m_impl.m_publishers.clear(); - m_impl.m_subscribers.clear(); - m_impl.m_entries.clear(); - m_impl.m_multiSubscribers.clear(); - m_impl.m_dataloggers.clear(); - m_impl.m_nameTopics.clear(); - m_impl.m_listeners.clear(); - m_impl.m_topicPrefixListeners.clear(); -} diff --git a/ntcore/src/main/native/cpp/LocalStorage.h b/ntcore/src/main/native/cpp/LocalStorage.h index 007e2b51a6..09a56d3e44 100644 --- a/ntcore/src/main/native/cpp/LocalStorage.h +++ b/ntcore/src/main/native/cpp/LocalStorage.h @@ -6,25 +6,18 @@ #include -#include +#include #include #include #include -#include #include -#include -#include -#include +#include +#include #include #include -#include "Handle.h" -#include "HandleMap.h" -#include "PubSubOptions.h" -#include "Types_internal.h" -#include "ValueCircularBuffer.h" -#include "VectorSet.h" +#include "local/LocalStorageImpl.h" #include "net/MessageHandler.h" #include "net/NetworkInterface.h" #include "ntcore_cpp.h" @@ -43,19 +36,46 @@ class LocalStorage final : public net::ILocalStorage { : m_impl{inst, listenerStorage, logger} {} LocalStorage(const LocalStorage&) = delete; LocalStorage& operator=(const LocalStorage&) = delete; - ~LocalStorage() final; // network interface functions int ServerAnnounce(std::string_view name, int id, std::string_view typeStr, const wpi::json& properties, - std::optional pubuid) final; - void ServerUnannounce(std::string_view name, int id) final; - void ServerPropertiesUpdate(std::string_view name, const wpi::json& update, - bool ack) final; - void ServerSetValue(int topicId, const Value& value) final; + std::optional pubuid) final { + std::scoped_lock lock{m_mutex}; + auto topic = m_impl.GetOrCreateTopic(name); + m_impl.NetworkAnnounce(topic, typeStr, properties, pubuid); + return Handle{topic->handle}.GetIndex(); + } - void StartNetwork(net::ClientMessageHandler* network) final; - void ClearNetwork() final; + void ServerUnannounce(std::string_view name, int id) final { + std::scoped_lock lock{m_mutex}; + m_impl.RemoveNetworkPublisher(m_impl.GetOrCreateTopic(name)); + } + + void ServerPropertiesUpdate(std::string_view name, const wpi::json& update, + bool ack) final { + std::scoped_lock lock{m_mutex}; + if (auto topic = m_impl.GetTopicByName(name)) { + m_impl.NetworkPropertiesUpdate(topic, update, ack); + } + } + + void ServerSetValue(int topicId, const Value& value) final { + std::scoped_lock lock{m_mutex}; + if (auto topic = m_impl.GetTopicById(topicId)) { + m_impl.ServerSetValue(topic, value); + } + } + + void StartNetwork(net::ClientMessageHandler* network) final { + std::scoped_lock lock{m_mutex}; + m_impl.StartNetwork(network); + } + + void ClearNetwork() final { + std::scoped_lock lock{m_mutex}; + m_impl.ClearNetwork(); + } // User functions. These are the actual implementations of the corresponding // user API functions in ntcore_cpp. @@ -79,7 +99,7 @@ class LocalStorage final : public net::ILocalStorage { std::string GetTopicName(NT_Topic topicHandle) { std::scoped_lock lock{m_mutex}; - if (auto topic = m_impl.m_topics.Get(topicHandle)) { + if (auto topic = m_impl.GetTopicByHandle(topicHandle)) { return topic->name; } else { return {}; @@ -88,7 +108,7 @@ class LocalStorage final : public net::ILocalStorage { NT_Type GetTopicType(NT_Topic topicHandle) { std::scoped_lock lock{m_mutex}; - if (auto topic = m_impl.m_topics.Get(topicHandle)) { + if (auto topic = m_impl.GetTopicByHandle(topicHandle)) { return topic->type; } else { return {}; @@ -97,7 +117,7 @@ class LocalStorage final : public net::ILocalStorage { std::string GetTopicTypeString(NT_Topic topicHandle) { std::scoped_lock lock{m_mutex}; - if (auto topic = m_impl.m_topics.Get(topicHandle)) { + if (auto topic = m_impl.GetTopicByHandle(topicHandle)) { return topic->typeStr; } else { return {}; @@ -106,15 +126,15 @@ class LocalStorage final : public net::ILocalStorage { void SetTopicPersistent(NT_Topic topicHandle, bool value) { std::scoped_lock lock{m_mutex}; - if (auto topic = m_impl.m_topics.Get(topicHandle)) { + if (auto topic = m_impl.GetTopicByHandle(topicHandle)) { m_impl.SetPersistent(topic, value); } } bool GetTopicPersistent(NT_Topic topicHandle) { std::scoped_lock lock{m_mutex}; - if (auto topic = m_impl.m_topics.Get(topicHandle)) { - return (topic->flags & NT_PERSISTENT) != 0; + if (auto topic = m_impl.GetTopicByHandle(topicHandle)) { + return (topic->GetFlags() & NT_PERSISTENT) != 0; } else { return false; } @@ -122,15 +142,15 @@ class LocalStorage final : public net::ILocalStorage { void SetTopicRetained(NT_Topic topicHandle, bool value) { std::scoped_lock lock{m_mutex}; - if (auto topic = m_impl.m_topics.Get(topicHandle)) { + if (auto topic = m_impl.GetTopicByHandle(topicHandle)) { m_impl.SetRetained(topic, value); } } bool GetTopicRetained(NT_Topic topicHandle) { std::scoped_lock lock{m_mutex}; - if (auto topic = m_impl.m_topics.Get(topicHandle)) { - return (topic->flags & NT_RETAINED) != 0; + if (auto topic = m_impl.GetTopicByHandle(topicHandle)) { + return (topic->GetFlags() & NT_RETAINED) != 0; } else { return false; } @@ -138,15 +158,15 @@ class LocalStorage final : public net::ILocalStorage { void SetTopicCached(NT_Topic topicHandle, bool value) { std::scoped_lock lock{m_mutex}; - if (auto topic = m_impl.m_topics.Get(topicHandle)) { + if (auto topic = m_impl.GetTopicByHandle(topicHandle)) { m_impl.SetCached(topic, value); } } bool GetTopicCached(NT_Topic topicHandle) { std::scoped_lock lock{m_mutex}; - if (auto topic = m_impl.m_topics.Get(topicHandle)) { - return (topic->flags & NT_UNCACHED) == 0; + if (auto topic = m_impl.GetTopicByHandle(topicHandle)) { + return (topic->GetFlags() & NT_UNCACHED) == 0; } else { return false; } @@ -154,47 +174,72 @@ class LocalStorage final : public net::ILocalStorage { bool GetTopicExists(NT_Handle handle) { std::scoped_lock lock{m_mutex}; - TopicData* topic = m_impl.GetTopic(handle); + local::LocalTopic* topic = m_impl.GetTopic(handle); return topic && topic->Exists(); } wpi::json GetTopicProperty(NT_Topic topicHandle, std::string_view name) { std::scoped_lock lock{m_mutex}; - if (auto topic = m_impl.m_topics.Get(topicHandle)) { + if (auto topic = m_impl.GetTopicByHandle(topicHandle)) { return topic->properties.value(name, wpi::json{}); } else { return {}; } } - void SetTopicProperty(NT_Topic topic, std::string_view name, - const wpi::json& value); + void SetTopicProperty(NT_Topic topicHandle, std::string_view name, + const wpi::json& value) { + std::scoped_lock lock{m_mutex}; + if (auto topic = m_impl.GetTopicByHandle(topicHandle)) { + m_impl.SetProperty(topic, name, value); + } + } - void DeleteTopicProperty(NT_Topic topic, std::string_view name); + void DeleteTopicProperty(NT_Topic topicHandle, std::string_view name) { + std::scoped_lock lock{m_mutex}; + if (auto topic = m_impl.GetTopicByHandle(topicHandle)) { + m_impl.DeleteProperty(topic, name); + } + } wpi::json GetTopicProperties(NT_Topic topicHandle) { std::scoped_lock lock{m_mutex}; - if (auto topic = m_impl.m_topics.Get(topicHandle)) { + if (auto topic = m_impl.GetTopicByHandle(topicHandle)) { return topic->properties; } else { return wpi::json::object(); } } - bool SetTopicProperties(NT_Topic topic, const wpi::json& update); + bool SetTopicProperties(NT_Topic topicHandle, const wpi::json& update) { + std::scoped_lock lock{m_mutex}; + if (auto topic = m_impl.GetTopicByHandle(topicHandle)) { + return m_impl.SetProperties(topic, update, true); + } else { + return {}; + } + } TopicInfo GetTopicInfo(NT_Topic topicHandle) { std::scoped_lock lock{m_mutex}; - if (auto topic = m_impl.m_topics.Get(topicHandle)) { + if (auto topic = m_impl.GetTopicByHandle(topicHandle)) { return topic->GetTopicInfo(); } else { return {}; } } - NT_Subscriber Subscribe(NT_Topic topic, NT_Type type, + NT_Subscriber Subscribe(NT_Topic topicHandle, NT_Type type, std::string_view typeStr, - const PubSubOptions& options); + const PubSubOptions& options) { + std::scoped_lock lock{m_mutex}; + if (auto topic = m_impl.GetTopicByHandle(topicHandle)) { + if (auto subscriber = m_impl.Subscribe(topic, type, typeStr, options)) { + return subscriber->handle; + } + } + return {}; + } void Unsubscribe(NT_Subscriber subHandle) { std::scoped_lock lock{m_mutex}; @@ -202,21 +247,52 @@ class LocalStorage final : public net::ILocalStorage { } NT_MultiSubscriber SubscribeMultiple( - std::span prefixes, const PubSubOptions& options); + std::span prefixes, + const PubSubOptions& options) { + std::scoped_lock lock{m_mutex}; + if (auto sub = m_impl.AddMultiSubscriber(prefixes, options)) { + return sub->handle; + } else { + return {}; + } + } void UnsubscribeMultiple(NT_MultiSubscriber subHandle) { std::scoped_lock lock{m_mutex}; m_impl.RemoveMultiSubscriber(subHandle); } - NT_Publisher Publish(NT_Topic topic, NT_Type type, std::string_view typeStr, - const wpi::json& properties, - const PubSubOptions& options); + NT_Publisher Publish(NT_Topic topicHandle, NT_Type type, + std::string_view typeStr, const wpi::json& properties, + const PubSubOptions& options) { + std::scoped_lock lock{m_mutex}; + if (auto topic = m_impl.GetTopicByHandle(topicHandle)) { + if (auto publisher = + m_impl.Publish(topic, type, typeStr, properties, options)) { + return publisher->handle; + } + } else { + WPI_ERROR(m_impl.GetLogger(), + "trying to publish invalid topic handle ({})", topicHandle); + } + return {}; + } - void Unpublish(NT_Handle pubentry); + void Unpublish(NT_Handle pubentryHandle) { + std::scoped_lock lock{m_mutex}; + m_impl.Unpublish(pubentryHandle); + } - NT_Entry GetEntry(NT_Topic topic, NT_Type type, std::string_view typeStr, - const PubSubOptions& options); + NT_Entry GetEntry(NT_Topic topicHandle, NT_Type type, + std::string_view typeStr, const PubSubOptions& options) { + std::scoped_lock lock{m_mutex}; + if (auto topic = m_impl.GetTopicByHandle(topicHandle)) { + if (auto entry = m_impl.GetEntry(topic, type, typeStr, options)) { + return entry->handle; + } + } + return {}; + } void ReleaseEntry(NT_Entry entryHandle) { std::scoped_lock lock{m_mutex}; @@ -298,22 +374,29 @@ class LocalStorage final : public net::ILocalStorage { void SetEntryFlags(NT_Entry entryHandle, unsigned int flags) { std::scoped_lock lock{m_mutex}; - if (auto entry = m_impl.m_entries.Get(entryHandle)) { + if (auto entry = m_impl.GetEntryByHandle(entryHandle)) { m_impl.SetFlags(entry->subscriber->topic, flags); } } unsigned int GetEntryFlags(NT_Entry entryHandle) { std::scoped_lock lock{m_mutex}; - if (auto entry = m_impl.m_entries.Get(entryHandle)) { - return entry->subscriber->topic->flags; + if (auto entry = m_impl.GetEntryByHandle(entryHandle)) { + return entry->subscriber->topic->GetFlags(); } else { return 0; } } // Index-only - NT_Entry GetEntry(std::string_view name); + NT_Entry GetEntry(std::string_view name) { + std::scoped_lock lock{m_mutex}; + if (auto entry = m_impl.GetEntry(name)) { + return entry->handle; + } else { + return {}; + } + } std::string GetEntryName(NT_Entry subentryHandle) { std::scoped_lock lock{m_mutex}; @@ -346,346 +429,65 @@ class LocalStorage final : public net::ILocalStorage { // Listener functions // - void AddListener(NT_Listener listener, + void AddListener(NT_Listener listenerHandle, std::span prefixes, - unsigned int mask); + unsigned int mask) { + mask &= (NT_EVENT_TOPIC | NT_EVENT_VALUE_ALL | NT_EVENT_IMMEDIATE); + std::scoped_lock lock{m_mutex}; + // subscribe to make sure topic updates are received + if (auto sub = m_impl.AddMultiSubscriber( + prefixes, {.topicsOnly = (mask & NT_EVENT_VALUE_ALL) == 0})) { + m_impl.AddListenerImpl(listenerHandle, sub, mask, true); + } + } + void AddListener(NT_Listener listener, NT_Handle handle, unsigned int mask); - void RemoveListener(NT_Listener listener, unsigned int mask); + void RemoveListener(NT_Listener listener, unsigned int mask) { + std::scoped_lock lock{m_mutex}; + m_impl.RemoveListener(listener, mask); + } // // Data log functions // NT_DataLogger StartDataLog(wpi::log::DataLog& log, std::string_view prefix, - std::string_view logPrefix); - void StopDataLog(NT_DataLogger logger); + std::string_view logPrefix) { + std::scoped_lock lock{m_mutex}; + if (auto dl = m_impl.StartDataLog(log, prefix, logPrefix)) { + return dl->handle; + } else { + return {}; + } + } + + void StopDataLog(NT_DataLogger logger) { + std::scoped_lock lock{m_mutex}; + m_impl.StopDataLog(logger); + } // // Schema functions // - bool HasSchema(std::string_view name); - void AddSchema(std::string_view name, std::string_view type, - std::span schema); - - void Reset(); - - private: - static constexpr bool IsSpecial(std::string_view name) { - return name.empty() ? false : name.front() == '$'; + bool HasSchema(std::string_view name) { + std::scoped_lock lock{m_mutex}; + return m_impl.HasSchema(name); } - struct EntryData; - struct PublisherData; - struct SubscriberData; - struct MultiSubscriberData; - - struct DataLoggerEntry { - DataLoggerEntry(wpi::log::DataLog& log, int entry, NT_DataLogger logger) - : log{&log}, entry{entry}, logger{logger} {} - - static std::string MakeMetadata(std::string_view properties); - - void Append(const Value& v); - - wpi::log::DataLog* log; - int entry; - NT_DataLogger logger; - }; - - struct TopicData { - static constexpr auto kType = Handle::kTopic; - - TopicData(NT_Topic handle, std::string_view name) - : handle{handle}, name{name}, special{IsSpecial(name)} {} - - bool Exists() const { return onNetwork || !localPublishers.empty(); } - - bool IsCached() const { return (flags & NT_UNCACHED) == 0; } - - TopicInfo GetTopicInfo() const; - - // invariants - wpi::SignalObject handle; - std::string name; - bool special; - - Value lastValue; // also stores timestamp - Value lastValueNetwork; - NT_Type type{NT_UNASSIGNED}; - std::string typeStr; - unsigned int flags{0}; // for NT3 APIs - std::string propertiesStr{"{}"}; // cached string for GetTopicInfo() et al - wpi::json properties = wpi::json::object(); - NT_Entry entry{0}; // cached entry for GetEntry() - - bool onNetwork{false}; // true if there are any remote publishers - bool lastValueFromNetwork{false}; - - wpi::SmallVector datalogs; - NT_Type datalogType{NT_UNASSIGNED}; - - VectorSet localPublishers; - VectorSet localSubscribers; - VectorSet multiSubscribers; - VectorSet entries; - VectorSet listeners; - }; - - struct PubSubConfig : public PubSubOptionsImpl { - PubSubConfig() = default; - PubSubConfig(NT_Type type, std::string_view typeStr, - const PubSubOptions& options) - : PubSubOptionsImpl{options}, type{type}, typeStr{typeStr} { - prefixMatch = false; - } - - NT_Type type{NT_UNASSIGNED}; - std::string typeStr; - }; - - struct PublisherData { - static constexpr auto kType = Handle::kPublisher; - - PublisherData(NT_Publisher handle, TopicData* topic, PubSubConfig config) - : handle{handle}, topic{topic}, config{std::move(config)} {} - - void UpdateActive() { - active = config.type == topic->type && config.typeStr == topic->typeStr; - } - - // invariants - wpi::SignalObject handle; - TopicData* topic; - PubSubConfig config; - - // whether or not the publisher should actually publish values - bool active{false}; - }; - - struct SubscriberData { - static constexpr auto kType = Handle::kSubscriber; - - SubscriberData(NT_Subscriber handle, TopicData* topic, PubSubConfig config) - : handle{handle}, - topic{topic}, - config{std::move(config)}, - pollStorage{config.pollStorage} {} - - void UpdateActive() { - // for subscribers, unassigned is a wildcard - // also allow numerically compatible subscribers - active = - config.type == NT_UNASSIGNED || - (config.type == topic->type && config.typeStr == topic->typeStr) || - IsNumericCompatible(config.type, topic->type); - } - - // invariants - wpi::SignalObject handle; - TopicData* topic; - PubSubConfig config; - - // whether or not the subscriber should actually receive values - bool active{false}; - - // polling storage - ValueCircularBuffer pollStorage; - - // value listeners - VectorSet valueListeners; - }; - - struct EntryData { - static constexpr auto kType = Handle::kEntry; - - EntryData(NT_Entry handle, SubscriberData* subscriber) - : handle{handle}, topic{subscriber->topic}, subscriber{subscriber} {} - - // invariants - wpi::SignalObject handle; - TopicData* topic; - SubscriberData* subscriber; - - // the publisher (created on demand) - PublisherData* publisher{nullptr}; - }; - - struct MultiSubscriberData { - static constexpr auto kType = Handle::kMultiSubscriber; - - MultiSubscriberData(NT_MultiSubscriber handle, - std::span prefixes, - const PubSubOptionsImpl& options) - : handle{handle}, options{options} { - this->options.prefixMatch = true; - this->prefixes.reserve(prefixes.size()); - for (auto&& prefix : prefixes) { - this->prefixes.emplace_back(prefix); - } - } - - bool Matches(std::string_view name, bool special); - - // invariants - wpi::SignalObject handle; - std::vector prefixes; - PubSubOptionsImpl options; - - // value listeners - VectorSet valueListeners; - }; - - struct ListenerData { - ListenerData(NT_Listener handle, SubscriberData* subscriber, - unsigned int eventMask, bool subscriberOwned) - : handle{handle}, - eventMask{eventMask}, - subscriber{subscriber}, - subscriberOwned{subscriberOwned} {} - ListenerData(NT_Listener handle, MultiSubscriberData* subscriber, - unsigned int eventMask, bool subscriberOwned) - : handle{handle}, - eventMask{eventMask}, - multiSubscriber{subscriber}, - subscriberOwned{subscriberOwned} {} - - NT_Listener handle; - unsigned int eventMask; - SubscriberData* subscriber{nullptr}; - MultiSubscriberData* multiSubscriber{nullptr}; - bool subscriberOwned; - }; - - struct DataLoggerData { - static constexpr auto kType = Handle::kDataLogger; - - DataLoggerData(NT_DataLogger handle, wpi::log::DataLog& log, - std::string_view prefix, std::string_view logPrefix) - : handle{handle}, log{log}, prefix{prefix}, logPrefix{logPrefix} {} - - int Start(TopicData* topic, int64_t time); - - NT_DataLogger handle; - wpi::log::DataLog& log; - std::string prefix; - std::string logPrefix; - }; - - // inner struct to protect against accidentally deadlocking on the mutex - struct Impl { - Impl(int inst, IListenerStorage& listenerStorage, wpi::Logger& logger); - - int m_inst; - IListenerStorage& m_listenerStorage; - wpi::Logger& m_logger; - net::ClientMessageHandler* m_network{nullptr}; - - // handle mappings - HandleMap m_topics; - HandleMap m_publishers; - HandleMap m_subscribers; - HandleMap m_entries; - HandleMap m_multiSubscribers; - HandleMap m_dataloggers; - - // name mappings - wpi::StringMap m_nameTopics; - - // listeners - wpi::DenseMap> m_listeners; - - // string-based listeners - VectorSet m_topicPrefixListeners; - - // schema publishers - wpi::StringMap m_schemas; - - // topic functions - void NotifyTopic(TopicData* topic, unsigned int eventFlags); - - void CheckReset(TopicData* topic); - - bool SetValue(TopicData* topic, const Value& value, unsigned int eventFlags, - bool suppressIfDuplicate, const PublisherData* publisher); - void NotifyValue(TopicData* topic, const Value& value, - unsigned int eventFlags, bool isDuplicate, - const PublisherData* publisher); - - void SetFlags(TopicData* topic, unsigned int flags); - void SetPersistent(TopicData* topic, bool value); - void SetRetained(TopicData* topic, bool value); - void SetCached(TopicData* topic, bool value); - void SetProperties(TopicData* topic, const wpi::json& update, - bool sendNetwork); - void PropertiesUpdated(TopicData* topic, const wpi::json& update, - unsigned int eventFlags, bool sendNetwork, - bool updateFlags = true); - - void RefreshPubSubActive(TopicData* topic, bool warnOnSubMismatch); - - void NetworkAnnounce(TopicData* topic, std::string_view typeStr, - const wpi::json& properties, - std::optional pubuid); - void RemoveNetworkPublisher(TopicData* topic); - void NetworkPropertiesUpdate(TopicData* topic, const wpi::json& update, - bool ack); - void StartNetwork(net::ClientMessageHandler* network); - - PublisherData* AddLocalPublisher(TopicData* topic, - const wpi::json& properties, - const PubSubConfig& options); - std::unique_ptr RemoveLocalPublisher(NT_Publisher pubHandle); - - SubscriberData* AddLocalSubscriber(TopicData* topic, - const PubSubConfig& options); - std::unique_ptr RemoveLocalSubscriber( - NT_Subscriber subHandle); - - EntryData* AddEntry(SubscriberData* subscriber); - std::unique_ptr RemoveEntry(NT_Entry entryHandle); - - MultiSubscriberData* AddMultiSubscriber( - std::span prefixes, - const PubSubOptions& options); - std::unique_ptr RemoveMultiSubscriber( - NT_MultiSubscriber subHandle); - - void AddListenerImpl(NT_Listener listenerHandle, TopicData* topic, - unsigned int eventMask); - void AddListenerImpl(NT_Listener listenerHandle, SubscriberData* subscriber, - unsigned int eventMask, NT_Handle subentryHandle, - bool subscriberOwned); - void AddListenerImpl(NT_Listener listenerHandle, - MultiSubscriberData* subscriber, - unsigned int eventMask, bool subscriberOwned); - void AddListenerImpl(NT_Listener listenerHandle, - std::span prefixes, - unsigned int eventMask); - - TopicData* GetOrCreateTopic(std::string_view name); - TopicData* GetTopic(NT_Handle handle); - SubscriberData* GetSubEntry(NT_Handle subentryHandle); - PublisherData* PublishEntry(EntryData* entry, NT_Type type); - - Value* GetSubEntryValue(NT_Handle subentryHandle) { - if (auto subscriber = GetSubEntry(subentryHandle)) { - return &subscriber->topic->lastValue; - } else { - return nullptr; - } - } - - bool PublishLocalValue(PublisherData* publisher, const Value& value, - bool force = false); - - bool SetEntryValue(NT_Handle pubentryHandle, const Value& value); - bool SetDefaultEntryValue(NT_Handle pubsubentryHandle, const Value& value); - - void RemoveSubEntry(NT_Handle subentryHandle); - }; - + void AddSchema(std::string_view name, std::string_view type, + std::span schema) { + std::scoped_lock lock{m_mutex}; + m_impl.AddSchema(name, type, schema); + } + + void Reset() { + std::scoped_lock lock{m_mutex}; + m_impl.Reset(); + } + + private: wpi::mutex m_mutex; - Impl m_impl; + local::StorageImpl m_impl; }; } // namespace nt diff --git a/ntcore/src/main/native/cpp/local/LocalDataLogger.cpp b/ntcore/src/main/native/cpp/local/LocalDataLogger.cpp new file mode 100644 index 0000000000..121c99ab39 --- /dev/null +++ b/ntcore/src/main/native/cpp/local/LocalDataLogger.cpp @@ -0,0 +1,24 @@ +// Copyright (c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#include "LocalDataLogger.h" + +#include +#include +#include + +using namespace nt::local; + +int LocalDataLogger::Start(std::string_view name, std::string_view typeStr, + std::string_view metadata, int64_t time) { + // NT and DataLog use different standard representations for int and int[] + if (typeStr == "int") { + typeStr = "int64"; + } else if (typeStr == "int[]") { + typeStr = "int64[]"; + } + return log.Start(fmt::format("{}{}", logPrefix, + wpi::remove_prefix(name, prefix).value_or(name)), + typeStr, metadata, time); +} diff --git a/ntcore/src/main/native/cpp/local/LocalDataLogger.h b/ntcore/src/main/native/cpp/local/LocalDataLogger.h new file mode 100644 index 0000000000..c92e9ce337 --- /dev/null +++ b/ntcore/src/main/native/cpp/local/LocalDataLogger.h @@ -0,0 +1,39 @@ +// Copyright (c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#pragma once + +#include + +#include +#include + +#include "Handle.h" +#include "ntcore_c.h" + +namespace wpi::log { +class DataLog; +} // namespace wpi::log + +namespace nt::local { + +struct LocalTopic; + +struct LocalDataLogger { + static constexpr auto kType = Handle::kDataLogger; + + LocalDataLogger(NT_DataLogger handle, wpi::log::DataLog& log, + std::string_view prefix, std::string_view logPrefix) + : handle{handle}, log{log}, prefix{prefix}, logPrefix{logPrefix} {} + + int Start(std::string_view name, std::string_view typeStr, + std::string_view metadata, int64_t time); + + NT_DataLogger handle; + wpi::log::DataLog& log; + std::string prefix; + std::string logPrefix; +}; + +} // namespace nt::local diff --git a/ntcore/src/main/native/cpp/local/LocalDataLoggerEntry.cpp b/ntcore/src/main/native/cpp/local/LocalDataLoggerEntry.cpp new file mode 100644 index 0000000000..bd0fb47aed --- /dev/null +++ b/ntcore/src/main/native/cpp/local/LocalDataLoggerEntry.cpp @@ -0,0 +1,64 @@ +// Copyright (c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#include "LocalDataLoggerEntry.h" + +#include +#include + +#include +#include + +#include "networktables/NetworkTableValue.h" + +using namespace nt::local; + +std::string LocalDataLoggerEntry::MakeMetadata(std::string_view properties) { + return fmt::format("{{\"properties\":{},\"source\":\"NT\"}}", properties); +} + +void LocalDataLoggerEntry::Append(const Value& v) { + auto time = v.time(); + switch (v.type()) { + case NT_BOOLEAN: + log->AppendBoolean(entry, v.GetBoolean(), time); + break; + case NT_INTEGER: + log->AppendInteger(entry, v.GetInteger(), time); + break; + case NT_FLOAT: + log->AppendFloat(entry, v.GetFloat(), time); + break; + case NT_DOUBLE: + log->AppendDouble(entry, v.GetDouble(), time); + break; + case NT_STRING: + log->AppendString(entry, v.GetString(), time); + break; + case NT_RAW: { + auto val = v.GetRaw(); + log->AppendRaw(entry, + {reinterpret_cast(val.data()), val.size()}, + time); + break; + } + case NT_BOOLEAN_ARRAY: + log->AppendBooleanArray(entry, v.GetBooleanArray(), time); + break; + case NT_INTEGER_ARRAY: + log->AppendIntegerArray(entry, v.GetIntegerArray(), time); + break; + case NT_FLOAT_ARRAY: + log->AppendFloatArray(entry, v.GetFloatArray(), time); + break; + case NT_DOUBLE_ARRAY: + log->AppendDoubleArray(entry, v.GetDoubleArray(), time); + break; + case NT_STRING_ARRAY: + log->AppendStringArray(entry, v.GetStringArray(), time); + break; + default: + break; + } +} diff --git a/ntcore/src/main/native/cpp/local/LocalDataLoggerEntry.h b/ntcore/src/main/native/cpp/local/LocalDataLoggerEntry.h new file mode 100644 index 0000000000..53ff480b19 --- /dev/null +++ b/ntcore/src/main/native/cpp/local/LocalDataLoggerEntry.h @@ -0,0 +1,45 @@ +// Copyright (c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#pragma once + +#include + +#include +#include + +#include + +#include "ntcore_c.h" + +namespace wpi::log { +class DataLog; +} // namespace wpi::log + +namespace nt { +class Value; +} // namespace nt + +namespace nt::local { + +struct LocalTopic; + +struct LocalDataLoggerEntry { + LocalDataLoggerEntry(wpi::log::DataLog& log, int entry, NT_DataLogger logger) + : log{&log}, entry{entry}, logger{logger} {} + + static std::string MakeMetadata(std::string_view properties); + + void Append(const Value& v); + void Finish(int64_t timestamp) { log->Finish(entry, timestamp); } + void SetMetadata(std::string_view metadata, int64_t timestamp) { + log->SetMetadata(entry, metadata, timestamp); + } + + wpi::log::DataLog* log; + int entry; + NT_DataLogger logger; +}; + +} // namespace nt::local diff --git a/ntcore/src/main/native/cpp/local/LocalEntry.h b/ntcore/src/main/native/cpp/local/LocalEntry.h new file mode 100644 index 0000000000..01f43d475a --- /dev/null +++ b/ntcore/src/main/native/cpp/local/LocalEntry.h @@ -0,0 +1,31 @@ +// Copyright (c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#pragma once + +#include + +#include "Handle.h" +#include "local/LocalSubscriber.h" + +namespace nt::local { + +struct LocalPublisher; + +struct LocalEntry { + static constexpr auto kType = Handle::kEntry; + + LocalEntry(NT_Entry handle, LocalSubscriber* subscriber) + : handle{handle}, topic{subscriber->topic}, subscriber{subscriber} {} + + // invariants + wpi::SignalObject handle; + LocalTopic* topic; + LocalSubscriber* subscriber; + + // the publisher (created on demand) + LocalPublisher* publisher{nullptr}; +}; + +} // namespace nt::local diff --git a/ntcore/src/main/native/cpp/local/LocalListener.h b/ntcore/src/main/native/cpp/local/LocalListener.h new file mode 100644 index 0000000000..89b21fcc15 --- /dev/null +++ b/ntcore/src/main/native/cpp/local/LocalListener.h @@ -0,0 +1,35 @@ +// Copyright (c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#pragma once + +#include "ntcore_c.h" + +namespace nt::local { + +struct LocalMultiSubscriber; +struct LocalSubscriber; + +struct LocalListener { + LocalListener(NT_Listener handle, LocalSubscriber* subscriber, + unsigned int eventMask, bool subscriberOwned) + : handle{handle}, + eventMask{eventMask}, + subscriber{subscriber}, + subscriberOwned{subscriberOwned} {} + LocalListener(NT_Listener handle, LocalMultiSubscriber* subscriber, + unsigned int eventMask, bool subscriberOwned) + : handle{handle}, + eventMask{eventMask}, + multiSubscriber{subscriber}, + subscriberOwned{subscriberOwned} {} + + NT_Listener handle; + unsigned int eventMask; + LocalSubscriber* subscriber{nullptr}; + LocalMultiSubscriber* multiSubscriber{nullptr}; + bool subscriberOwned; +}; + +} // namespace nt::local diff --git a/ntcore/src/main/native/cpp/local/LocalMultiSubscriber.h b/ntcore/src/main/native/cpp/local/LocalMultiSubscriber.h new file mode 100644 index 0000000000..8962759917 --- /dev/null +++ b/ntcore/src/main/native/cpp/local/LocalMultiSubscriber.h @@ -0,0 +1,59 @@ +// Copyright (c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#pragma once + +#include +#include +#include +#include + +#include +#include + +#include "Handle.h" +#include "PubSubOptions.h" +#include "VectorSet.h" +#include "ntcore_c.h" + +namespace nt::local { + +constexpr bool PrefixMatch(std::string_view name, std::string_view prefix, + bool special) { + return (!special || !prefix.empty()) && wpi::starts_with(name, prefix); +} + +struct LocalMultiSubscriber { + static constexpr auto kType = Handle::kMultiSubscriber; + + LocalMultiSubscriber(NT_MultiSubscriber handle, + std::span prefixes, + const PubSubOptionsImpl& options) + : handle{handle}, options{options} { + this->options.prefixMatch = true; + this->prefixes.reserve(prefixes.size()); + for (auto&& prefix : prefixes) { + this->prefixes.emplace_back(prefix); + } + } + + bool Matches(std::string_view name, bool special) { + for (auto&& prefix : prefixes) { + if (PrefixMatch(name, prefix, special)) { + return true; + } + } + return false; + } + + // invariants + wpi::SignalObject handle; + std::vector prefixes; + PubSubOptionsImpl options; + + // value listeners + VectorSet valueListeners; +}; + +} // namespace nt::local diff --git a/ntcore/src/main/native/cpp/local/LocalPublisher.h b/ntcore/src/main/native/cpp/local/LocalPublisher.h new file mode 100644 index 0000000000..611c038735 --- /dev/null +++ b/ntcore/src/main/native/cpp/local/LocalPublisher.h @@ -0,0 +1,36 @@ +// Copyright (c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#pragma once + +#include + +#include + +#include "Handle.h" +#include "local/LocalTopic.h" +#include "local/PubSubConfig.h" + +namespace nt::local { + +struct LocalPublisher { + static constexpr auto kType = Handle::kPublisher; + + LocalPublisher(NT_Publisher handle, LocalTopic* topic, PubSubConfig config) + : handle{handle}, topic{topic}, config{std::move(config)} {} + + void UpdateActive() { + active = config.type == topic->type && config.typeStr == topic->typeStr; + } + + // invariants + wpi::SignalObject handle; + LocalTopic* topic; + PubSubConfig config; + + // whether or not the publisher should actually publish values + bool active{false}; +}; + +} // namespace nt::local diff --git a/ntcore/src/main/native/cpp/local/LocalStorageImpl.cpp b/ntcore/src/main/native/cpp/local/LocalStorageImpl.cpp new file mode 100644 index 0000000000..60a03c1c77 --- /dev/null +++ b/ntcore/src/main/native/cpp/local/LocalStorageImpl.cpp @@ -0,0 +1,1100 @@ +// Copyright (c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#include "LocalStorageImpl.h" + +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include "IListenerStorage.h" +#include "Log.h" +#include "net/MessageHandler.h" + +using namespace nt; +using namespace nt::local; + +// maximum number of local publishers / subscribers to any given topic +static constexpr size_t kMaxPublishers = 512; +static constexpr size_t kMaxSubscribers = 512; +static constexpr size_t kMaxMultiSubscribers = 512; +static constexpr size_t kMaxListeners = 512; + +StorageImpl::StorageImpl(int inst, IListenerStorage& listenerStorage, + wpi::Logger& logger) + : m_inst{inst}, m_listenerStorage{listenerStorage}, m_logger{logger} {} + +// +// Network interface functions +// + +void StorageImpl::NetworkAnnounce(LocalTopic* topic, std::string_view typeStr, + const wpi::json& properties, + std::optional pubuid) { + DEBUG4("LS NetworkAnnounce({}, {}, {}, {})", topic->name, typeStr, + properties.dump(), pubuid.value_or(-1)); + if (pubuid.has_value()) { + return; // ack of our publish; ignore + } + + unsigned int event = NT_EVENT_NONE; + // fresh non-local publish; the network publish always sets the type even + // if it was locally published, but output a diagnostic for this case + bool didExist = topic->Exists(); + topic->onNetwork = true; + NT_Type type = StringToType(typeStr); + if (topic->type != type || topic->typeStr != typeStr) { + if (didExist) { + INFO( + "network announce of '{}' overriding local publish (was '{}', now " + "'{}')", + topic->name, topic->typeStr, typeStr); + } + topic->type = type; + topic->typeStr = typeStr; + RefreshPubSubActive(topic, true); + } + if (!didExist) { + event |= NT_EVENT_PUBLISH; + } + + // may be properties update, but need to compare to see if it actually + // changed to determine whether to update string / send event + wpi::json update = topic->CompareProperties(properties); + if (!update.empty()) { + topic->properties = properties; + PropertiesUpdated(topic, update, event, false); + } else if (event != NT_EVENT_NONE) { + NotifyTopic(topic, event); + } +} + +void StorageImpl::RemoveNetworkPublisher(LocalTopic* topic) { + DEBUG4("LS RemoveNetworkPublisher({}, {})", topic->handle.GetHandle(), + topic->name); + // this acts as an unpublish + bool didExist = topic->Exists(); + topic->onNetwork = false; + if (didExist && !topic->Exists()) { + DEBUG4("Unpublished {}", topic->name); + topic->ResetIfDoesNotExist(); + NotifyTopic(topic, NT_EVENT_UNPUBLISH); + } + + if (!topic->localPublishers.empty()) { + // some other publisher still exists; if it has a different type, refresh + // and publish it over the network + auto& nextPub = topic->localPublishers.front(); + if (nextPub->config.type != topic->type || + nextPub->config.typeStr != topic->typeStr) { + topic->type = nextPub->config.type; + topic->typeStr = nextPub->config.typeStr; + RefreshPubSubActive(topic, false); + // this may result in a duplicate publish warning on the server side, + // but send one anyway in this case just to be sure + if (nextPub->active && m_network) { + m_network->ClientPublish(Handle{nextPub->handle}.GetIndex(), + topic->name, topic->typeStr, topic->properties, + nextPub->config); + } + } + } +} + +void StorageImpl::NetworkPropertiesUpdate(LocalTopic* topic, + const wpi::json& update, bool ack) { + DEBUG4("NetworkPropertiesUpdate({},{})", topic->name, ack); + if (ack) { + return; // ignore acks + } + SetProperties(topic, update, false); +} + +void StorageImpl::StartNetwork(net::ClientMessageHandler* network) { + DEBUG4("StartNetwork()"); + m_network = network; + // publish all active publishers to the network and send last values + // only send value once per topic + for (auto&& topic : m_topics) { + LocalPublisher* anyPublisher = nullptr; + for (auto&& publisher : topic->localPublishers) { + if (publisher->active) { + network->ClientPublish(Handle{publisher->handle}.GetIndex(), + topic->name, topic->typeStr, topic->properties, + publisher->config); + anyPublisher = publisher; + } + } + if (anyPublisher && topic->lastValue) { + network->ClientSetValue(Handle{anyPublisher->handle}.GetIndex(), + topic->lastValue); + } + } + for (auto&& subscriber : m_subscribers) { + if (!subscriber->config.hidden) { + network->ClientSubscribe(1 + Handle{subscriber->handle}.GetIndex(), + {{subscriber->topic->name}}, subscriber->config); + } + } + for (auto&& subscriber : m_multiSubscribers) { + if (!subscriber->options.hidden) { + network->ClientSubscribe(-1 - Handle{subscriber->handle}.GetIndex(), + subscriber->prefixes, subscriber->options); + } + } +} + +void StorageImpl::ClearNetwork() { + DEBUG4("ClearNetwork()"); + m_network = nullptr; + // treat as an unannounce all from the network side + for (auto&& topic : m_topics) { + RemoveNetworkPublisher(topic.get()); + } +} + +// +// Topic functions +// + +LocalTopic* StorageImpl::GetOrCreateTopic(std::string_view name) { + auto& topic = m_nameTopics[name]; + // create if it does not already exist + if (!topic) { + topic = m_topics.Add(m_inst, name); + // attach multi-subscribers + for (auto&& sub : m_multiSubscribers) { + if (sub->Matches(name, topic->special)) { + topic->multiSubscribers.Add(sub.get()); + } + } + } + return topic; +} + +// +// Topic property functions +// + +void StorageImpl::SetFlags(LocalTopic* topic, unsigned int flags) { + wpi::json update = topic->SetFlags(flags); + if ((flags & NT_UNCACHED) != 0 && (flags & NT_PERSISTENT) != 0) { + WARN("topic {}: disabling cached property disables persistent storage", + topic->name); + } + if (!update.empty()) { + PropertiesUpdated(topic, update, NT_EVENT_NONE, true, false); + } +} + +void StorageImpl::SetPersistent(LocalTopic* topic, bool value) { + PropertiesUpdated(topic, topic->SetPersistent(value), NT_EVENT_NONE, true, + false); +} + +void StorageImpl::SetRetained(LocalTopic* topic, bool value) { + PropertiesUpdated(topic, topic->SetRetained(value), NT_EVENT_NONE, true, + false); +} + +void StorageImpl::SetCached(LocalTopic* topic, bool value) { + PropertiesUpdated(topic, topic->SetCached(value), NT_EVENT_NONE, true, false); +} + +void StorageImpl::SetProperty(LocalTopic* topic, std::string_view name, + const wpi::json& value) { + PropertiesUpdated(topic, topic->SetProperty(name, value), NT_EVENT_NONE, + true); +} + +bool StorageImpl::SetProperties(LocalTopic* topic, const wpi::json& update, + bool sendNetwork) { + DEBUG4("SetProperties({},{})", topic->name, sendNetwork); + if (!topic->SetProperties(update)) { + return false; + } + PropertiesUpdated(topic, update, NT_EVENT_NONE, sendNetwork); + return true; +} + +void StorageImpl::DeleteProperty(LocalTopic* topic, std::string_view name) { + PropertiesUpdated(topic, topic->DeleteProperty(name), NT_EVENT_NONE, true); +} + +// +// Value functions +// + +bool StorageImpl::SetEntryValue(NT_Handle pubentryHandle, const Value& value) { + if (!value) { + return false; + } + auto publisher = m_publishers.Get(pubentryHandle); + if (!publisher) { + if (auto entry = m_entries.Get(pubentryHandle)) { + publisher = PublishEntry(entry, value.type()); + } + if (!publisher) { + return false; + } + } + return PublishLocalValue(publisher, value); +} + +bool StorageImpl::SetDefaultEntryValue(NT_Handle pubsubentryHandle, + const Value& value) { + DEBUG4("SetDefaultEntryValue({}, {})", pubsubentryHandle, + static_cast(value.type())); + if (!value) { + return false; + } + if (auto topic = GetTopic(pubsubentryHandle)) { + if (!topic->IsCached()) { + WARN("ignoring default value on non-cached topic '{}'", topic->name); + return false; + } + if (!topic->lastValue && + (topic->type == NT_UNASSIGNED || topic->type == value.type() || + IsNumericCompatible(topic->type, value.type()))) { + // publish if we haven't yet + auto publisher = m_publishers.Get(pubsubentryHandle); + if (!publisher) { + if (auto entry = m_entries.Get(pubsubentryHandle)) { + publisher = PublishEntry(entry, value.type()); + } + } + + // force value timestamps to 0 + if (topic->type == NT_UNASSIGNED) { + topic->type = value.type(); + } + Value newValue; + if (topic->type == value.type()) { + newValue = value; + } else if (IsNumericCompatible(topic->type, value.type())) { + newValue = ConvertNumericValue(value, topic->type); + } else { + return true; + } + newValue.SetTime(0); + newValue.SetServerTime(0); + if (publisher) { + PublishLocalValue(publisher, newValue, true); + } else { + topic->lastValue = newValue; + } + return true; + } + } + return false; +} + +// +// Publish/Subscribe/Entry functions +// + +LocalSubscriber* StorageImpl::Subscribe(LocalTopic* topic, NT_Type type, + std::string_view typeStr, + const PubSubOptions& options) { + if (topic->localSubscribers.size() >= kMaxSubscribers) { + ERR("reached maximum number of subscribers to '{}', not subscribing", + topic->name); + return nullptr; + } + + // Create subscriber + return AddLocalSubscriber(topic, PubSubConfig{type, typeStr, options}); +} + +LocalPublisher* StorageImpl::Publish(LocalTopic* topic, NT_Type type, + std::string_view typeStr, + const wpi::json& properties, + const PubSubOptions& options) { + if (type == NT_UNASSIGNED || typeStr.empty()) { + ERR("cannot publish '{}' with an unassigned type or empty type string", + topic->name); + return nullptr; + } + + if (topic->localPublishers.size() >= kMaxPublishers) { + ERR("reached maximum number of publishers to '{}', not publishing", + topic->name); + return nullptr; + } + + return AddLocalPublisher(topic, properties, + PubSubConfig{type, typeStr, options}); +} + +LocalEntry* StorageImpl::GetEntry(LocalTopic* topic, NT_Type type, + std::string_view typeStr, + const PubSubOptions& options) { + if (topic->localSubscribers.size() >= kMaxSubscribers) { + ERR("reached maximum number of subscribers to '{}', not creating entry", + topic->name); + return nullptr; + } + + // Create subscriber + auto subscriber = + AddLocalSubscriber(topic, PubSubConfig{type, typeStr, options}); + + // Create entry + return AddEntry(subscriber); +} + +LocalEntry* StorageImpl::GetEntry(std::string_view name) { + if (name.empty()) { + return nullptr; + } + + // Get the topic data + auto* topic = GetOrCreateTopic(name); + + if (!topic->entry) { + if (topic->localSubscribers.size() >= kMaxSubscribers) { + ERR("reached maximum number of subscribers to '{}', not creating entry", + topic->name); + return nullptr; + } + + // Create subscriber + auto* subscriber = AddLocalSubscriber(topic, {}); + + // Create entry + topic->entry = AddEntry(subscriber); + } + + return topic->entry; +} + +void StorageImpl::RemoveSubEntry(NT_Handle subentryHandle) { + Handle h{subentryHandle}; + if (h.IsType(Handle::kSubscriber)) { + RemoveLocalSubscriber(subentryHandle); + } else if (h.IsType(Handle::kMultiSubscriber)) { + RemoveMultiSubscriber(subentryHandle); + } else if (h.IsType(Handle::kEntry)) { + if (auto entry = RemoveEntry(subentryHandle)) { + RemoveLocalSubscriber(entry->subscriber->handle); + if (entry->publisher) { + RemoveLocalPublisher(entry->publisher->handle); + } + } + } +} + +void StorageImpl::Unpublish(NT_Handle pubentryHandle) { + if (Handle{pubentryHandle}.IsType(Handle::kPublisher)) { + RemoveLocalPublisher(pubentryHandle); + } else if (auto entry = GetEntryByHandle(pubentryHandle)) { + if (entry->publisher) { + RemoveLocalPublisher(entry->publisher->handle); + entry->publisher = nullptr; + } + } else { + // TODO: report warning + return; + } +} + +// +// Multi-subscriber functions +// + +LocalMultiSubscriber* StorageImpl::AddMultiSubscriber( + std::span prefixes, const PubSubOptions& options) { + DEBUG4("AddMultiSubscriber({})", fmt::join(prefixes, ",")); + if (m_multiSubscribers.size() >= kMaxMultiSubscribers) { + ERR("reached maximum number of multi-subscribers, not subscribing"); + return nullptr; + } + auto subscriber = m_multiSubscribers.Add(m_inst, prefixes, options); + // subscribe to any already existing topics + for (auto&& topic : m_topics) { + for (auto&& prefix : prefixes) { + if (PrefixMatch(topic->name, prefix, topic->special)) { + topic->multiSubscribers.Add(subscriber); + break; + } + } + } + if (m_network && !subscriber->options.hidden) { + DEBUG4("-> NetworkSubscribe"); + m_network->ClientSubscribe(-1 - Handle{subscriber->handle}.GetIndex(), + subscriber->prefixes, subscriber->options); + } + return subscriber; +} + +std::unique_ptr StorageImpl::RemoveMultiSubscriber( + NT_MultiSubscriber subHandle) { + auto subscriber = m_multiSubscribers.Remove(subHandle); + if (subscriber) { + for (auto&& topic : m_topics) { + topic->multiSubscribers.Remove(subscriber.get()); + } + for (auto&& listener : m_listeners) { + if (listener.getSecond()->multiSubscriber == subscriber.get()) { + listener.getSecond()->multiSubscriber = nullptr; + } + } + if (m_network && !subscriber->options.hidden) { + m_network->ClientUnsubscribe(-1 - Handle{subscriber->handle}.GetIndex()); + } + } + return subscriber; +} + +// +// Lookup functions +// + +LocalTopic* StorageImpl::GetTopic(NT_Handle handle) { + switch (Handle{handle}.GetType()) { + case Handle::kEntry: { + if (auto entry = m_entries.Get(handle)) { + return entry->topic; + } + break; + } + case Handle::kSubscriber: { + if (auto subscriber = m_subscribers.Get(handle)) { + return subscriber->topic; + } + break; + } + case Handle::kPublisher: { + if (auto publisher = m_publishers.Get(handle)) { + return publisher->topic; + } + break; + } + case Handle::kTopic: + return m_topics.Get(handle); + default: + break; + } + return {}; +} + +LocalSubscriber* StorageImpl::GetSubEntry(NT_Handle subentryHandle) { + Handle h{subentryHandle}; + if (h.IsType(Handle::kSubscriber)) { + return m_subscribers.Get(subentryHandle); + } else if (h.IsType(Handle::kEntry)) { + auto entry = m_entries.Get(subentryHandle); + return entry ? entry->subscriber : nullptr; + } else { + return nullptr; + } +} + +// +// Listener functions +// + +void StorageImpl::AddListenerImpl(NT_Listener listenerHandle, LocalTopic* topic, + unsigned int eventMask) { + if (topic->localSubscribers.size() >= kMaxSubscribers) { + ERR("reached maximum number of subscribers to '{}', ignoring listener add", + topic->name); + return; + } + // subscribe to make sure topic updates are received + PubSubConfig config; + config.topicsOnly = (eventMask & NT_EVENT_VALUE_ALL) == 0; + auto sub = AddLocalSubscriber(topic, config); + AddListenerImpl(listenerHandle, sub, eventMask, sub->handle, true); +} + +void StorageImpl::AddListenerImpl(NT_Listener listenerHandle, + LocalSubscriber* subscriber, + unsigned int eventMask, + NT_Handle subentryHandle, + bool subscriberOwned) { + m_listeners.try_emplace(listenerHandle, std::make_unique( + listenerHandle, subscriber, + eventMask, subscriberOwned)); + + auto topic = subscriber->topic; + + if ((eventMask & NT_EVENT_TOPIC) != 0) { + if (topic->listeners.size() >= kMaxListeners) { + ERR("reached maximum number of listeners to '{}', not adding listener", + topic->name); + return; + } + + m_listenerStorage.Activate( + listenerHandle, eventMask & (NT_EVENT_TOPIC | NT_EVENT_IMMEDIATE)); + + topic->listeners.Add(listenerHandle); + + // handle immediate publish + if ((eventMask & (NT_EVENT_PUBLISH | NT_EVENT_IMMEDIATE)) == + (NT_EVENT_PUBLISH | NT_EVENT_IMMEDIATE) && + topic->Exists()) { + m_listenerStorage.Notify({&listenerHandle, 1}, + NT_EVENT_PUBLISH | NT_EVENT_IMMEDIATE, + topic->GetTopicInfo()); + } + } + + if ((eventMask & NT_EVENT_VALUE_ALL) != 0) { + if (subscriber->valueListeners.size() >= kMaxListeners) { + ERR("reached maximum number of listeners to '{}', not adding listener", + topic->name); + return; + } + m_listenerStorage.Activate( + listenerHandle, eventMask & (NT_EVENT_VALUE_ALL | NT_EVENT_IMMEDIATE), + [subentryHandle](unsigned int mask, Event* event) { + if (auto valueData = event->GetValueEventData()) { + valueData->subentry = subentryHandle; + } + return true; + }); + + subscriber->valueListeners.Add(listenerHandle); + + // handle immediate value + if ((eventMask & NT_EVENT_VALUE_ALL) != 0 && + (eventMask & NT_EVENT_IMMEDIATE) != 0 && topic->lastValue) { + m_listenerStorage.Notify({&listenerHandle, 1}, + NT_EVENT_IMMEDIATE | NT_EVENT_VALUE_ALL, + topic->handle, subentryHandle, topic->lastValue); + } + } +} + +void StorageImpl::AddListenerImpl(NT_Listener listenerHandle, + LocalMultiSubscriber* subscriber, + unsigned int eventMask, + bool subscriberOwned) { + auto listener = + m_listeners + .try_emplace(listenerHandle, std::make_unique( + listenerHandle, subscriber, + eventMask, subscriberOwned)) + .first->getSecond() + .get(); + + // if we're doing anything immediate, get the list of matching topics + wpi::SmallVector topics; + if ((eventMask & NT_EVENT_IMMEDIATE) != 0 && + (eventMask & (NT_EVENT_PUBLISH | NT_EVENT_VALUE_ALL)) != 0) { + for (auto&& topic : m_topics) { + if (topic->Exists() && subscriber->Matches(topic->name, topic->special)) { + topics.emplace_back(topic.get()); + } + } + } + + if ((eventMask & NT_EVENT_TOPIC) != 0) { + if (m_topicPrefixListeners.size() >= kMaxListeners) { + ERR("reached maximum number of listeners, not adding listener"); + return; + } + + m_listenerStorage.Activate( + listenerHandle, eventMask & (NT_EVENT_TOPIC | NT_EVENT_IMMEDIATE)); + + m_topicPrefixListeners.Add(listener); + + // handle immediate publish + if ((eventMask & (NT_EVENT_PUBLISH | NT_EVENT_IMMEDIATE)) == + (NT_EVENT_PUBLISH | NT_EVENT_IMMEDIATE)) { + std::vector topicInfos; + for (auto&& topic : topics) { + topicInfos.emplace_back(topic->GetTopicInfo()); + } + if (!topicInfos.empty()) { + m_listenerStorage.Notify({&listenerHandle, 1}, + NT_EVENT_PUBLISH | NT_EVENT_IMMEDIATE, + topicInfos); + } + } + } + + if ((eventMask & NT_EVENT_VALUE_ALL) != 0) { + if (subscriber->valueListeners.size() >= kMaxListeners) { + ERR("reached maximum number of listeners, not adding listener"); + return; + } + + m_listenerStorage.Activate( + listenerHandle, eventMask & (NT_EVENT_VALUE_ALL | NT_EVENT_IMMEDIATE), + [subentryHandle = subscriber->handle.GetHandle()](unsigned int mask, + Event* event) { + if (auto valueData = event->GetValueEventData()) { + valueData->subentry = subentryHandle; + } + return true; + }); + + subscriber->valueListeners.Add(listenerHandle); + + // handle immediate value + if ((eventMask & NT_EVENT_VALUE_ALL) != 0 && + (eventMask & NT_EVENT_IMMEDIATE) != 0) { + for (auto&& topic : topics) { + if (topic->lastValue) { + m_listenerStorage.Notify( + {&listenerHandle, 1}, NT_EVENT_VALUE_ALL | NT_EVENT_IMMEDIATE, + topic->handle, subscriber->handle, topic->lastValue); + } + } + } + } +} + +void StorageImpl::RemoveListener(NT_Listener listenerHandle, + unsigned int mask) { + auto listenerIt = m_listeners.find(listenerHandle); + if (listenerIt == m_listeners.end()) { + return; + } + auto listener = std::move(listenerIt->getSecond()); + m_listeners.erase(listenerIt); + if (!listener) { + return; + } + + m_topicPrefixListeners.Remove(listener.get()); + if (listener->subscriber) { + listener->subscriber->valueListeners.Remove(listenerHandle); + listener->subscriber->topic->listeners.Remove(listenerHandle); + if (listener->subscriberOwned) { + RemoveLocalSubscriber(listener->subscriber->handle); + } + } + if (listener->multiSubscriber) { + listener->multiSubscriber->valueListeners.Remove(listenerHandle); + if (listener->subscriberOwned) { + RemoveMultiSubscriber(listener->multiSubscriber->handle); + } + } +} + +// +// Data log functions +// + +LocalDataLogger* StorageImpl::StartDataLog(wpi::log::DataLog& log, + std::string_view prefix, + std::string_view logPrefix) { + auto datalogger = m_dataloggers.Add(m_inst, log, prefix, logPrefix); + + // start logging any matching topics + auto now = nt::Now(); + for (auto&& topic : m_topics) { + if (!PrefixMatch(topic->name, prefix, topic->special) || + topic->type == NT_UNASSIGNED || topic->typeStr.empty()) { + continue; + } + topic->StartStopDataLog(datalogger, now, true); + // log current value, if any + if (topic->lastValue) { + topic->datalogs.back().Append(topic->lastValue); + } + } + + return datalogger; +} + +void StorageImpl::StopDataLog(NT_DataLogger logger) { + if (auto datalogger = m_dataloggers.Remove(logger)) { + // finish any active entries + auto now = Now(); + for (auto&& topic : m_topics) { + topic->StartStopDataLog(datalogger.get(), now, false); + } + } +} + +// +// Schema functions +// + +bool StorageImpl::HasSchema(std::string_view name) { + wpi::SmallString<128> fullName{"/.schema/"}; + fullName += name; + auto it = m_schemas.find(fullName); + return it != m_schemas.end(); +} + +void StorageImpl::AddSchema(std::string_view name, std::string_view type, + std::span schema) { + wpi::SmallString<128> fullName{"/.schema/"}; + fullName += name; + auto& pubHandle = m_schemas[fullName]; + if (pubHandle != 0) { + return; + } + + auto topic = GetOrCreateTopic(fullName); + + if (topic->localPublishers.size() >= kMaxPublishers) { + ERR("reached maximum number of publishers to '{}', not publishing", + topic->name); + return; + } + + pubHandle = AddLocalPublisher(topic, {{"retained", true}}, + PubSubConfig{NT_RAW, type, {}}) + ->handle; + + SetDefaultEntryValue(pubHandle, Value::MakeRaw(schema)); +} + +void StorageImpl::Reset() { + m_network = nullptr; + m_topics.clear(); + m_publishers.clear(); + m_subscribers.clear(); + m_entries.clear(); + m_multiSubscribers.clear(); + m_dataloggers.clear(); + m_nameTopics.clear(); + m_listeners.clear(); + m_topicPrefixListeners.clear(); +} + +void StorageImpl::NotifyTopic(LocalTopic* topic, unsigned int eventFlags) { + DEBUG4("NotifyTopic({}, {})", topic->name, eventFlags); + auto topicInfo = topic->GetTopicInfo(); + if (!topic->listeners.empty()) { + m_listenerStorage.Notify(topic->listeners, eventFlags, topicInfo); + } + + wpi::SmallVector listeners; + for (auto listener : m_topicPrefixListeners) { + if (listener->multiSubscriber && + listener->multiSubscriber->Matches(topic->name, topic->special)) { + listeners.emplace_back(listener->handle); + } + } + if (!listeners.empty()) { + m_listenerStorage.Notify(listeners, eventFlags, topicInfo); + } + + if ((eventFlags & (NT_EVENT_PUBLISH | NT_EVENT_UNPUBLISH)) != 0) { + if (!m_dataloggers.empty()) { + auto now = Now(); + for (auto&& datalogger : m_dataloggers) { + if (PrefixMatch(topic->name, datalogger->prefix, topic->special)) { + topic->StartStopDataLog(datalogger.get(), now, + (eventFlags & NT_EVENT_PUBLISH) != 0); + } + } + } + } else if ((eventFlags & NT_EVENT_PROPERTIES) != 0) { + topic->UpdateDataLogProperties(); + } +} + +bool StorageImpl::SetValue(LocalTopic* topic, const Value& value, + unsigned int eventFlags, bool suppressIfDuplicate, + const LocalPublisher* publisher) { + const bool isDuplicate = topic->IsCached() && topic->lastValue == value; + DEBUG4("SetValue({}, {}, {}, {})", topic->name, value.time(), eventFlags, + isDuplicate); + if (topic->type != NT_UNASSIGNED && topic->type != value.type()) { + return false; + } + // Make sure value isn't older than last value + if (!topic->lastValue || topic->lastValue.time() == 0 || + value.time() >= topic->lastValue.time()) { + // TODO: notify option even if older value + if (!(suppressIfDuplicate && isDuplicate)) { + topic->type = value.type(); + if (topic->IsCached()) { + topic->lastValue = value; + topic->lastValueFromNetwork = false; + } + NotifyValue(topic, value, eventFlags, isDuplicate, publisher); + if (topic->datalogType == value.type()) { + for (auto&& datalog : topic->datalogs) { + datalog.Append(value); + } + } + } + } + + return true; +} + +void StorageImpl::NotifyValue(LocalTopic* topic, const Value& value, + unsigned int eventFlags, bool isDuplicate, + const LocalPublisher* publisher) { + bool isNetwork = (eventFlags & NT_EVENT_VALUE_REMOTE) != 0; + for (auto&& subscriber : topic->localSubscribers) { + if (subscriber->active && + (subscriber->config.keepDuplicates || !isDuplicate) && + ((isNetwork && !subscriber->config.disableRemote) || + (!isNetwork && !subscriber->config.disableLocal)) && + (!publisher || (publisher && (subscriber->config.excludePublisher != + publisher->handle)))) { + subscriber->pollStorage.emplace_back(value); + subscriber->handle.Set(); + if (!subscriber->valueListeners.empty()) { + m_listenerStorage.Notify(subscriber->valueListeners, eventFlags, + topic->handle, 0, value); + } + } + } + + for (auto&& subscriber : topic->multiSubscribers) { + if (subscriber->options.keepDuplicates || !isDuplicate) { + subscriber->handle.Set(); + if (!subscriber->valueListeners.empty()) { + m_listenerStorage.Notify(subscriber->valueListeners, eventFlags, + topic->handle, 0, value); + } + } + } +} + +void StorageImpl::PropertiesUpdated(LocalTopic* topic, const wpi::json& update, + unsigned int eventFlags, bool sendNetwork, + bool updateFlags) { + DEBUG4("PropertiesUpdated({}, {}, {}, {}, {})", topic->name, update.dump(), + eventFlags, sendNetwork, updateFlags); + topic->RefreshProperties(updateFlags); + unsigned int flags = topic->GetFlags(); + if (updateFlags && (flags & NT_UNCACHED) != 0 && + (flags & NT_PERSISTENT) != 0) { + WARN("topic {}: disabling cached property disables persistent storage", + topic->name); + } + + NotifyTopic(topic, eventFlags | NT_EVENT_PROPERTIES); + // check local flag so we don't echo back received properties changes + if (m_network && sendNetwork) { + m_network->ClientSetProperties(topic->name, update); + } +} + +void StorageImpl::RefreshPubSubActive(LocalTopic* topic, + bool warnOnSubMismatch) { + for (auto&& publisher : topic->localPublishers) { + publisher->UpdateActive(); + } + for (auto&& subscriber : topic->localSubscribers) { + subscriber->UpdateActive(); + if (warnOnSubMismatch && topic->Exists() && !subscriber->active) { + // warn on type mismatch + INFO( + "local subscribe to '{}' disabled due to type mismatch (wanted '{}', " + "published as '{}')", + topic->name, subscriber->config.typeStr, topic->typeStr); + } + } +} + +LocalPublisher* StorageImpl::AddLocalPublisher(LocalTopic* topic, + const wpi::json& properties, + const PubSubConfig& config) { + bool didExist = topic->Exists(); + auto publisher = m_publishers.Add(m_inst, topic, config); + topic->localPublishers.Add(publisher); + + if (!didExist) { + DEBUG4("AddLocalPublisher: setting {} type {} typestr {}", topic->name, + static_cast(config.type), config.typeStr); + // set the type to the published type + topic->type = config.type; + topic->typeStr = config.typeStr; + RefreshPubSubActive(topic, true); + + if (properties.is_null()) { + topic->properties = wpi::json::object(); + } else if (properties.is_object()) { + topic->properties = properties; + } else { + WARN("ignoring non-object properties when publishing '{}'", topic->name); + topic->properties = wpi::json::object(); + } + + if (topic->properties.empty()) { + NotifyTopic(topic, NT_EVENT_PUBLISH); + } else { + PropertiesUpdated(topic, topic->properties, NT_EVENT_PUBLISH, false); + } + } else { + // only need to update just this publisher + publisher->UpdateActive(); + if (!publisher->active) { + // warn on type mismatch + INFO( + "local publish to '{}' disabled due to type mismatch (wanted '{}', " + "currently '{}')", + topic->name, config.typeStr, topic->typeStr); + } + } + + if (publisher->active && m_network) { + m_network->ClientPublish(Handle{publisher->handle}.GetIndex(), topic->name, + topic->typeStr, topic->properties, config); + } + return publisher; +} + +std::unique_ptr StorageImpl::RemoveLocalPublisher( + NT_Publisher pubHandle) { + auto publisher = m_publishers.Remove(pubHandle); + if (publisher) { + auto topic = publisher->topic; + bool didExist = topic->Exists(); + topic->localPublishers.Remove(publisher.get()); + if (didExist && !topic->Exists()) { + topic->ResetIfDoesNotExist(); + NotifyTopic(topic, NT_EVENT_UNPUBLISH); + } + + if (publisher->active && m_network) { + m_network->ClientUnpublish(Handle{publisher->handle}.GetIndex()); + } + + if (publisher->active && !topic->localPublishers.empty()) { + // some other publisher still exists; if it has a different type, refresh + // and publish it over the network + auto& nextPub = topic->localPublishers.front(); + if (nextPub->config.type != topic->type || + nextPub->config.typeStr != topic->typeStr) { + topic->type = nextPub->config.type; + topic->typeStr = nextPub->config.typeStr; + RefreshPubSubActive(topic, false); + if (nextPub->active && m_network) { + m_network->ClientPublish(Handle{nextPub->handle}.GetIndex(), + topic->name, topic->typeStr, + topic->properties, nextPub->config); + } + } + } + } + return publisher; +} + +LocalSubscriber* StorageImpl::AddLocalSubscriber(LocalTopic* topic, + const PubSubConfig& config) { + DEBUG4("AddLocalSubscriber({})", topic->name); + auto subscriber = m_subscribers.Add(m_inst, topic, config); + topic->localSubscribers.Add(subscriber); + // set subscriber to active if the type matches + subscriber->UpdateActive(); + if (topic->Exists() && !subscriber->active) { + // warn on type mismatch + INFO( + "local subscribe to '{}' disabled due to type mismatch (wanted '{}', " + "published as '{}')", + topic->name, config.typeStr, topic->typeStr); + } + if (m_network && !subscriber->config.hidden) { + DEBUG4("-> NetworkSubscribe({})", topic->name); + m_network->ClientSubscribe(1 + Handle{subscriber->handle}.GetIndex(), + {{topic->name}}, config); + } + + // queue current value + if (subscriber->active) { + if (!topic->lastValueFromNetwork && !config.disableLocal) { + subscriber->pollStorage.emplace_back(topic->lastValue); + subscriber->handle.Set(); + } else if (topic->lastValueFromNetwork && !config.disableRemote) { + subscriber->pollStorage.emplace_back(topic->lastValueNetwork); + subscriber->handle.Set(); + } + } + return subscriber; +} + +std::unique_ptr StorageImpl::RemoveLocalSubscriber( + NT_Subscriber subHandle) { + auto subscriber = m_subscribers.Remove(subHandle); + if (subscriber) { + auto topic = subscriber->topic; + topic->localSubscribers.Remove(subscriber.get()); + for (auto&& listener : m_listeners) { + if (listener.getSecond()->subscriber == subscriber.get()) { + listener.getSecond()->subscriber = nullptr; + } + } + if (m_network && !subscriber->config.hidden) { + m_network->ClientUnsubscribe(1 + Handle{subscriber->handle}.GetIndex()); + } + } + return subscriber; +} + +LocalPublisher* StorageImpl::PublishEntry(LocalEntry* entry, NT_Type type) { + if (entry->publisher) { + return entry->publisher; + } + if (entry->subscriber->config.type == NT_UNASSIGNED) { + auto typeStr = TypeToString(type); + entry->subscriber->config.type = type; + entry->subscriber->config.typeStr = typeStr; + } else if (entry->subscriber->config.type != type) { + if (!IsNumericCompatible(type, entry->subscriber->config.type)) { + // don't allow dynamically changing the type of an entry + auto typeStr = TypeToString(type); + ERR("cannot publish entry {} as type {}, previously subscribed as {}", + entry->topic->name, typeStr, entry->subscriber->config.typeStr); + return nullptr; + } + } + // create publisher + entry->publisher = AddLocalPublisher(entry->topic, wpi::json::object(), + entry->subscriber->config); + // exclude publisher if requested + if (entry->subscriber->config.excludeSelf) { + entry->subscriber->config.excludePublisher = entry->publisher->handle; + } + return entry->publisher; +} + +bool StorageImpl::PublishLocalValue(LocalPublisher* publisher, + const Value& value, bool force) { + if (!value) { + return false; + } + if (publisher->topic->type != NT_UNASSIGNED && + publisher->topic->type != value.type()) { + if (IsNumericCompatible(publisher->topic->type, value.type())) { + return PublishLocalValue( + publisher, ConvertNumericValue(value, publisher->topic->type)); + } + return false; + } + if (publisher->active) { + bool isNetworkDuplicate, suppressDuplicates; + if (force || publisher->config.keepDuplicates) { + suppressDuplicates = false; + isNetworkDuplicate = false; + } else { + suppressDuplicates = true; + isNetworkDuplicate = publisher->topic->IsCached() && + (publisher->topic->lastValueNetwork == value); + } + if (!isNetworkDuplicate && m_network) { + if (publisher->topic->IsCached()) { + publisher->topic->lastValueNetwork = value; + } + m_network->ClientSetValue(Handle{publisher->handle}.GetIndex(), value); + } + return SetValue(publisher->topic, value, NT_EVENT_VALUE_LOCAL, + suppressDuplicates, publisher); + } else { + return false; + } +} diff --git a/ntcore/src/main/native/cpp/local/LocalStorageImpl.h b/ntcore/src/main/native/cpp/local/LocalStorageImpl.h new file mode 100644 index 0000000000..dfa5e77fd4 --- /dev/null +++ b/ntcore/src/main/native/cpp/local/LocalStorageImpl.h @@ -0,0 +1,320 @@ +// Copyright (c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#pragma once + +#include +#include +#include + +#include +#include +#include +#include +#include + +#include "HandleMap.h" +#include "local/LocalDataLogger.h" +#include "local/LocalEntry.h" +#include "local/LocalListener.h" +#include "local/LocalMultiSubscriber.h" +#include "local/LocalPublisher.h" +#include "local/LocalSubscriber.h" +#include "local/LocalTopic.h" +#include "ntcore_c.h" +#include "ntcore_cpp.h" + +namespace wpi { +class Logger; +} // namespace wpi + +namespace nt { +class IListenerStorage; +} // namespace nt + +namespace nt::net { +class ClientMessageHandler; +} // namespace nt::net + +namespace nt::local { + +// inner struct to protect against accidentally deadlocking on the mutex +class StorageImpl { + public: + StorageImpl(int inst, IListenerStorage& listenerStorage, wpi::Logger& logger); + + wpi::Logger& GetLogger() { return m_logger; } + + // + // Network interface functions + // + + void NetworkAnnounce(LocalTopic* topic, std::string_view typeStr, + const wpi::json& properties, std::optional pubuid); + void RemoveNetworkPublisher(LocalTopic* topic); + void NetworkPropertiesUpdate(LocalTopic* topic, const wpi::json& update, + bool ack); + + void ServerSetValue(LocalTopic* topic, const Value& value) { + if (SetValue(topic, value, NT_EVENT_VALUE_REMOTE, false, nullptr)) { + if (topic->IsCached()) { + topic->lastValueNetwork = value; + topic->lastValueFromNetwork = true; + } + } + } + + void StartNetwork(net::ClientMessageHandler* network); + void ClearNetwork(); + + // + // Topic functions + // + + LocalTopic* GetOrCreateTopic(std::string_view name); + + template F> + void ForEachTopic(std::string_view prefix, unsigned int types, + F&& func) const { + for (auto&& topic : m_topics) { + if (!topic->Exists()) { + continue; + } + if (!wpi::starts_with(topic->name, prefix)) { + continue; + } + if (types != 0 && (types & topic->type) == 0) { + continue; + } + func(*topic); + } + } + + template F> + void ForEachTopic(std::string_view prefix, + std::span types, F&& func) const { + for (auto&& topic : m_topics) { + if (!topic->Exists()) { + continue; + } + if (!wpi::starts_with(topic->name, prefix)) { + continue; + } + if (!types.empty()) { + bool match = false; + for (auto&& type : types) { + if (topic->typeStr == type) { + match = true; + break; + } + } + if (!match) { + continue; + } + } + func(*topic); + } + } + + // + // Topic property functions + // + + void SetFlags(LocalTopic* topic, unsigned int flags); + void SetPersistent(LocalTopic* topic, bool value); + void SetRetained(LocalTopic* topic, bool value); + void SetCached(LocalTopic* topic, bool value); + + void SetProperty(LocalTopic* topic, std::string_view name, + const wpi::json& value); + bool SetProperties(LocalTopic* topic, const wpi::json& update, + bool sendNetwork); + + void DeleteProperty(LocalTopic* topic, std::string_view name); + + // + // Value functions + // + + bool SetEntryValue(NT_Handle pubentryHandle, const Value& value); + bool SetDefaultEntryValue(NT_Handle pubsubentryHandle, const Value& value); + + Value* GetSubEntryValue(NT_Handle subentryHandle) { + if (auto subscriber = GetSubEntry(subentryHandle)) { + return &subscriber->topic->lastValue; + } else { + return nullptr; + } + } + + // + // Publish/Subscribe/Entry functions + // + + LocalSubscriber* Subscribe(LocalTopic* topic, NT_Type type, + std::string_view typeString, + const PubSubOptions& options); + + LocalPublisher* Publish(LocalTopic* topic, NT_Type type, + std::string_view typeStr, const wpi::json& properties, + const PubSubOptions& options); + + LocalEntry* GetEntry(LocalTopic* topicHandle, NT_Type type, + std::string_view typeStr, const PubSubOptions& options); + LocalEntry* GetEntry(std::string_view name); + + void RemoveSubEntry(NT_Handle subentryHandle); + + void Unpublish(NT_Handle pubentryHandle); + + // + // Multi-subscriber functions + // + + LocalMultiSubscriber* AddMultiSubscriber( + std::span prefixes, const PubSubOptions& options); + + std::unique_ptr RemoveMultiSubscriber( + NT_MultiSubscriber subHandle); + + // + // Lookup functions + // + + LocalTopic* GetTopic(NT_Handle handle); + LocalTopic* GetTopicByHandle(NT_Topic topicHandle) { + return m_topics.Get(topicHandle); + } + LocalTopic* GetTopicByName(std::string_view name) { + auto it = m_nameTopics.find(name); + if (it == m_nameTopics.end()) { + return nullptr; + } + return it->second; + } + LocalTopic* GetTopicById(int topicId) { + return m_topics.Get(Handle{m_inst, topicId, Handle::kTopic}); + } + + LocalSubscriber* GetSubEntry(NT_Handle subentryHandle); + + LocalEntry* GetEntryByHandle(NT_Entry entryHandle) { + return m_entries.Get(entryHandle); + } + + LocalMultiSubscriber* GetMultiSubscriberByHandle(NT_MultiSubscriber handle) { + return m_multiSubscribers.Get(handle); + } + + LocalSubscriber* GetSubscriberByHandle(NT_Subscriber handle) { + return m_subscribers.Get(handle); + } + + // + // Listener functions + // + + void AddListenerImpl(NT_Listener listenerHandle, LocalTopic* topic, + unsigned int eventMask); + void AddListenerImpl(NT_Listener listenerHandle, LocalSubscriber* subscriber, + unsigned int eventMask, NT_Handle subentryHandle, + bool subscriberOwned); + void AddListenerImpl(NT_Listener listenerHandle, + LocalMultiSubscriber* subscriber, unsigned int eventMask, + bool subscriberOwned); + void RemoveListener(NT_Listener listener, unsigned int mask); + + // + // Data log functions + // + + LocalDataLogger* StartDataLog(wpi::log::DataLog& log, std::string_view prefix, + std::string_view logPrefix); + void StopDataLog(NT_DataLogger logger); + + // + // Schema functions + // + + bool HasSchema(std::string_view name); + void AddSchema(std::string_view name, std::string_view type, + std::span schema); + + void Reset(); + + private: + // topic functions + void NotifyTopic(LocalTopic* topic, unsigned int eventFlags); + + bool SetValue(LocalTopic* topic, const Value& value, unsigned int eventFlags, + bool suppressIfDuplicate, const LocalPublisher* publisher); + void NotifyValue(LocalTopic* topic, const Value& value, + unsigned int eventFlags, bool isDuplicate, + const LocalPublisher* publisher); + + void PropertiesUpdated(LocalTopic* topic, const wpi::json& update, + unsigned int eventFlags, bool sendNetwork, + bool updateFlags = true); + + void RefreshPubSubActive(LocalTopic* topic, bool warnOnSubMismatch); + + LocalPublisher* AddLocalPublisher(LocalTopic* topic, + const wpi::json& properties, + const PubSubConfig& options); + + std::unique_ptr RemoveLocalPublisher(NT_Publisher pubHandle); + + LocalSubscriber* AddLocalSubscriber(LocalTopic* topic, + const PubSubConfig& options); + + std::unique_ptr RemoveLocalSubscriber( + NT_Subscriber subHandle); + + LocalEntry* AddEntry(LocalSubscriber* subscriber) { + auto entry = m_entries.Add(m_inst, subscriber); + subscriber->topic->entries.Add(entry); + return entry; + } + + std::unique_ptr RemoveEntry(NT_Entry entryHandle) { + auto entry = m_entries.Remove(entryHandle); + if (entry) { + entry->topic->entries.Remove(entry.get()); + } + return entry; + } + + LocalPublisher* PublishEntry(LocalEntry* entry, NT_Type type); + + bool PublishLocalValue(LocalPublisher* publisher, const Value& value, + bool force = false); + + private: + int m_inst; + IListenerStorage& m_listenerStorage; + wpi::Logger& m_logger; + net::ClientMessageHandler* m_network{nullptr}; + + // handle mappings + HandleMap m_topics; + HandleMap m_publishers; + HandleMap m_subscribers; + HandleMap m_entries; + HandleMap m_multiSubscribers; + HandleMap m_dataloggers; + + // name mappings + wpi::StringMap m_nameTopics; + + // listeners + wpi::DenseMap> m_listeners; + + // string-based listeners + VectorSet m_topicPrefixListeners; + + // schema publishers + wpi::StringMap m_schemas; +}; + +} // namespace nt::local diff --git a/ntcore/src/main/native/cpp/local/LocalSubscriber.h b/ntcore/src/main/native/cpp/local/LocalSubscriber.h new file mode 100644 index 0000000000..e0c5cf4228 --- /dev/null +++ b/ntcore/src/main/native/cpp/local/LocalSubscriber.h @@ -0,0 +1,53 @@ +// Copyright (c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#pragma once + +#include + +#include + +#include "Handle.h" +#include "Types_internal.h" +#include "ValueCircularBuffer.h" +#include "VectorSet.h" +#include "local/LocalTopic.h" +#include "local/PubSubConfig.h" +#include "ntcore_c.h" + +namespace nt::local { + +struct LocalSubscriber { + static constexpr auto kType = Handle::kSubscriber; + + LocalSubscriber(NT_Subscriber handle, LocalTopic* topic, PubSubConfig config) + : handle{handle}, + topic{topic}, + config{std::move(config)}, + pollStorage{config.pollStorage} {} + + void UpdateActive() { + // for subscribers, unassigned is a wildcard + // also allow numerically compatible subscribers + active = config.type == NT_UNASSIGNED || + (config.type == topic->type && config.typeStr == topic->typeStr) || + IsNumericCompatible(config.type, topic->type); + } + + // invariants + wpi::SignalObject handle; + LocalTopic* topic; + PubSubConfig config; + + // whether or not the subscriber should actually receive values + bool active{false}; + + // polling storage + ValueCircularBuffer pollStorage; + + // value listeners + VectorSet valueListeners; +}; + +} // namespace nt::local diff --git a/ntcore/src/main/native/cpp/local/LocalTopic.cpp b/ntcore/src/main/native/cpp/local/LocalTopic.cpp new file mode 100644 index 0000000000..2803dd5a31 --- /dev/null +++ b/ntcore/src/main/native/cpp/local/LocalTopic.cpp @@ -0,0 +1,226 @@ +// Copyright (c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#include "LocalTopic.h" + +#include + +#include "local/LocalDataLogger.h" + +using namespace nt::local; + +void LocalTopic::StartStopDataLog(LocalDataLogger* logger, int64_t timestamp, + bool publish) { + auto it = std::find_if( + datalogs.begin(), datalogs.end(), + [&](const auto& elem) { return elem.logger == logger->handle; }); + if (publish && it == datalogs.end()) { + datalogs.emplace_back( + logger->log, + logger->Start(name, typeStr, + LocalDataLoggerEntry::MakeMetadata(m_propertiesStr), + timestamp), + logger->handle); + datalogType = type; + } else if (!publish && it != datalogs.end()) { + it->Finish(timestamp); + datalogType = NT_UNASSIGNED; + datalogs.erase(it); + } +} + +void LocalTopic::UpdateDataLogProperties() { + if (!datalogs.empty()) { + auto now = Now(); + auto metadata = LocalDataLoggerEntry::MakeMetadata(m_propertiesStr); + for (auto&& datalog : datalogs) { + datalog.SetMetadata(metadata, now); + } + } +} + +wpi::json LocalTopic::SetFlags(unsigned int flags) { + wpi::json update = wpi::json::object(); + if ((flags & NT_PERSISTENT) != 0) { + properties["persistent"] = true; + update["persistent"] = true; + } else { + properties.erase("persistent"); + update["persistent"] = wpi::json(); + } + if ((flags & NT_RETAINED) != 0) { + properties["retained"] = true; + update["retained"] = true; + } else { + properties.erase("retained"); + update["retained"] = wpi::json(); + } + if ((flags & NT_UNCACHED) != 0) { + properties["cached"] = false; + update["cached"] = false; + } else { + properties.erase("cached"); + update["cached"] = wpi::json(); + } + if ((flags & NT_UNCACHED) != 0) { + lastValue = {}; + lastValueNetwork = {}; + lastValueFromNetwork = false; + } + this->m_flags = flags; + return update; +} + +wpi::json LocalTopic::SetPersistent(bool value) { + wpi::json update = wpi::json::object(); + if (value) { + m_flags |= NT_PERSISTENT; + properties["persistent"] = true; + update["persistent"] = true; + } else { + m_flags &= ~NT_PERSISTENT; + properties.erase("persistent"); + update["persistent"] = wpi::json(); + } + return update; +} + +wpi::json LocalTopic::SetRetained(bool value) { + wpi::json update = wpi::json::object(); + if (value) { + m_flags |= NT_RETAINED; + properties["retained"] = true; + update["retained"] = true; + } else { + m_flags &= ~NT_RETAINED; + properties.erase("retained"); + update["retained"] = wpi::json(); + } + return update; +} + +wpi::json LocalTopic::SetCached(bool value) { + wpi::json update = wpi::json::object(); + if (value) { + m_flags &= ~NT_UNCACHED; + properties.erase("cached"); + update["cached"] = wpi::json(); + } else { + m_flags |= NT_UNCACHED; + properties["cached"] = false; + update["cached"] = false; + } + return update; +} + +wpi::json LocalTopic::SetProperty(std::string_view name, + const wpi::json& value) { + if (value.is_null()) { + properties.erase(name); + } else { + properties[name] = value; + } + wpi::json update = wpi::json::object(); + update[name] = value; + return update; +} + +wpi::json LocalTopic::DeleteProperty(std::string_view name) { + properties.erase(name); + wpi::json update = wpi::json::object(); + update[name] = wpi::json(); + return update; +} + +bool LocalTopic::SetProperties(const wpi::json& update) { + if (!update.is_object()) { + return false; + } + for (auto&& change : update.items()) { + if (change.value().is_null()) { + properties.erase(change.key()); + } else { + properties[change.key()] = change.value(); + } + } + return true; +} + +void LocalTopic::RefreshProperties(bool updateFlags) { + if (updateFlags) { + RefreshFlags(); + } + m_propertiesStr = properties.dump(); +} + +wpi::json LocalTopic::CompareProperties(const wpi::json props) { + wpi::json update = wpi::json::object(); + // added/changed + for (auto&& prop : props.items()) { + auto it = properties.find(prop.key()); + if (it == properties.end() || *it != prop.value()) { + update[prop.key()] = prop.value(); + } + } + // removed + for (auto&& prop : properties.items()) { + if (props.find(prop.key()) == props.end()) { + update[prop.key()] = wpi::json(); + } + } + return update; +} + +void LocalTopic::ResetIfDoesNotExist() { + if (Exists()) { + return; + } + lastValue = {}; + lastValueNetwork = {}; + lastValueFromNetwork = false; + type = NT_UNASSIGNED; + typeStr.clear(); + m_flags = 0; + properties = wpi::json::object(); + m_propertiesStr = "{}"; +} + +void LocalTopic::RefreshFlags() { + auto it = properties.find("persistent"); + if (it != properties.end()) { + if (auto val = it->get_ptr()) { + if (*val) { + m_flags |= NT_PERSISTENT; + } else { + m_flags &= ~NT_PERSISTENT; + } + } + } + it = properties.find("retained"); + if (it != properties.end()) { + if (auto val = it->get_ptr()) { + if (*val) { + m_flags |= NT_RETAINED; + } else { + m_flags &= ~NT_RETAINED; + } + } + } + it = properties.find("cached"); + if (it != properties.end()) { + if (auto val = it->get_ptr()) { + if (*val) { + m_flags &= ~NT_UNCACHED; + } else { + m_flags |= NT_UNCACHED; + } + } + } + + if ((m_flags & NT_UNCACHED) != 0) { + lastValue = {}; + lastValueNetwork = {}; + lastValueFromNetwork = false; + } +} diff --git a/ntcore/src/main/native/cpp/local/LocalTopic.h b/ntcore/src/main/native/cpp/local/LocalTopic.h new file mode 100644 index 0000000000..278248ea15 --- /dev/null +++ b/ntcore/src/main/native/cpp/local/LocalTopic.h @@ -0,0 +1,111 @@ +// Copyright (c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#pragma once + +#include + +#include +#include + +#include +#include +#include + +#include "Handle.h" +#include "VectorSet.h" +#include "local/LocalDataLogger.h" +#include "local/LocalDataLoggerEntry.h" +#include "ntcore_cpp.h" + +namespace nt::local { + +struct LocalDataLogger; +struct LocalEntry; +struct LocalMultiSubscriber; +struct LocalPublisher; +struct LocalSubscriber; + +constexpr bool IsSpecial(std::string_view name) { + return name.empty() ? false : name.front() == '$'; +} + +struct LocalTopic { + static constexpr auto kType = Handle::kTopic; + + LocalTopic(NT_Topic handle, std::string_view name) + : handle{handle}, name{name}, special{IsSpecial(name)} {} + + bool Exists() const { return onNetwork || !localPublishers.empty(); } + + bool IsCached() const { return (m_flags & NT_UNCACHED) == 0; } + + // starts if publish is true, stops if false + void StartStopDataLog(LocalDataLogger* logger, int64_t timestamp, + bool publish); + void UpdateDataLogProperties(); + + unsigned int GetFlags() const { return m_flags; } + + // these return update json + wpi::json SetFlags(unsigned int flags); + wpi::json SetPersistent(bool value); + wpi::json SetRetained(bool value); + wpi::json SetCached(bool value); + wpi::json SetProperty(std::string_view name, const wpi::json& value); + wpi::json DeleteProperty(std::string_view name); + + // returns false if not object + bool SetProperties(const wpi::json& update); + + void RefreshProperties(bool updateFlags); + + // returns update json + wpi::json CompareProperties(const wpi::json props); + + TopicInfo GetTopicInfo() const { + TopicInfo info; + info.topic = handle; + info.name = name; + info.type = type; + info.type_str = typeStr; + info.properties = m_propertiesStr; + return info; + } + + void ResetIfDoesNotExist(); + + // invariants + wpi::SignalObject handle; + std::string name; + bool special; + + Value lastValue; // also stores timestamp + Value lastValueNetwork; + NT_Type type{NT_UNASSIGNED}; + std::string typeStr; + wpi::json properties = wpi::json::object(); + LocalEntry* entry{nullptr}; // cached entry for GetEntry() + + bool onNetwork{false}; // true if there are any remote publishers + bool lastValueFromNetwork{false}; + + wpi::SmallVector datalogs; + NT_Type datalogType{NT_UNASSIGNED}; + + VectorSet localPublishers; + VectorSet localSubscribers; + VectorSet multiSubscribers; + VectorSet entries; + VectorSet listeners; + + private: + // update flags from properties + void RefreshFlags(); + + unsigned int m_flags{0}; // for NT3 APIs + std::string m_propertiesStr{"{}"}; // cached string for GetTopicInfo() et al +}; + +} // namespace nt::local diff --git a/ntcore/src/main/native/cpp/local/PubSubConfig.h b/ntcore/src/main/native/cpp/local/PubSubConfig.h new file mode 100644 index 0000000000..ab5c118f9b --- /dev/null +++ b/ntcore/src/main/native/cpp/local/PubSubConfig.h @@ -0,0 +1,27 @@ +// Copyright (c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#pragma once + +#include +#include + +#include "PubSubOptions.h" +#include "ntcore_c.h" + +namespace nt::local { + +struct PubSubConfig : public PubSubOptionsImpl { + PubSubConfig() = default; + PubSubConfig(NT_Type type, std::string_view typeStr, + const PubSubOptions& options) + : PubSubOptionsImpl{options}, type{type}, typeStr{typeStr} { + prefixMatch = false; + } + + NT_Type type{NT_UNASSIGNED}; + std::string typeStr; +}; + +} // namespace nt::local