[ntcore] Split LocalStorage implementation into separate files

This commit is contained in:
Peter Johnson
2024-10-18 23:08:52 -07:00
parent 0921054a28
commit 4a43ddbacf
17 changed files with 2360 additions and 1907 deletions

View File

@@ -7,6 +7,7 @@ file(
GLOB ntcore_native_src
src/main/native/cpp/*.cpp
src/generated/main/native/cpp/*.cpp
src/main/native/cpp/local/*.cpp
src/main/native/cpp/net/*.cpp
src/main/native/cpp/net3/*.cpp
src/main/native/cpp/networktables/*.cpp

File diff suppressed because it is too large Load Diff

View File

@@ -6,25 +6,18 @@
#include <stdint.h>
#include <memory>
#include <optional>
#include <span>
#include <string>
#include <string_view>
#include <utility>
#include <vector>
#include <wpi/DenseMap.h>
#include <wpi/StringMap.h>
#include <wpi/Synchronization.h>
#include <wpi/Logger.h>
#include <wpi/SmallVector.h>
#include <wpi/json.h>
#include <wpi/mutex.h>
#include "Handle.h"
#include "HandleMap.h"
#include "PubSubOptions.h"
#include "Types_internal.h"
#include "ValueCircularBuffer.h"
#include "VectorSet.h"
#include "local/LocalStorageImpl.h"
#include "net/MessageHandler.h"
#include "net/NetworkInterface.h"
#include "ntcore_cpp.h"
@@ -43,19 +36,46 @@ class LocalStorage final : public net::ILocalStorage {
: m_impl{inst, listenerStorage, logger} {}
LocalStorage(const LocalStorage&) = delete;
LocalStorage& operator=(const LocalStorage&) = delete;
~LocalStorage() final;
// network interface functions
int ServerAnnounce(std::string_view name, int id, std::string_view typeStr,
const wpi::json& properties,
std::optional<int> pubuid) final;
void ServerUnannounce(std::string_view name, int id) final;
void ServerPropertiesUpdate(std::string_view name, const wpi::json& update,
bool ack) final;
void ServerSetValue(int topicId, const Value& value) final;
std::optional<int> pubuid) final {
std::scoped_lock lock{m_mutex};
auto topic = m_impl.GetOrCreateTopic(name);
m_impl.NetworkAnnounce(topic, typeStr, properties, pubuid);
return Handle{topic->handle}.GetIndex();
}
void StartNetwork(net::ClientMessageHandler* network) final;
void ClearNetwork() final;
void ServerUnannounce(std::string_view name, int id) final {
std::scoped_lock lock{m_mutex};
m_impl.RemoveNetworkPublisher(m_impl.GetOrCreateTopic(name));
}
void ServerPropertiesUpdate(std::string_view name, const wpi::json& update,
bool ack) final {
std::scoped_lock lock{m_mutex};
if (auto topic = m_impl.GetTopicByName(name)) {
m_impl.NetworkPropertiesUpdate(topic, update, ack);
}
}
void ServerSetValue(int topicId, const Value& value) final {
std::scoped_lock lock{m_mutex};
if (auto topic = m_impl.GetTopicById(topicId)) {
m_impl.ServerSetValue(topic, value);
}
}
void StartNetwork(net::ClientMessageHandler* network) final {
std::scoped_lock lock{m_mutex};
m_impl.StartNetwork(network);
}
void ClearNetwork() final {
std::scoped_lock lock{m_mutex};
m_impl.ClearNetwork();
}
// User functions. These are the actual implementations of the corresponding
// user API functions in ntcore_cpp.
@@ -79,7 +99,7 @@ class LocalStorage final : public net::ILocalStorage {
std::string GetTopicName(NT_Topic topicHandle) {
std::scoped_lock lock{m_mutex};
if (auto topic = m_impl.m_topics.Get(topicHandle)) {
if (auto topic = m_impl.GetTopicByHandle(topicHandle)) {
return topic->name;
} else {
return {};
@@ -88,7 +108,7 @@ class LocalStorage final : public net::ILocalStorage {
NT_Type GetTopicType(NT_Topic topicHandle) {
std::scoped_lock lock{m_mutex};
if (auto topic = m_impl.m_topics.Get(topicHandle)) {
if (auto topic = m_impl.GetTopicByHandle(topicHandle)) {
return topic->type;
} else {
return {};
@@ -97,7 +117,7 @@ class LocalStorage final : public net::ILocalStorage {
std::string GetTopicTypeString(NT_Topic topicHandle) {
std::scoped_lock lock{m_mutex};
if (auto topic = m_impl.m_topics.Get(topicHandle)) {
if (auto topic = m_impl.GetTopicByHandle(topicHandle)) {
return topic->typeStr;
} else {
return {};
@@ -106,15 +126,15 @@ class LocalStorage final : public net::ILocalStorage {
void SetTopicPersistent(NT_Topic topicHandle, bool value) {
std::scoped_lock lock{m_mutex};
if (auto topic = m_impl.m_topics.Get(topicHandle)) {
if (auto topic = m_impl.GetTopicByHandle(topicHandle)) {
m_impl.SetPersistent(topic, value);
}
}
bool GetTopicPersistent(NT_Topic topicHandle) {
std::scoped_lock lock{m_mutex};
if (auto topic = m_impl.m_topics.Get(topicHandle)) {
return (topic->flags & NT_PERSISTENT) != 0;
if (auto topic = m_impl.GetTopicByHandle(topicHandle)) {
return (topic->GetFlags() & NT_PERSISTENT) != 0;
} else {
return false;
}
@@ -122,15 +142,15 @@ class LocalStorage final : public net::ILocalStorage {
void SetTopicRetained(NT_Topic topicHandle, bool value) {
std::scoped_lock lock{m_mutex};
if (auto topic = m_impl.m_topics.Get(topicHandle)) {
if (auto topic = m_impl.GetTopicByHandle(topicHandle)) {
m_impl.SetRetained(topic, value);
}
}
bool GetTopicRetained(NT_Topic topicHandle) {
std::scoped_lock lock{m_mutex};
if (auto topic = m_impl.m_topics.Get(topicHandle)) {
return (topic->flags & NT_RETAINED) != 0;
if (auto topic = m_impl.GetTopicByHandle(topicHandle)) {
return (topic->GetFlags() & NT_RETAINED) != 0;
} else {
return false;
}
@@ -138,15 +158,15 @@ class LocalStorage final : public net::ILocalStorage {
void SetTopicCached(NT_Topic topicHandle, bool value) {
std::scoped_lock lock{m_mutex};
if (auto topic = m_impl.m_topics.Get(topicHandle)) {
if (auto topic = m_impl.GetTopicByHandle(topicHandle)) {
m_impl.SetCached(topic, value);
}
}
bool GetTopicCached(NT_Topic topicHandle) {
std::scoped_lock lock{m_mutex};
if (auto topic = m_impl.m_topics.Get(topicHandle)) {
return (topic->flags & NT_UNCACHED) == 0;
if (auto topic = m_impl.GetTopicByHandle(topicHandle)) {
return (topic->GetFlags() & NT_UNCACHED) == 0;
} else {
return false;
}
@@ -154,47 +174,72 @@ class LocalStorage final : public net::ILocalStorage {
bool GetTopicExists(NT_Handle handle) {
std::scoped_lock lock{m_mutex};
TopicData* topic = m_impl.GetTopic(handle);
local::LocalTopic* topic = m_impl.GetTopic(handle);
return topic && topic->Exists();
}
wpi::json GetTopicProperty(NT_Topic topicHandle, std::string_view name) {
std::scoped_lock lock{m_mutex};
if (auto topic = m_impl.m_topics.Get(topicHandle)) {
if (auto topic = m_impl.GetTopicByHandle(topicHandle)) {
return topic->properties.value(name, wpi::json{});
} else {
return {};
}
}
void SetTopicProperty(NT_Topic topic, std::string_view name,
const wpi::json& value);
void SetTopicProperty(NT_Topic topicHandle, std::string_view name,
const wpi::json& value) {
std::scoped_lock lock{m_mutex};
if (auto topic = m_impl.GetTopicByHandle(topicHandle)) {
m_impl.SetProperty(topic, name, value);
}
}
void DeleteTopicProperty(NT_Topic topic, std::string_view name);
void DeleteTopicProperty(NT_Topic topicHandle, std::string_view name) {
std::scoped_lock lock{m_mutex};
if (auto topic = m_impl.GetTopicByHandle(topicHandle)) {
m_impl.DeleteProperty(topic, name);
}
}
wpi::json GetTopicProperties(NT_Topic topicHandle) {
std::scoped_lock lock{m_mutex};
if (auto topic = m_impl.m_topics.Get(topicHandle)) {
if (auto topic = m_impl.GetTopicByHandle(topicHandle)) {
return topic->properties;
} else {
return wpi::json::object();
}
}
bool SetTopicProperties(NT_Topic topic, const wpi::json& update);
bool SetTopicProperties(NT_Topic topicHandle, const wpi::json& update) {
std::scoped_lock lock{m_mutex};
if (auto topic = m_impl.GetTopicByHandle(topicHandle)) {
return m_impl.SetProperties(topic, update, true);
} else {
return {};
}
}
TopicInfo GetTopicInfo(NT_Topic topicHandle) {
std::scoped_lock lock{m_mutex};
if (auto topic = m_impl.m_topics.Get(topicHandle)) {
if (auto topic = m_impl.GetTopicByHandle(topicHandle)) {
return topic->GetTopicInfo();
} else {
return {};
}
}
NT_Subscriber Subscribe(NT_Topic topic, NT_Type type,
NT_Subscriber Subscribe(NT_Topic topicHandle, NT_Type type,
std::string_view typeStr,
const PubSubOptions& options);
const PubSubOptions& options) {
std::scoped_lock lock{m_mutex};
if (auto topic = m_impl.GetTopicByHandle(topicHandle)) {
if (auto subscriber = m_impl.Subscribe(topic, type, typeStr, options)) {
return subscriber->handle;
}
}
return {};
}
void Unsubscribe(NT_Subscriber subHandle) {
std::scoped_lock lock{m_mutex};
@@ -202,21 +247,52 @@ class LocalStorage final : public net::ILocalStorage {
}
NT_MultiSubscriber SubscribeMultiple(
std::span<const std::string_view> prefixes, const PubSubOptions& options);
std::span<const std::string_view> prefixes,
const PubSubOptions& options) {
std::scoped_lock lock{m_mutex};
if (auto sub = m_impl.AddMultiSubscriber(prefixes, options)) {
return sub->handle;
} else {
return {};
}
}
void UnsubscribeMultiple(NT_MultiSubscriber subHandle) {
std::scoped_lock lock{m_mutex};
m_impl.RemoveMultiSubscriber(subHandle);
}
NT_Publisher Publish(NT_Topic topic, NT_Type type, std::string_view typeStr,
const wpi::json& properties,
const PubSubOptions& options);
NT_Publisher Publish(NT_Topic topicHandle, NT_Type type,
std::string_view typeStr, const wpi::json& properties,
const PubSubOptions& options) {
std::scoped_lock lock{m_mutex};
if (auto topic = m_impl.GetTopicByHandle(topicHandle)) {
if (auto publisher =
m_impl.Publish(topic, type, typeStr, properties, options)) {
return publisher->handle;
}
} else {
WPI_ERROR(m_impl.GetLogger(),
"trying to publish invalid topic handle ({})", topicHandle);
}
return {};
}
void Unpublish(NT_Handle pubentry);
void Unpublish(NT_Handle pubentryHandle) {
std::scoped_lock lock{m_mutex};
m_impl.Unpublish(pubentryHandle);
}
NT_Entry GetEntry(NT_Topic topic, NT_Type type, std::string_view typeStr,
const PubSubOptions& options);
NT_Entry GetEntry(NT_Topic topicHandle, NT_Type type,
std::string_view typeStr, const PubSubOptions& options) {
std::scoped_lock lock{m_mutex};
if (auto topic = m_impl.GetTopicByHandle(topicHandle)) {
if (auto entry = m_impl.GetEntry(topic, type, typeStr, options)) {
return entry->handle;
}
}
return {};
}
void ReleaseEntry(NT_Entry entryHandle) {
std::scoped_lock lock{m_mutex};
@@ -298,22 +374,29 @@ class LocalStorage final : public net::ILocalStorage {
void SetEntryFlags(NT_Entry entryHandle, unsigned int flags) {
std::scoped_lock lock{m_mutex};
if (auto entry = m_impl.m_entries.Get(entryHandle)) {
if (auto entry = m_impl.GetEntryByHandle(entryHandle)) {
m_impl.SetFlags(entry->subscriber->topic, flags);
}
}
unsigned int GetEntryFlags(NT_Entry entryHandle) {
std::scoped_lock lock{m_mutex};
if (auto entry = m_impl.m_entries.Get(entryHandle)) {
return entry->subscriber->topic->flags;
if (auto entry = m_impl.GetEntryByHandle(entryHandle)) {
return entry->subscriber->topic->GetFlags();
} else {
return 0;
}
}
// Index-only
NT_Entry GetEntry(std::string_view name);
NT_Entry GetEntry(std::string_view name) {
std::scoped_lock lock{m_mutex};
if (auto entry = m_impl.GetEntry(name)) {
return entry->handle;
} else {
return {};
}
}
std::string GetEntryName(NT_Entry subentryHandle) {
std::scoped_lock lock{m_mutex};
@@ -346,346 +429,65 @@ class LocalStorage final : public net::ILocalStorage {
// Listener functions
//
void AddListener(NT_Listener listener,
void AddListener(NT_Listener listenerHandle,
std::span<const std::string_view> prefixes,
unsigned int mask);
unsigned int mask) {
mask &= (NT_EVENT_TOPIC | NT_EVENT_VALUE_ALL | NT_EVENT_IMMEDIATE);
std::scoped_lock lock{m_mutex};
// subscribe to make sure topic updates are received
if (auto sub = m_impl.AddMultiSubscriber(
prefixes, {.topicsOnly = (mask & NT_EVENT_VALUE_ALL) == 0})) {
m_impl.AddListenerImpl(listenerHandle, sub, mask, true);
}
}
void AddListener(NT_Listener listener, NT_Handle handle, unsigned int mask);
void RemoveListener(NT_Listener listener, unsigned int mask);
void RemoveListener(NT_Listener listener, unsigned int mask) {
std::scoped_lock lock{m_mutex};
m_impl.RemoveListener(listener, mask);
}
//
// Data log functions
//
NT_DataLogger StartDataLog(wpi::log::DataLog& log, std::string_view prefix,
std::string_view logPrefix);
void StopDataLog(NT_DataLogger logger);
std::string_view logPrefix) {
std::scoped_lock lock{m_mutex};
if (auto dl = m_impl.StartDataLog(log, prefix, logPrefix)) {
return dl->handle;
} else {
return {};
}
}
void StopDataLog(NT_DataLogger logger) {
std::scoped_lock lock{m_mutex};
m_impl.StopDataLog(logger);
}
//
// Schema functions
//
bool HasSchema(std::string_view name);
void AddSchema(std::string_view name, std::string_view type,
std::span<const uint8_t> schema);
void Reset();
private:
static constexpr bool IsSpecial(std::string_view name) {
return name.empty() ? false : name.front() == '$';
bool HasSchema(std::string_view name) {
std::scoped_lock lock{m_mutex};
return m_impl.HasSchema(name);
}
struct EntryData;
struct PublisherData;
struct SubscriberData;
struct MultiSubscriberData;
struct DataLoggerEntry {
DataLoggerEntry(wpi::log::DataLog& log, int entry, NT_DataLogger logger)
: log{&log}, entry{entry}, logger{logger} {}
static std::string MakeMetadata(std::string_view properties);
void Append(const Value& v);
wpi::log::DataLog* log;
int entry;
NT_DataLogger logger;
};
struct TopicData {
static constexpr auto kType = Handle::kTopic;
TopicData(NT_Topic handle, std::string_view name)
: handle{handle}, name{name}, special{IsSpecial(name)} {}
bool Exists() const { return onNetwork || !localPublishers.empty(); }
bool IsCached() const { return (flags & NT_UNCACHED) == 0; }
TopicInfo GetTopicInfo() const;
// invariants
wpi::SignalObject<NT_Topic> handle;
std::string name;
bool special;
Value lastValue; // also stores timestamp
Value lastValueNetwork;
NT_Type type{NT_UNASSIGNED};
std::string typeStr;
unsigned int flags{0}; // for NT3 APIs
std::string propertiesStr{"{}"}; // cached string for GetTopicInfo() et al
wpi::json properties = wpi::json::object();
NT_Entry entry{0}; // cached entry for GetEntry()
bool onNetwork{false}; // true if there are any remote publishers
bool lastValueFromNetwork{false};
wpi::SmallVector<DataLoggerEntry, 1> datalogs;
NT_Type datalogType{NT_UNASSIGNED};
VectorSet<PublisherData*> localPublishers;
VectorSet<SubscriberData*> localSubscribers;
VectorSet<MultiSubscriberData*> multiSubscribers;
VectorSet<EntryData*> entries;
VectorSet<NT_Listener> listeners;
};
struct PubSubConfig : public PubSubOptionsImpl {
PubSubConfig() = default;
PubSubConfig(NT_Type type, std::string_view typeStr,
const PubSubOptions& options)
: PubSubOptionsImpl{options}, type{type}, typeStr{typeStr} {
prefixMatch = false;
}
NT_Type type{NT_UNASSIGNED};
std::string typeStr;
};
struct PublisherData {
static constexpr auto kType = Handle::kPublisher;
PublisherData(NT_Publisher handle, TopicData* topic, PubSubConfig config)
: handle{handle}, topic{topic}, config{std::move(config)} {}
void UpdateActive() {
active = config.type == topic->type && config.typeStr == topic->typeStr;
}
// invariants
wpi::SignalObject<NT_Publisher> handle;
TopicData* topic;
PubSubConfig config;
// whether or not the publisher should actually publish values
bool active{false};
};
struct SubscriberData {
static constexpr auto kType = Handle::kSubscriber;
SubscriberData(NT_Subscriber handle, TopicData* topic, PubSubConfig config)
: handle{handle},
topic{topic},
config{std::move(config)},
pollStorage{config.pollStorage} {}
void UpdateActive() {
// for subscribers, unassigned is a wildcard
// also allow numerically compatible subscribers
active =
config.type == NT_UNASSIGNED ||
(config.type == topic->type && config.typeStr == topic->typeStr) ||
IsNumericCompatible(config.type, topic->type);
}
// invariants
wpi::SignalObject<NT_Subscriber> handle;
TopicData* topic;
PubSubConfig config;
// whether or not the subscriber should actually receive values
bool active{false};
// polling storage
ValueCircularBuffer pollStorage;
// value listeners
VectorSet<NT_Listener> valueListeners;
};
struct EntryData {
static constexpr auto kType = Handle::kEntry;
EntryData(NT_Entry handle, SubscriberData* subscriber)
: handle{handle}, topic{subscriber->topic}, subscriber{subscriber} {}
// invariants
wpi::SignalObject<NT_Entry> handle;
TopicData* topic;
SubscriberData* subscriber;
// the publisher (created on demand)
PublisherData* publisher{nullptr};
};
struct MultiSubscriberData {
static constexpr auto kType = Handle::kMultiSubscriber;
MultiSubscriberData(NT_MultiSubscriber handle,
std::span<const std::string_view> prefixes,
const PubSubOptionsImpl& options)
: handle{handle}, options{options} {
this->options.prefixMatch = true;
this->prefixes.reserve(prefixes.size());
for (auto&& prefix : prefixes) {
this->prefixes.emplace_back(prefix);
}
}
bool Matches(std::string_view name, bool special);
// invariants
wpi::SignalObject<NT_MultiSubscriber> handle;
std::vector<std::string> prefixes;
PubSubOptionsImpl options;
// value listeners
VectorSet<NT_Listener> valueListeners;
};
struct ListenerData {
ListenerData(NT_Listener handle, SubscriberData* subscriber,
unsigned int eventMask, bool subscriberOwned)
: handle{handle},
eventMask{eventMask},
subscriber{subscriber},
subscriberOwned{subscriberOwned} {}
ListenerData(NT_Listener handle, MultiSubscriberData* subscriber,
unsigned int eventMask, bool subscriberOwned)
: handle{handle},
eventMask{eventMask},
multiSubscriber{subscriber},
subscriberOwned{subscriberOwned} {}
NT_Listener handle;
unsigned int eventMask;
SubscriberData* subscriber{nullptr};
MultiSubscriberData* multiSubscriber{nullptr};
bool subscriberOwned;
};
struct DataLoggerData {
static constexpr auto kType = Handle::kDataLogger;
DataLoggerData(NT_DataLogger handle, wpi::log::DataLog& log,
std::string_view prefix, std::string_view logPrefix)
: handle{handle}, log{log}, prefix{prefix}, logPrefix{logPrefix} {}
int Start(TopicData* topic, int64_t time);
NT_DataLogger handle;
wpi::log::DataLog& log;
std::string prefix;
std::string logPrefix;
};
// inner struct to protect against accidentally deadlocking on the mutex
struct Impl {
Impl(int inst, IListenerStorage& listenerStorage, wpi::Logger& logger);
int m_inst;
IListenerStorage& m_listenerStorage;
wpi::Logger& m_logger;
net::ClientMessageHandler* m_network{nullptr};
// handle mappings
HandleMap<TopicData, 16> m_topics;
HandleMap<PublisherData, 16> m_publishers;
HandleMap<SubscriberData, 16> m_subscribers;
HandleMap<EntryData, 16> m_entries;
HandleMap<MultiSubscriberData, 16> m_multiSubscribers;
HandleMap<DataLoggerData, 16> m_dataloggers;
// name mappings
wpi::StringMap<TopicData*> m_nameTopics;
// listeners
wpi::DenseMap<NT_Listener, std::unique_ptr<ListenerData>> m_listeners;
// string-based listeners
VectorSet<ListenerData*> m_topicPrefixListeners;
// schema publishers
wpi::StringMap<NT_Publisher> m_schemas;
// topic functions
void NotifyTopic(TopicData* topic, unsigned int eventFlags);
void CheckReset(TopicData* topic);
bool SetValue(TopicData* topic, const Value& value, unsigned int eventFlags,
bool suppressIfDuplicate, const PublisherData* publisher);
void NotifyValue(TopicData* topic, const Value& value,
unsigned int eventFlags, bool isDuplicate,
const PublisherData* publisher);
void SetFlags(TopicData* topic, unsigned int flags);
void SetPersistent(TopicData* topic, bool value);
void SetRetained(TopicData* topic, bool value);
void SetCached(TopicData* topic, bool value);
void SetProperties(TopicData* topic, const wpi::json& update,
bool sendNetwork);
void PropertiesUpdated(TopicData* topic, const wpi::json& update,
unsigned int eventFlags, bool sendNetwork,
bool updateFlags = true);
void RefreshPubSubActive(TopicData* topic, bool warnOnSubMismatch);
void NetworkAnnounce(TopicData* topic, std::string_view typeStr,
const wpi::json& properties,
std::optional<int> pubuid);
void RemoveNetworkPublisher(TopicData* topic);
void NetworkPropertiesUpdate(TopicData* topic, const wpi::json& update,
bool ack);
void StartNetwork(net::ClientMessageHandler* network);
PublisherData* AddLocalPublisher(TopicData* topic,
const wpi::json& properties,
const PubSubConfig& options);
std::unique_ptr<PublisherData> RemoveLocalPublisher(NT_Publisher pubHandle);
SubscriberData* AddLocalSubscriber(TopicData* topic,
const PubSubConfig& options);
std::unique_ptr<SubscriberData> RemoveLocalSubscriber(
NT_Subscriber subHandle);
EntryData* AddEntry(SubscriberData* subscriber);
std::unique_ptr<EntryData> RemoveEntry(NT_Entry entryHandle);
MultiSubscriberData* AddMultiSubscriber(
std::span<const std::string_view> prefixes,
const PubSubOptions& options);
std::unique_ptr<MultiSubscriberData> RemoveMultiSubscriber(
NT_MultiSubscriber subHandle);
void AddListenerImpl(NT_Listener listenerHandle, TopicData* topic,
unsigned int eventMask);
void AddListenerImpl(NT_Listener listenerHandle, SubscriberData* subscriber,
unsigned int eventMask, NT_Handle subentryHandle,
bool subscriberOwned);
void AddListenerImpl(NT_Listener listenerHandle,
MultiSubscriberData* subscriber,
unsigned int eventMask, bool subscriberOwned);
void AddListenerImpl(NT_Listener listenerHandle,
std::span<const std::string_view> prefixes,
unsigned int eventMask);
TopicData* GetOrCreateTopic(std::string_view name);
TopicData* GetTopic(NT_Handle handle);
SubscriberData* GetSubEntry(NT_Handle subentryHandle);
PublisherData* PublishEntry(EntryData* entry, NT_Type type);
Value* GetSubEntryValue(NT_Handle subentryHandle) {
if (auto subscriber = GetSubEntry(subentryHandle)) {
return &subscriber->topic->lastValue;
} else {
return nullptr;
}
}
bool PublishLocalValue(PublisherData* publisher, const Value& value,
bool force = false);
bool SetEntryValue(NT_Handle pubentryHandle, const Value& value);
bool SetDefaultEntryValue(NT_Handle pubsubentryHandle, const Value& value);
void RemoveSubEntry(NT_Handle subentryHandle);
};
void AddSchema(std::string_view name, std::string_view type,
std::span<const uint8_t> schema) {
std::scoped_lock lock{m_mutex};
m_impl.AddSchema(name, type, schema);
}
void Reset() {
std::scoped_lock lock{m_mutex};
m_impl.Reset();
}
private:
wpi::mutex m_mutex;
Impl m_impl;
local::StorageImpl m_impl;
};
} // namespace nt

View File

@@ -0,0 +1,24 @@
// Copyright (c) FIRST and other WPILib contributors.
// Open Source Software; you can modify and/or share it under the terms of
// the WPILib BSD license file in the root directory of this project.
#include "LocalDataLogger.h"
#include <fmt/format.h>
#include <wpi/DataLog.h>
#include <wpi/StringExtras.h>
using namespace nt::local;
int LocalDataLogger::Start(std::string_view name, std::string_view typeStr,
std::string_view metadata, int64_t time) {
// NT and DataLog use different standard representations for int and int[]
if (typeStr == "int") {
typeStr = "int64";
} else if (typeStr == "int[]") {
typeStr = "int64[]";
}
return log.Start(fmt::format("{}{}", logPrefix,
wpi::remove_prefix(name, prefix).value_or(name)),
typeStr, metadata, time);
}

View File

@@ -0,0 +1,39 @@
// Copyright (c) FIRST and other WPILib contributors.
// Open Source Software; you can modify and/or share it under the terms of
// the WPILib BSD license file in the root directory of this project.
#pragma once
#include <stdint.h>
#include <string>
#include <string_view>
#include "Handle.h"
#include "ntcore_c.h"
namespace wpi::log {
class DataLog;
} // namespace wpi::log
namespace nt::local {
struct LocalTopic;
struct LocalDataLogger {
static constexpr auto kType = Handle::kDataLogger;
LocalDataLogger(NT_DataLogger handle, wpi::log::DataLog& log,
std::string_view prefix, std::string_view logPrefix)
: handle{handle}, log{log}, prefix{prefix}, logPrefix{logPrefix} {}
int Start(std::string_view name, std::string_view typeStr,
std::string_view metadata, int64_t time);
NT_DataLogger handle;
wpi::log::DataLog& log;
std::string prefix;
std::string logPrefix;
};
} // namespace nt::local

View File

@@ -0,0 +1,64 @@
// Copyright (c) FIRST and other WPILib contributors.
// Open Source Software; you can modify and/or share it under the terms of
// the WPILib BSD license file in the root directory of this project.
#include "LocalDataLoggerEntry.h"
#include <string>
#include <string_view>
#include <fmt/format.h>
#include <wpi/StringExtras.h>
#include "networktables/NetworkTableValue.h"
using namespace nt::local;
std::string LocalDataLoggerEntry::MakeMetadata(std::string_view properties) {
return fmt::format("{{\"properties\":{},\"source\":\"NT\"}}", properties);
}
void LocalDataLoggerEntry::Append(const Value& v) {
auto time = v.time();
switch (v.type()) {
case NT_BOOLEAN:
log->AppendBoolean(entry, v.GetBoolean(), time);
break;
case NT_INTEGER:
log->AppendInteger(entry, v.GetInteger(), time);
break;
case NT_FLOAT:
log->AppendFloat(entry, v.GetFloat(), time);
break;
case NT_DOUBLE:
log->AppendDouble(entry, v.GetDouble(), time);
break;
case NT_STRING:
log->AppendString(entry, v.GetString(), time);
break;
case NT_RAW: {
auto val = v.GetRaw();
log->AppendRaw(entry,
{reinterpret_cast<const uint8_t*>(val.data()), val.size()},
time);
break;
}
case NT_BOOLEAN_ARRAY:
log->AppendBooleanArray(entry, v.GetBooleanArray(), time);
break;
case NT_INTEGER_ARRAY:
log->AppendIntegerArray(entry, v.GetIntegerArray(), time);
break;
case NT_FLOAT_ARRAY:
log->AppendFloatArray(entry, v.GetFloatArray(), time);
break;
case NT_DOUBLE_ARRAY:
log->AppendDoubleArray(entry, v.GetDoubleArray(), time);
break;
case NT_STRING_ARRAY:
log->AppendStringArray(entry, v.GetStringArray(), time);
break;
default:
break;
}
}

View File

@@ -0,0 +1,45 @@
// Copyright (c) FIRST and other WPILib contributors.
// Open Source Software; you can modify and/or share it under the terms of
// the WPILib BSD license file in the root directory of this project.
#pragma once
#include <stdint.h>
#include <string>
#include <string_view>
#include <wpi/DataLog.h>
#include "ntcore_c.h"
namespace wpi::log {
class DataLog;
} // namespace wpi::log
namespace nt {
class Value;
} // namespace nt
namespace nt::local {
struct LocalTopic;
struct LocalDataLoggerEntry {
LocalDataLoggerEntry(wpi::log::DataLog& log, int entry, NT_DataLogger logger)
: log{&log}, entry{entry}, logger{logger} {}
static std::string MakeMetadata(std::string_view properties);
void Append(const Value& v);
void Finish(int64_t timestamp) { log->Finish(entry, timestamp); }
void SetMetadata(std::string_view metadata, int64_t timestamp) {
log->SetMetadata(entry, metadata, timestamp);
}
wpi::log::DataLog* log;
int entry;
NT_DataLogger logger;
};
} // namespace nt::local

View File

@@ -0,0 +1,31 @@
// Copyright (c) FIRST and other WPILib contributors.
// Open Source Software; you can modify and/or share it under the terms of
// the WPILib BSD license file in the root directory of this project.
#pragma once
#include <wpi/Synchronization.h>
#include "Handle.h"
#include "local/LocalSubscriber.h"
namespace nt::local {
struct LocalPublisher;
struct LocalEntry {
static constexpr auto kType = Handle::kEntry;
LocalEntry(NT_Entry handle, LocalSubscriber* subscriber)
: handle{handle}, topic{subscriber->topic}, subscriber{subscriber} {}
// invariants
wpi::SignalObject<NT_Entry> handle;
LocalTopic* topic;
LocalSubscriber* subscriber;
// the publisher (created on demand)
LocalPublisher* publisher{nullptr};
};
} // namespace nt::local

View File

@@ -0,0 +1,35 @@
// Copyright (c) FIRST and other WPILib contributors.
// Open Source Software; you can modify and/or share it under the terms of
// the WPILib BSD license file in the root directory of this project.
#pragma once
#include "ntcore_c.h"
namespace nt::local {
struct LocalMultiSubscriber;
struct LocalSubscriber;
struct LocalListener {
LocalListener(NT_Listener handle, LocalSubscriber* subscriber,
unsigned int eventMask, bool subscriberOwned)
: handle{handle},
eventMask{eventMask},
subscriber{subscriber},
subscriberOwned{subscriberOwned} {}
LocalListener(NT_Listener handle, LocalMultiSubscriber* subscriber,
unsigned int eventMask, bool subscriberOwned)
: handle{handle},
eventMask{eventMask},
multiSubscriber{subscriber},
subscriberOwned{subscriberOwned} {}
NT_Listener handle;
unsigned int eventMask;
LocalSubscriber* subscriber{nullptr};
LocalMultiSubscriber* multiSubscriber{nullptr};
bool subscriberOwned;
};
} // namespace nt::local

View File

@@ -0,0 +1,59 @@
// Copyright (c) FIRST and other WPILib contributors.
// Open Source Software; you can modify and/or share it under the terms of
// the WPILib BSD license file in the root directory of this project.
#pragma once
#include <span>
#include <string>
#include <string_view>
#include <vector>
#include <wpi/StringExtras.h>
#include <wpi/Synchronization.h>
#include "Handle.h"
#include "PubSubOptions.h"
#include "VectorSet.h"
#include "ntcore_c.h"
namespace nt::local {
constexpr bool PrefixMatch(std::string_view name, std::string_view prefix,
bool special) {
return (!special || !prefix.empty()) && wpi::starts_with(name, prefix);
}
struct LocalMultiSubscriber {
static constexpr auto kType = Handle::kMultiSubscriber;
LocalMultiSubscriber(NT_MultiSubscriber handle,
std::span<const std::string_view> prefixes,
const PubSubOptionsImpl& options)
: handle{handle}, options{options} {
this->options.prefixMatch = true;
this->prefixes.reserve(prefixes.size());
for (auto&& prefix : prefixes) {
this->prefixes.emplace_back(prefix);
}
}
bool Matches(std::string_view name, bool special) {
for (auto&& prefix : prefixes) {
if (PrefixMatch(name, prefix, special)) {
return true;
}
}
return false;
}
// invariants
wpi::SignalObject<NT_MultiSubscriber> handle;
std::vector<std::string> prefixes;
PubSubOptionsImpl options;
// value listeners
VectorSet<NT_Listener> valueListeners;
};
} // namespace nt::local

View File

@@ -0,0 +1,36 @@
// Copyright (c) FIRST and other WPILib contributors.
// Open Source Software; you can modify and/or share it under the terms of
// the WPILib BSD license file in the root directory of this project.
#pragma once
#include <utility>
#include <wpi/Synchronization.h>
#include "Handle.h"
#include "local/LocalTopic.h"
#include "local/PubSubConfig.h"
namespace nt::local {
struct LocalPublisher {
static constexpr auto kType = Handle::kPublisher;
LocalPublisher(NT_Publisher handle, LocalTopic* topic, PubSubConfig config)
: handle{handle}, topic{topic}, config{std::move(config)} {}
void UpdateActive() {
active = config.type == topic->type && config.typeStr == topic->typeStr;
}
// invariants
wpi::SignalObject<NT_Publisher> handle;
LocalTopic* topic;
PubSubConfig config;
// whether or not the publisher should actually publish values
bool active{false};
};
} // namespace nt::local

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,320 @@
// Copyright (c) FIRST and other WPILib contributors.
// Open Source Software; you can modify and/or share it under the terms of
// the WPILib BSD license file in the root directory of this project.
#pragma once
#include <concepts>
#include <memory>
#include <string_view>
#include <wpi/DenseMap.h>
#include <wpi/StringExtras.h>
#include <wpi/StringMap.h>
#include <wpi/Synchronization.h>
#include <wpi/json.h>
#include "HandleMap.h"
#include "local/LocalDataLogger.h"
#include "local/LocalEntry.h"
#include "local/LocalListener.h"
#include "local/LocalMultiSubscriber.h"
#include "local/LocalPublisher.h"
#include "local/LocalSubscriber.h"
#include "local/LocalTopic.h"
#include "ntcore_c.h"
#include "ntcore_cpp.h"
namespace wpi {
class Logger;
} // namespace wpi
namespace nt {
class IListenerStorage;
} // namespace nt
namespace nt::net {
class ClientMessageHandler;
} // namespace nt::net
namespace nt::local {
// inner struct to protect against accidentally deadlocking on the mutex
class StorageImpl {
public:
StorageImpl(int inst, IListenerStorage& listenerStorage, wpi::Logger& logger);
wpi::Logger& GetLogger() { return m_logger; }
//
// Network interface functions
//
void NetworkAnnounce(LocalTopic* topic, std::string_view typeStr,
const wpi::json& properties, std::optional<int> pubuid);
void RemoveNetworkPublisher(LocalTopic* topic);
void NetworkPropertiesUpdate(LocalTopic* topic, const wpi::json& update,
bool ack);
void ServerSetValue(LocalTopic* topic, const Value& value) {
if (SetValue(topic, value, NT_EVENT_VALUE_REMOTE, false, nullptr)) {
if (topic->IsCached()) {
topic->lastValueNetwork = value;
topic->lastValueFromNetwork = true;
}
}
}
void StartNetwork(net::ClientMessageHandler* network);
void ClearNetwork();
//
// Topic functions
//
LocalTopic* GetOrCreateTopic(std::string_view name);
template <std::invocable<const LocalTopic&> F>
void ForEachTopic(std::string_view prefix, unsigned int types,
F&& func) const {
for (auto&& topic : m_topics) {
if (!topic->Exists()) {
continue;
}
if (!wpi::starts_with(topic->name, prefix)) {
continue;
}
if (types != 0 && (types & topic->type) == 0) {
continue;
}
func(*topic);
}
}
template <std::invocable<const LocalTopic&> F>
void ForEachTopic(std::string_view prefix,
std::span<const std::string_view> types, F&& func) const {
for (auto&& topic : m_topics) {
if (!topic->Exists()) {
continue;
}
if (!wpi::starts_with(topic->name, prefix)) {
continue;
}
if (!types.empty()) {
bool match = false;
for (auto&& type : types) {
if (topic->typeStr == type) {
match = true;
break;
}
}
if (!match) {
continue;
}
}
func(*topic);
}
}
//
// Topic property functions
//
void SetFlags(LocalTopic* topic, unsigned int flags);
void SetPersistent(LocalTopic* topic, bool value);
void SetRetained(LocalTopic* topic, bool value);
void SetCached(LocalTopic* topic, bool value);
void SetProperty(LocalTopic* topic, std::string_view name,
const wpi::json& value);
bool SetProperties(LocalTopic* topic, const wpi::json& update,
bool sendNetwork);
void DeleteProperty(LocalTopic* topic, std::string_view name);
//
// Value functions
//
bool SetEntryValue(NT_Handle pubentryHandle, const Value& value);
bool SetDefaultEntryValue(NT_Handle pubsubentryHandle, const Value& value);
Value* GetSubEntryValue(NT_Handle subentryHandle) {
if (auto subscriber = GetSubEntry(subentryHandle)) {
return &subscriber->topic->lastValue;
} else {
return nullptr;
}
}
//
// Publish/Subscribe/Entry functions
//
LocalSubscriber* Subscribe(LocalTopic* topic, NT_Type type,
std::string_view typeString,
const PubSubOptions& options);
LocalPublisher* Publish(LocalTopic* topic, NT_Type type,
std::string_view typeStr, const wpi::json& properties,
const PubSubOptions& options);
LocalEntry* GetEntry(LocalTopic* topicHandle, NT_Type type,
std::string_view typeStr, const PubSubOptions& options);
LocalEntry* GetEntry(std::string_view name);
void RemoveSubEntry(NT_Handle subentryHandle);
void Unpublish(NT_Handle pubentryHandle);
//
// Multi-subscriber functions
//
LocalMultiSubscriber* AddMultiSubscriber(
std::span<const std::string_view> prefixes, const PubSubOptions& options);
std::unique_ptr<LocalMultiSubscriber> RemoveMultiSubscriber(
NT_MultiSubscriber subHandle);
//
// Lookup functions
//
LocalTopic* GetTopic(NT_Handle handle);
LocalTopic* GetTopicByHandle(NT_Topic topicHandle) {
return m_topics.Get(topicHandle);
}
LocalTopic* GetTopicByName(std::string_view name) {
auto it = m_nameTopics.find(name);
if (it == m_nameTopics.end()) {
return nullptr;
}
return it->second;
}
LocalTopic* GetTopicById(int topicId) {
return m_topics.Get(Handle{m_inst, topicId, Handle::kTopic});
}
LocalSubscriber* GetSubEntry(NT_Handle subentryHandle);
LocalEntry* GetEntryByHandle(NT_Entry entryHandle) {
return m_entries.Get(entryHandle);
}
LocalMultiSubscriber* GetMultiSubscriberByHandle(NT_MultiSubscriber handle) {
return m_multiSubscribers.Get(handle);
}
LocalSubscriber* GetSubscriberByHandle(NT_Subscriber handle) {
return m_subscribers.Get(handle);
}
//
// Listener functions
//
void AddListenerImpl(NT_Listener listenerHandle, LocalTopic* topic,
unsigned int eventMask);
void AddListenerImpl(NT_Listener listenerHandle, LocalSubscriber* subscriber,
unsigned int eventMask, NT_Handle subentryHandle,
bool subscriberOwned);
void AddListenerImpl(NT_Listener listenerHandle,
LocalMultiSubscriber* subscriber, unsigned int eventMask,
bool subscriberOwned);
void RemoveListener(NT_Listener listener, unsigned int mask);
//
// Data log functions
//
LocalDataLogger* StartDataLog(wpi::log::DataLog& log, std::string_view prefix,
std::string_view logPrefix);
void StopDataLog(NT_DataLogger logger);
//
// Schema functions
//
bool HasSchema(std::string_view name);
void AddSchema(std::string_view name, std::string_view type,
std::span<const uint8_t> schema);
void Reset();
private:
// topic functions
void NotifyTopic(LocalTopic* topic, unsigned int eventFlags);
bool SetValue(LocalTopic* topic, const Value& value, unsigned int eventFlags,
bool suppressIfDuplicate, const LocalPublisher* publisher);
void NotifyValue(LocalTopic* topic, const Value& value,
unsigned int eventFlags, bool isDuplicate,
const LocalPublisher* publisher);
void PropertiesUpdated(LocalTopic* topic, const wpi::json& update,
unsigned int eventFlags, bool sendNetwork,
bool updateFlags = true);
void RefreshPubSubActive(LocalTopic* topic, bool warnOnSubMismatch);
LocalPublisher* AddLocalPublisher(LocalTopic* topic,
const wpi::json& properties,
const PubSubConfig& options);
std::unique_ptr<LocalPublisher> RemoveLocalPublisher(NT_Publisher pubHandle);
LocalSubscriber* AddLocalSubscriber(LocalTopic* topic,
const PubSubConfig& options);
std::unique_ptr<LocalSubscriber> RemoveLocalSubscriber(
NT_Subscriber subHandle);
LocalEntry* AddEntry(LocalSubscriber* subscriber) {
auto entry = m_entries.Add(m_inst, subscriber);
subscriber->topic->entries.Add(entry);
return entry;
}
std::unique_ptr<LocalEntry> RemoveEntry(NT_Entry entryHandle) {
auto entry = m_entries.Remove(entryHandle);
if (entry) {
entry->topic->entries.Remove(entry.get());
}
return entry;
}
LocalPublisher* PublishEntry(LocalEntry* entry, NT_Type type);
bool PublishLocalValue(LocalPublisher* publisher, const Value& value,
bool force = false);
private:
int m_inst;
IListenerStorage& m_listenerStorage;
wpi::Logger& m_logger;
net::ClientMessageHandler* m_network{nullptr};
// handle mappings
HandleMap<LocalTopic, 16> m_topics;
HandleMap<LocalPublisher, 16> m_publishers;
HandleMap<LocalSubscriber, 16> m_subscribers;
HandleMap<LocalEntry, 16> m_entries;
HandleMap<LocalMultiSubscriber, 16> m_multiSubscribers;
HandleMap<LocalDataLogger, 16> m_dataloggers;
// name mappings
wpi::StringMap<LocalTopic*> m_nameTopics;
// listeners
wpi::DenseMap<NT_Listener, std::unique_ptr<LocalListener>> m_listeners;
// string-based listeners
VectorSet<LocalListener*> m_topicPrefixListeners;
// schema publishers
wpi::StringMap<NT_Publisher> m_schemas;
};
} // namespace nt::local

View File

@@ -0,0 +1,53 @@
// Copyright (c) FIRST and other WPILib contributors.
// Open Source Software; you can modify and/or share it under the terms of
// the WPILib BSD license file in the root directory of this project.
#pragma once
#include <utility>
#include <wpi/Synchronization.h>
#include "Handle.h"
#include "Types_internal.h"
#include "ValueCircularBuffer.h"
#include "VectorSet.h"
#include "local/LocalTopic.h"
#include "local/PubSubConfig.h"
#include "ntcore_c.h"
namespace nt::local {
struct LocalSubscriber {
static constexpr auto kType = Handle::kSubscriber;
LocalSubscriber(NT_Subscriber handle, LocalTopic* topic, PubSubConfig config)
: handle{handle},
topic{topic},
config{std::move(config)},
pollStorage{config.pollStorage} {}
void UpdateActive() {
// for subscribers, unassigned is a wildcard
// also allow numerically compatible subscribers
active = config.type == NT_UNASSIGNED ||
(config.type == topic->type && config.typeStr == topic->typeStr) ||
IsNumericCompatible(config.type, topic->type);
}
// invariants
wpi::SignalObject<NT_Subscriber> handle;
LocalTopic* topic;
PubSubConfig config;
// whether or not the subscriber should actually receive values
bool active{false};
// polling storage
ValueCircularBuffer pollStorage;
// value listeners
VectorSet<NT_Listener> valueListeners;
};
} // namespace nt::local

View File

@@ -0,0 +1,226 @@
// Copyright (c) FIRST and other WPILib contributors.
// Open Source Software; you can modify and/or share it under the terms of
// the WPILib BSD license file in the root directory of this project.
#include "LocalTopic.h"
#include <algorithm>
#include "local/LocalDataLogger.h"
using namespace nt::local;
void LocalTopic::StartStopDataLog(LocalDataLogger* logger, int64_t timestamp,
bool publish) {
auto it = std::find_if(
datalogs.begin(), datalogs.end(),
[&](const auto& elem) { return elem.logger == logger->handle; });
if (publish && it == datalogs.end()) {
datalogs.emplace_back(
logger->log,
logger->Start(name, typeStr,
LocalDataLoggerEntry::MakeMetadata(m_propertiesStr),
timestamp),
logger->handle);
datalogType = type;
} else if (!publish && it != datalogs.end()) {
it->Finish(timestamp);
datalogType = NT_UNASSIGNED;
datalogs.erase(it);
}
}
void LocalTopic::UpdateDataLogProperties() {
if (!datalogs.empty()) {
auto now = Now();
auto metadata = LocalDataLoggerEntry::MakeMetadata(m_propertiesStr);
for (auto&& datalog : datalogs) {
datalog.SetMetadata(metadata, now);
}
}
}
wpi::json LocalTopic::SetFlags(unsigned int flags) {
wpi::json update = wpi::json::object();
if ((flags & NT_PERSISTENT) != 0) {
properties["persistent"] = true;
update["persistent"] = true;
} else {
properties.erase("persistent");
update["persistent"] = wpi::json();
}
if ((flags & NT_RETAINED) != 0) {
properties["retained"] = true;
update["retained"] = true;
} else {
properties.erase("retained");
update["retained"] = wpi::json();
}
if ((flags & NT_UNCACHED) != 0) {
properties["cached"] = false;
update["cached"] = false;
} else {
properties.erase("cached");
update["cached"] = wpi::json();
}
if ((flags & NT_UNCACHED) != 0) {
lastValue = {};
lastValueNetwork = {};
lastValueFromNetwork = false;
}
this->m_flags = flags;
return update;
}
wpi::json LocalTopic::SetPersistent(bool value) {
wpi::json update = wpi::json::object();
if (value) {
m_flags |= NT_PERSISTENT;
properties["persistent"] = true;
update["persistent"] = true;
} else {
m_flags &= ~NT_PERSISTENT;
properties.erase("persistent");
update["persistent"] = wpi::json();
}
return update;
}
wpi::json LocalTopic::SetRetained(bool value) {
wpi::json update = wpi::json::object();
if (value) {
m_flags |= NT_RETAINED;
properties["retained"] = true;
update["retained"] = true;
} else {
m_flags &= ~NT_RETAINED;
properties.erase("retained");
update["retained"] = wpi::json();
}
return update;
}
wpi::json LocalTopic::SetCached(bool value) {
wpi::json update = wpi::json::object();
if (value) {
m_flags &= ~NT_UNCACHED;
properties.erase("cached");
update["cached"] = wpi::json();
} else {
m_flags |= NT_UNCACHED;
properties["cached"] = false;
update["cached"] = false;
}
return update;
}
wpi::json LocalTopic::SetProperty(std::string_view name,
const wpi::json& value) {
if (value.is_null()) {
properties.erase(name);
} else {
properties[name] = value;
}
wpi::json update = wpi::json::object();
update[name] = value;
return update;
}
wpi::json LocalTopic::DeleteProperty(std::string_view name) {
properties.erase(name);
wpi::json update = wpi::json::object();
update[name] = wpi::json();
return update;
}
bool LocalTopic::SetProperties(const wpi::json& update) {
if (!update.is_object()) {
return false;
}
for (auto&& change : update.items()) {
if (change.value().is_null()) {
properties.erase(change.key());
} else {
properties[change.key()] = change.value();
}
}
return true;
}
void LocalTopic::RefreshProperties(bool updateFlags) {
if (updateFlags) {
RefreshFlags();
}
m_propertiesStr = properties.dump();
}
wpi::json LocalTopic::CompareProperties(const wpi::json props) {
wpi::json update = wpi::json::object();
// added/changed
for (auto&& prop : props.items()) {
auto it = properties.find(prop.key());
if (it == properties.end() || *it != prop.value()) {
update[prop.key()] = prop.value();
}
}
// removed
for (auto&& prop : properties.items()) {
if (props.find(prop.key()) == props.end()) {
update[prop.key()] = wpi::json();
}
}
return update;
}
void LocalTopic::ResetIfDoesNotExist() {
if (Exists()) {
return;
}
lastValue = {};
lastValueNetwork = {};
lastValueFromNetwork = false;
type = NT_UNASSIGNED;
typeStr.clear();
m_flags = 0;
properties = wpi::json::object();
m_propertiesStr = "{}";
}
void LocalTopic::RefreshFlags() {
auto it = properties.find("persistent");
if (it != properties.end()) {
if (auto val = it->get_ptr<bool*>()) {
if (*val) {
m_flags |= NT_PERSISTENT;
} else {
m_flags &= ~NT_PERSISTENT;
}
}
}
it = properties.find("retained");
if (it != properties.end()) {
if (auto val = it->get_ptr<bool*>()) {
if (*val) {
m_flags |= NT_RETAINED;
} else {
m_flags &= ~NT_RETAINED;
}
}
}
it = properties.find("cached");
if (it != properties.end()) {
if (auto val = it->get_ptr<bool*>()) {
if (*val) {
m_flags &= ~NT_UNCACHED;
} else {
m_flags |= NT_UNCACHED;
}
}
}
if ((m_flags & NT_UNCACHED) != 0) {
lastValue = {};
lastValueNetwork = {};
lastValueFromNetwork = false;
}
}

View File

@@ -0,0 +1,111 @@
// Copyright (c) FIRST and other WPILib contributors.
// Open Source Software; you can modify and/or share it under the terms of
// the WPILib BSD license file in the root directory of this project.
#pragma once
#include <stdint.h>
#include <string>
#include <string_view>
#include <wpi/SmallVector.h>
#include <wpi/Synchronization.h>
#include <wpi/json.h>
#include "Handle.h"
#include "VectorSet.h"
#include "local/LocalDataLogger.h"
#include "local/LocalDataLoggerEntry.h"
#include "ntcore_cpp.h"
namespace nt::local {
struct LocalDataLogger;
struct LocalEntry;
struct LocalMultiSubscriber;
struct LocalPublisher;
struct LocalSubscriber;
constexpr bool IsSpecial(std::string_view name) {
return name.empty() ? false : name.front() == '$';
}
struct LocalTopic {
static constexpr auto kType = Handle::kTopic;
LocalTopic(NT_Topic handle, std::string_view name)
: handle{handle}, name{name}, special{IsSpecial(name)} {}
bool Exists() const { return onNetwork || !localPublishers.empty(); }
bool IsCached() const { return (m_flags & NT_UNCACHED) == 0; }
// starts if publish is true, stops if false
void StartStopDataLog(LocalDataLogger* logger, int64_t timestamp,
bool publish);
void UpdateDataLogProperties();
unsigned int GetFlags() const { return m_flags; }
// these return update json
wpi::json SetFlags(unsigned int flags);
wpi::json SetPersistent(bool value);
wpi::json SetRetained(bool value);
wpi::json SetCached(bool value);
wpi::json SetProperty(std::string_view name, const wpi::json& value);
wpi::json DeleteProperty(std::string_view name);
// returns false if not object
bool SetProperties(const wpi::json& update);
void RefreshProperties(bool updateFlags);
// returns update json
wpi::json CompareProperties(const wpi::json props);
TopicInfo GetTopicInfo() const {
TopicInfo info;
info.topic = handle;
info.name = name;
info.type = type;
info.type_str = typeStr;
info.properties = m_propertiesStr;
return info;
}
void ResetIfDoesNotExist();
// invariants
wpi::SignalObject<NT_Topic> handle;
std::string name;
bool special;
Value lastValue; // also stores timestamp
Value lastValueNetwork;
NT_Type type{NT_UNASSIGNED};
std::string typeStr;
wpi::json properties = wpi::json::object();
LocalEntry* entry{nullptr}; // cached entry for GetEntry()
bool onNetwork{false}; // true if there are any remote publishers
bool lastValueFromNetwork{false};
wpi::SmallVector<LocalDataLoggerEntry, 1> datalogs;
NT_Type datalogType{NT_UNASSIGNED};
VectorSet<LocalPublisher*> localPublishers;
VectorSet<LocalSubscriber*> localSubscribers;
VectorSet<LocalMultiSubscriber*> multiSubscribers;
VectorSet<LocalEntry*> entries;
VectorSet<NT_Listener> listeners;
private:
// update flags from properties
void RefreshFlags();
unsigned int m_flags{0}; // for NT3 APIs
std::string m_propertiesStr{"{}"}; // cached string for GetTopicInfo() et al
};
} // namespace nt::local

View File

@@ -0,0 +1,27 @@
// Copyright (c) FIRST and other WPILib contributors.
// Open Source Software; you can modify and/or share it under the terms of
// the WPILib BSD license file in the root directory of this project.
#pragma once
#include <string>
#include <string_view>
#include "PubSubOptions.h"
#include "ntcore_c.h"
namespace nt::local {
struct PubSubConfig : public PubSubOptionsImpl {
PubSubConfig() = default;
PubSubConfig(NT_Type type, std::string_view typeStr,
const PubSubOptions& options)
: PubSubOptionsImpl{options}, type{type}, typeStr{typeStr} {
prefixMatch = false;
}
NT_Type type{NT_UNASSIGNED};
std::string typeStr;
};
} // namespace nt::local