[ntcore] Remove pImpl from implementation (#5480)

Also change Timestamped into a template.
This commit is contained in:
Peter Johnson
2023-08-03 23:43:55 -07:00
committed by GitHub
parent d8c59ccc71
commit b46a872494
26 changed files with 2657 additions and 2816 deletions

View File

@@ -6,25 +6,12 @@
#include <algorithm>
#include <wpi/DenseMap.h>
#include <wpi/SmallVector.h>
#include "ntcore_c.h"
using namespace nt;
class ListenerStorage::Thread final : public wpi::SafeThreadEvent {
public:
explicit Thread(NT_ListenerPoller poller) : m_poller{poller} {}
void Main() final;
NT_ListenerPoller m_poller;
wpi::DenseMap<NT_Listener, ListenerCallback> m_callbacks;
wpi::Event m_waitQueueWakeup;
wpi::Event m_waitQueueWaiter;
};
void ListenerStorage::Thread::Main() {
while (m_active) {
WPI_Handle signaledBuf[3];
@@ -55,10 +42,6 @@ void ListenerStorage::Thread::Main() {
}
}
ListenerStorage::ListenerStorage(int inst) : m_inst{inst} {}
ListenerStorage::~ListenerStorage() = default;
void ListenerStorage::Activate(NT_Listener listenerHandle, unsigned int mask,
FinishEventFunc finishEvent) {
std::scoped_lock lock{m_mutex};

View File

@@ -11,6 +11,7 @@
#include <utility>
#include <vector>
#include <wpi/DenseMap.h>
#include <wpi/SafeThread.h>
#include <wpi/SmallVector.h>
#include <wpi/Synchronization.h>
@@ -19,16 +20,16 @@
#include "Handle.h"
#include "HandleMap.h"
#include "IListenerStorage.h"
#include "VectorSet.h"
#include "ntcore_cpp.h"
namespace nt {
class ListenerStorage final : public IListenerStorage {
public:
explicit ListenerStorage(int inst);
explicit ListenerStorage(int inst) : m_inst{inst} {}
ListenerStorage(const ListenerStorage&) = delete;
ListenerStorage& operator=(const ListenerStorage&) = delete;
~ListenerStorage() final;
// IListenerStorage interface
void Activate(NT_Listener listenerHandle, unsigned int mask,
@@ -97,21 +98,23 @@ class ListenerStorage final : public IListenerStorage {
};
HandleMap<ListenerData, 8> m_listeners;
// Utility wrapper for making a set-like vector
template <typename T>
class VectorSet : public std::vector<T> {
public:
void Add(T value) { this->push_back(value); }
void Remove(T value) { std::erase(*this, value); }
};
VectorSet<ListenerData*> m_connListeners;
VectorSet<ListenerData*> m_topicListeners;
VectorSet<ListenerData*> m_valueListeners;
VectorSet<ListenerData*> m_logListeners;
VectorSet<ListenerData*> m_timeSyncListeners;
class Thread;
class Thread final : public wpi::SafeThreadEvent {
public:
explicit Thread(NT_ListenerPoller poller) : m_poller{poller} {}
void Main() final;
NT_ListenerPoller m_poller;
wpi::DenseMap<NT_Listener, ListenerCallback> m_callbacks;
wpi::Event m_waitQueueWakeup;
wpi::Event m_waitQueueWaiter;
};
wpi::SafeThreadOwner<Thread> m_thread;
};

File diff suppressed because it is too large Load Diff

View File

@@ -14,8 +14,18 @@
#include <utility>
#include <vector>
#include <wpi/DenseMap.h>
#include <wpi/StringMap.h>
#include <wpi/Synchronization.h>
#include <wpi/json.h>
#include <wpi/mutex.h>
#include "Handle.h"
#include "HandleMap.h"
#include "PubSubOptions.h"
#include "Types_internal.h"
#include "ValueCircularBuffer.h"
#include "VectorSet.h"
#include "net/NetworkInterface.h"
#include "ntcore_cpp.h"
@@ -29,8 +39,8 @@ class IListenerStorage;
class LocalStorage final : public net::ILocalStorage {
public:
LocalStorage(int inst, IListenerStorage& listenerStorage,
wpi::Logger& logger);
LocalStorage(int inst, IListenerStorage& listenerStorage, wpi::Logger& logger)
: m_impl{inst, listenerStorage, logger} {}
LocalStorage(const LocalStorage&) = delete;
LocalStorage& operator=(const LocalStorage&) = delete;
~LocalStorage() final;
@@ -59,47 +69,129 @@ class LocalStorage final : public net::ILocalStorage {
std::vector<TopicInfo> GetTopicInfo(std::string_view prefix,
std::span<const std::string_view> types);
NT_Topic GetTopic(std::string_view name);
NT_Topic GetTopic(std::string_view name) {
if (name.empty()) {
return {};
}
std::scoped_lock lock{m_mutex};
return m_impl.GetOrCreateTopic(name)->handle;
}
std::string GetTopicName(NT_Topic topic);
std::string GetTopicName(NT_Topic topicHandle) {
std::scoped_lock lock{m_mutex};
if (auto topic = m_impl.m_topics.Get(topicHandle)) {
return topic->name;
} else {
return {};
}
}
NT_Type GetTopicType(NT_Topic topic);
NT_Type GetTopicType(NT_Topic topicHandle) {
std::scoped_lock lock{m_mutex};
if (auto topic = m_impl.m_topics.Get(topicHandle)) {
return topic->type;
} else {
return {};
}
}
std::string GetTopicTypeString(NT_Topic topic);
std::string GetTopicTypeString(NT_Topic topicHandle) {
std::scoped_lock lock{m_mutex};
if (auto topic = m_impl.m_topics.Get(topicHandle)) {
return topic->typeStr;
} else {
return {};
}
}
void SetTopicPersistent(NT_Topic topic, bool value);
void SetTopicPersistent(NT_Topic topicHandle, bool value) {
std::scoped_lock lock{m_mutex};
if (auto topic = m_impl.m_topics.Get(topicHandle)) {
m_impl.SetPersistent(topic, value);
}
}
bool GetTopicPersistent(NT_Topic topic);
bool GetTopicPersistent(NT_Topic topicHandle) {
std::scoped_lock lock{m_mutex};
if (auto topic = m_impl.m_topics.Get(topicHandle)) {
return (topic->flags & NT_PERSISTENT) != 0;
} else {
return false;
}
}
void SetTopicRetained(NT_Topic topic, bool value);
void SetTopicRetained(NT_Topic topicHandle, bool value) {
std::scoped_lock lock{m_mutex};
if (auto topic = m_impl.m_topics.Get(topicHandle)) {
m_impl.SetRetained(topic, value);
}
}
bool GetTopicRetained(NT_Topic topic);
bool GetTopicRetained(NT_Topic topicHandle) {
std::scoped_lock lock{m_mutex};
if (auto topic = m_impl.m_topics.Get(topicHandle)) {
return (topic->flags & NT_RETAINED) != 0;
} else {
return false;
}
}
bool GetTopicExists(NT_Handle handle);
bool GetTopicExists(NT_Handle handle) {
std::scoped_lock lock{m_mutex};
TopicData* topic = m_impl.GetTopic(handle);
return topic && topic->Exists();
}
wpi::json GetTopicProperty(NT_Topic topic, std::string_view name);
wpi::json GetTopicProperty(NT_Topic topicHandle, std::string_view name) {
std::scoped_lock lock{m_mutex};
if (auto topic = m_impl.m_topics.Get(topicHandle)) {
return topic->properties.value(name, wpi::json{});
} else {
return {};
}
}
void SetTopicProperty(NT_Topic topic, std::string_view name,
const wpi::json& value);
void DeleteTopicProperty(NT_Topic topic, std::string_view name);
wpi::json GetTopicProperties(NT_Topic topic);
wpi::json GetTopicProperties(NT_Topic topicHandle) {
std::scoped_lock lock{m_mutex};
if (auto topic = m_impl.m_topics.Get(topicHandle)) {
return topic->properties;
} else {
return wpi::json::object();
}
}
bool SetTopicProperties(NT_Topic topic, const wpi::json& update);
TopicInfo GetTopicInfo(NT_Topic topic);
TopicInfo GetTopicInfo(NT_Topic topicHandle) {
std::scoped_lock lock{m_mutex};
if (auto topic = m_impl.m_topics.Get(topicHandle)) {
return topic->GetTopicInfo();
} else {
return {};
}
}
NT_Subscriber Subscribe(NT_Topic topic, NT_Type type,
std::string_view typeStr,
const PubSubOptions& options);
void Unsubscribe(NT_Subscriber sub);
void Unsubscribe(NT_Subscriber subHandle) {
std::scoped_lock lock{m_mutex};
m_impl.RemoveSubEntry(subHandle);
}
NT_MultiSubscriber SubscribeMultiple(
std::span<const std::string_view> prefixes, const PubSubOptions& options);
void UnsubscribeMultiple(NT_MultiSubscriber subHandle);
void UnsubscribeMultiple(NT_MultiSubscriber subHandle) {
std::scoped_lock lock{m_mutex};
m_impl.RemoveMultiSubscriber(subHandle);
}
NT_Publisher Publish(NT_Topic topic, NT_Type type, std::string_view typeStr,
const wpi::json& properties,
@@ -110,84 +202,106 @@ class LocalStorage final : public net::ILocalStorage {
NT_Entry GetEntry(NT_Topic topic, NT_Type type, std::string_view typeStr,
const PubSubOptions& options);
void ReleaseEntry(NT_Entry entry);
void ReleaseEntry(NT_Entry entryHandle) {
std::scoped_lock lock{m_mutex};
m_impl.RemoveSubEntry(entryHandle);
}
void Release(NT_Handle pubsubentry);
NT_Topic GetTopicFromHandle(NT_Handle pubsubentry);
NT_Topic GetTopicFromHandle(NT_Handle pubsubentryHandle) {
std::scoped_lock lock{m_mutex};
if (auto topic = m_impl.GetTopic(pubsubentryHandle)) {
return topic->handle;
} else {
return {};
}
}
bool SetEntryValue(NT_Handle pubentry, const Value& value);
bool SetEntryValue(NT_Handle pubentryHandle, const Value& value) {
std::scoped_lock lock{m_mutex};
return m_impl.SetEntryValue(pubentryHandle, value);
}
bool SetDefaultEntryValue(NT_Handle pubsubentry, const Value& value);
bool SetDefaultEntryValue(NT_Handle pubsubentryHandle, const Value& value) {
std::scoped_lock lock{m_mutex};
return m_impl.SetDefaultEntryValue(pubsubentryHandle, value);
}
TimestampedBoolean GetAtomicBoolean(NT_Handle subentry, bool defaultValue);
TimestampedInteger GetAtomicInteger(NT_Handle subentry, int64_t defaultValue);
TimestampedFloat GetAtomicFloat(NT_Handle subentry, float defaultValue);
TimestampedDouble GetAtomicDouble(NT_Handle subentry, double defaultValue);
TimestampedString GetAtomicString(NT_Handle subentry,
std::string_view defaultValue);
TimestampedRaw GetAtomicRaw(NT_Handle subentry,
std::span<const uint8_t> defaultValue);
TimestampedBooleanArray GetAtomicBooleanArray(
NT_Handle subentry, std::span<const int> defaultValue);
TimestampedIntegerArray GetAtomicIntegerArray(
NT_Handle subentry, std::span<const int64_t> defaultValue);
TimestampedFloatArray GetAtomicFloatArray(
NT_Handle subentry, std::span<const float> defaultValue);
TimestampedDoubleArray GetAtomicDoubleArray(
NT_Handle subentry, std::span<const double> defaultValue);
TimestampedStringArray GetAtomicStringArray(
NT_Handle subentry, std::span<const std::string> defaultValue);
template <ValidType T>
Timestamped<typename TypeInfo<T>::Value> GetAtomic(
NT_Handle subentry, typename TypeInfo<T>::View defaultValue);
TimestampedStringView GetAtomicString(NT_Handle subentry,
wpi::SmallVectorImpl<char>& buf,
std::string_view defaultValue);
TimestampedRawView GetAtomicRaw(NT_Handle subentry,
wpi::SmallVectorImpl<uint8_t>& buf,
std::span<const uint8_t> defaultValue);
TimestampedBooleanArrayView GetAtomicBooleanArray(
NT_Handle subentry, wpi::SmallVectorImpl<int>& buf,
std::span<const int> defaultValue);
TimestampedIntegerArrayView GetAtomicIntegerArray(
NT_Handle subentry, wpi::SmallVectorImpl<int64_t>& buf,
std::span<const int64_t> defaultValue);
TimestampedFloatArrayView GetAtomicFloatArray(
NT_Handle subentry, wpi::SmallVectorImpl<float>& buf,
std::span<const float> defaultValue);
TimestampedDoubleArrayView GetAtomicDoubleArray(
NT_Handle subentry, wpi::SmallVectorImpl<double>& buf,
std::span<const double> defaultValue);
template <SmallArrayType T>
Timestamped<typename TypeInfo<T>::SmallRet> GetAtomic(
NT_Handle subentry,
wpi::SmallVectorImpl<typename TypeInfo<T>::SmallElem>& buf,
typename TypeInfo<T>::View defaultValue);
std::vector<Value> ReadQueueValue(NT_Handle subentry);
std::vector<Value> ReadQueueValue(NT_Handle subentry) {
std::scoped_lock lock{m_mutex};
auto subscriber = m_impl.GetSubEntry(subentry);
if (!subscriber) {
return {};
}
return subscriber->pollStorage.ReadValue();
}
std::vector<TimestampedBoolean> ReadQueueBoolean(NT_Handle subentry);
std::vector<TimestampedInteger> ReadQueueInteger(NT_Handle subentry);
std::vector<TimestampedFloat> ReadQueueFloat(NT_Handle subentry);
std::vector<TimestampedDouble> ReadQueueDouble(NT_Handle subentry);
std::vector<TimestampedString> ReadQueueString(NT_Handle subentry);
std::vector<TimestampedRaw> ReadQueueRaw(NT_Handle subentry);
std::vector<TimestampedBooleanArray> ReadQueueBooleanArray(
template <ValidType T>
std::vector<Timestamped<typename TypeInfo<T>::Value>> ReadQueue(
NT_Handle subentry);
std::vector<TimestampedIntegerArray> ReadQueueIntegerArray(
NT_Handle subentry);
std::vector<TimestampedFloatArray> ReadQueueFloatArray(NT_Handle subentry);
std::vector<TimestampedDoubleArray> ReadQueueDoubleArray(NT_Handle subentry);
std::vector<TimestampedStringArray> ReadQueueStringArray(NT_Handle subentry);
//
// Backwards compatible user functions
//
Value GetEntryValue(NT_Handle subentry);
void SetEntryFlags(NT_Entry entry, unsigned int flags);
unsigned int GetEntryFlags(NT_Entry entry);
void SetEntryFlags(NT_Entry entryHandle, unsigned int flags) {
std::scoped_lock lock{m_mutex};
if (auto entry = m_impl.m_entries.Get(entryHandle)) {
m_impl.SetFlags(entry->subscriber->topic, flags);
}
}
unsigned int GetEntryFlags(NT_Entry entryHandle) {
std::scoped_lock lock{m_mutex};
if (auto entry = m_impl.m_entries.Get(entryHandle)) {
return entry->subscriber->topic->flags;
} else {
return 0;
}
}
// Index-only
NT_Entry GetEntry(std::string_view name);
std::string GetEntryName(NT_Entry entry);
NT_Type GetEntryType(NT_Entry entry);
int64_t GetEntryLastChange(NT_Entry entry);
std::string GetEntryName(NT_Entry subentryHandle) {
std::scoped_lock lock{m_mutex};
if (auto subscriber = m_impl.GetSubEntry(subentryHandle)) {
return subscriber->topic->name;
} else {
return {};
}
}
NT_Type GetEntryType(NT_Entry subentryHandle) {
std::scoped_lock lock{m_mutex};
if (auto subscriber = m_impl.GetSubEntry(subentryHandle)) {
return subscriber->topic->type;
} else {
return {};
}
}
int64_t GetEntryLastChange(NT_Entry subentryHandle) {
std::scoped_lock lock{m_mutex};
if (auto subscriber = m_impl.GetSubEntry(subentryHandle)) {
return subscriber->topic->lastValue.time();
} else {
return 0;
}
}
//
// Listener functions
@@ -210,10 +324,352 @@ class LocalStorage final : public net::ILocalStorage {
void Reset();
private:
class Impl;
std::unique_ptr<Impl> m_impl;
static constexpr bool IsSpecial(std::string_view name) {
return name.empty() ? false : name.front() == '$';
}
struct EntryData;
struct PublisherData;
struct SubscriberData;
struct MultiSubscriberData;
struct DataLoggerEntry {
DataLoggerEntry(wpi::log::DataLog& log, int entry, NT_DataLogger logger)
: log{&log}, entry{entry}, logger{logger} {}
static std::string MakeMetadata(std::string_view properties);
void Append(const Value& v);
wpi::log::DataLog* log;
int entry;
NT_DataLogger logger;
};
struct TopicData {
static constexpr auto kType = Handle::kTopic;
TopicData(NT_Topic handle, std::string_view name)
: handle{handle}, name{name}, special{IsSpecial(name)} {}
bool Exists() const { return onNetwork || !localPublishers.empty(); }
TopicInfo GetTopicInfo() const;
// invariants
wpi::SignalObject<NT_Topic> handle;
std::string name;
bool special;
Value lastValue; // also stores timestamp
Value lastValueNetwork;
NT_Type type{NT_UNASSIGNED};
std::string typeStr;
unsigned int flags{0}; // for NT3 APIs
std::string propertiesStr{"{}"}; // cached string for GetTopicInfo() et al
wpi::json properties = wpi::json::object();
NT_Entry entry{0}; // cached entry for GetEntry()
bool onNetwork{false}; // true if there are any remote publishers
bool lastValueFromNetwork{false};
wpi::SmallVector<DataLoggerEntry, 1> datalogs;
NT_Type datalogType{NT_UNASSIGNED};
VectorSet<PublisherData*> localPublishers;
VectorSet<SubscriberData*> localSubscribers;
VectorSet<MultiSubscriberData*> multiSubscribers;
VectorSet<EntryData*> entries;
VectorSet<NT_Listener> listeners;
};
struct PubSubConfig : public PubSubOptionsImpl {
PubSubConfig() = default;
PubSubConfig(NT_Type type, std::string_view typeStr,
const PubSubOptions& options)
: PubSubOptionsImpl{options}, type{type}, typeStr{typeStr} {
prefixMatch = false;
}
NT_Type type{NT_UNASSIGNED};
std::string typeStr;
};
struct PublisherData {
static constexpr auto kType = Handle::kPublisher;
PublisherData(NT_Publisher handle, TopicData* topic, PubSubConfig config)
: handle{handle}, topic{topic}, config{std::move(config)} {}
void UpdateActive() {
active = config.type == topic->type && config.typeStr == topic->typeStr;
}
// invariants
wpi::SignalObject<NT_Publisher> handle;
TopicData* topic;
PubSubConfig config;
// whether or not the publisher should actually publish values
bool active{false};
};
struct SubscriberData {
static constexpr auto kType = Handle::kSubscriber;
SubscriberData(NT_Subscriber handle, TopicData* topic, PubSubConfig config)
: handle{handle},
topic{topic},
config{std::move(config)},
pollStorage{config.pollStorage} {}
void UpdateActive() {
// for subscribers, unassigned is a wildcard
// also allow numerically compatible subscribers
active =
config.type == NT_UNASSIGNED ||
(config.type == topic->type && config.typeStr == topic->typeStr) ||
IsNumericCompatible(config.type, topic->type);
}
// invariants
wpi::SignalObject<NT_Subscriber> handle;
TopicData* topic;
PubSubConfig config;
// whether or not the subscriber should actually receive values
bool active{false};
// polling storage
ValueCircularBuffer pollStorage;
// value listeners
VectorSet<NT_Listener> valueListeners;
};
struct EntryData {
static constexpr auto kType = Handle::kEntry;
EntryData(NT_Entry handle, SubscriberData* subscriber)
: handle{handle}, topic{subscriber->topic}, subscriber{subscriber} {}
// invariants
wpi::SignalObject<NT_Entry> handle;
TopicData* topic;
SubscriberData* subscriber;
// the publisher (created on demand)
PublisherData* publisher{nullptr};
};
struct MultiSubscriberData {
static constexpr auto kType = Handle::kMultiSubscriber;
MultiSubscriberData(NT_MultiSubscriber handle,
std::span<const std::string_view> prefixes,
const PubSubOptionsImpl& options)
: handle{handle}, options{options} {
this->options.prefixMatch = true;
this->prefixes.reserve(prefixes.size());
for (auto&& prefix : prefixes) {
this->prefixes.emplace_back(prefix);
}
}
bool Matches(std::string_view name, bool special);
// invariants
wpi::SignalObject<NT_MultiSubscriber> handle;
std::vector<std::string> prefixes;
PubSubOptionsImpl options;
// value listeners
VectorSet<NT_Listener> valueListeners;
};
struct ListenerData {
ListenerData(NT_Listener handle, SubscriberData* subscriber,
unsigned int eventMask, bool subscriberOwned)
: handle{handle},
eventMask{eventMask},
subscriber{subscriber},
subscriberOwned{subscriberOwned} {}
ListenerData(NT_Listener handle, MultiSubscriberData* subscriber,
unsigned int eventMask, bool subscriberOwned)
: handle{handle},
eventMask{eventMask},
multiSubscriber{subscriber},
subscriberOwned{subscriberOwned} {}
NT_Listener handle;
unsigned int eventMask;
SubscriberData* subscriber{nullptr};
MultiSubscriberData* multiSubscriber{nullptr};
bool subscriberOwned;
};
struct DataLoggerData {
static constexpr auto kType = Handle::kDataLogger;
DataLoggerData(NT_DataLogger handle, wpi::log::DataLog& log,
std::string_view prefix, std::string_view logPrefix)
: handle{handle}, log{log}, prefix{prefix}, logPrefix{logPrefix} {}
int Start(TopicData* topic, int64_t time);
NT_DataLogger handle;
wpi::log::DataLog& log;
std::string prefix;
std::string logPrefix;
};
// inner struct to protect against accidentally deadlocking on the mutex
struct Impl {
Impl(int inst, IListenerStorage& listenerStorage, wpi::Logger& logger);
int m_inst;
IListenerStorage& m_listenerStorage;
wpi::Logger& m_logger;
net::NetworkInterface* m_network{nullptr};
// handle mappings
HandleMap<TopicData, 16> m_topics;
HandleMap<PublisherData, 16> m_publishers;
HandleMap<SubscriberData, 16> m_subscribers;
HandleMap<EntryData, 16> m_entries;
HandleMap<MultiSubscriberData, 16> m_multiSubscribers;
HandleMap<DataLoggerData, 16> m_dataloggers;
// name mappings
wpi::StringMap<TopicData*> m_nameTopics;
// listeners
wpi::DenseMap<NT_Listener, std::unique_ptr<ListenerData>> m_listeners;
// string-based listeners
VectorSet<ListenerData*> m_topicPrefixListeners;
// topic functions
void NotifyTopic(TopicData* topic, unsigned int eventFlags);
void CheckReset(TopicData* topic);
bool SetValue(TopicData* topic, const Value& value, unsigned int eventFlags,
bool isDuplicate, bool suppressIfDuplicate,
const PublisherData* publisher);
void NotifyValue(TopicData* topic, unsigned int eventFlags,
bool isDuplicate, const PublisherData* publisher);
void SetFlags(TopicData* topic, unsigned int flags);
void SetPersistent(TopicData* topic, bool value);
void SetRetained(TopicData* topic, bool value);
void SetProperties(TopicData* topic, const wpi::json& update,
bool sendNetwork);
void PropertiesUpdated(TopicData* topic, const wpi::json& update,
unsigned int eventFlags, bool sendNetwork,
bool updateFlags = true);
void RefreshPubSubActive(TopicData* topic, bool warnOnSubMismatch);
void NetworkAnnounce(TopicData* topic, std::string_view typeStr,
const wpi::json& properties, NT_Publisher pubHandle);
void RemoveNetworkPublisher(TopicData* topic);
void NetworkPropertiesUpdate(TopicData* topic, const wpi::json& update,
bool ack);
void StartNetwork(net::NetworkInterface* network);
PublisherData* AddLocalPublisher(TopicData* topic,
const wpi::json& properties,
const PubSubConfig& options);
std::unique_ptr<PublisherData> RemoveLocalPublisher(NT_Publisher pubHandle);
SubscriberData* AddLocalSubscriber(TopicData* topic,
const PubSubConfig& options);
std::unique_ptr<SubscriberData> RemoveLocalSubscriber(
NT_Subscriber subHandle);
EntryData* AddEntry(SubscriberData* subscriber);
std::unique_ptr<EntryData> RemoveEntry(NT_Entry entryHandle);
MultiSubscriberData* AddMultiSubscriber(
std::span<const std::string_view> prefixes,
const PubSubOptions& options);
std::unique_ptr<MultiSubscriberData> RemoveMultiSubscriber(
NT_MultiSubscriber subHandle);
void AddListenerImpl(NT_Listener listenerHandle, TopicData* topic,
unsigned int eventMask);
void AddListenerImpl(NT_Listener listenerHandle, SubscriberData* subscriber,
unsigned int eventMask, NT_Handle subentryHandle,
bool subscriberOwned);
void AddListenerImpl(NT_Listener listenerHandle,
MultiSubscriberData* subscriber,
unsigned int eventMask, bool subscriberOwned);
void AddListenerImpl(NT_Listener listenerHandle,
std::span<const std::string_view> prefixes,
unsigned int eventMask);
TopicData* GetOrCreateTopic(std::string_view name);
TopicData* GetTopic(NT_Handle handle);
SubscriberData* GetSubEntry(NT_Handle subentryHandle);
PublisherData* PublishEntry(EntryData* entry, NT_Type type);
Value* GetSubEntryValue(NT_Handle subentryHandle) {
if (auto subscriber = GetSubEntry(subentryHandle)) {
return &subscriber->topic->lastValue;
} else {
return nullptr;
}
}
bool PublishLocalValue(PublisherData* publisher, const Value& value,
bool force = false);
bool SetEntryValue(NT_Handle pubentryHandle, const Value& value);
bool SetDefaultEntryValue(NT_Handle pubsubentryHandle, const Value& value);
void RemoveSubEntry(NT_Handle subentryHandle);
};
wpi::mutex m_mutex;
Impl m_impl;
};
template <ValidType T>
Timestamped<typename TypeInfo<T>::Value> LocalStorage::GetAtomic(
NT_Handle subentry, typename TypeInfo<T>::View defaultValue) {
std::scoped_lock lock{m_mutex};
Value* value = m_impl.GetSubEntryValue(subentry);
if (value && (IsNumericConvertibleTo<T>(*value) || IsType<T>(*value))) {
return GetTimestamped<T, true>(*value);
} else {
return {0, 0, CopyValue<T>(defaultValue)};
}
}
template <SmallArrayType T>
Timestamped<typename TypeInfo<T>::SmallRet> LocalStorage::GetAtomic(
NT_Handle subentry,
wpi::SmallVectorImpl<typename TypeInfo<T>::SmallElem>& buf,
typename TypeInfo<T>::View defaultValue) {
std::scoped_lock lock{m_mutex};
Value* value = m_impl.GetSubEntryValue(subentry);
if (value && (IsNumericConvertibleTo<T>(*value) || IsType<T>(*value))) {
return GetTimestamped<T, true>(*value, buf);
} else {
return {0, 0, CopyValue<T>(defaultValue, buf)};
}
}
template <ValidType T>
std::vector<Timestamped<typename TypeInfo<T>::Value>> LocalStorage::ReadQueue(
NT_Handle subentry) {
std::scoped_lock lock{m_mutex};
auto subscriber = m_impl.GetSubEntry(subentry);
if (!subscriber) {
return {};
}
return subscriber->pollStorage.Read<T>();
}
} // namespace nt

View File

@@ -9,10 +9,8 @@
#define LOG(level, format, ...) \
WPI_LOG(m_logger, level, format __VA_OPT__(, ) __VA_ARGS__)
#undef ERROR
#define ERROR(format, ...) \
WPI_ERROR(m_logger, format __VA_OPT__(, ) __VA_ARGS__)
#define WARNING(format, ...) \
#define ERR(format, ...) WPI_ERROR(m_logger, format __VA_OPT__(, ) __VA_ARGS__)
#define WARN(format, ...) \
WPI_WARNING(m_logger, format __VA_OPT__(, ) __VA_ARGS__)
#define INFO(format, ...) WPI_INFO(m_logger, format __VA_OPT__(, ) __VA_ARGS__)

View File

@@ -13,25 +13,13 @@
#include <fmt/format.h>
#include <wpi/SmallString.h>
#include <wpi/StringExtras.h>
#include <wpinet/DsClient.h>
#include <wpinet/EventLoopRunner.h>
#include <wpinet/HttpUtil.h>
#include <wpinet/ParallelTcpConnector.h>
#include <wpinet/WebSocket.h>
#include <wpinet/uv/Async.h>
#include <wpinet/uv/Loop.h>
#include <wpinet/uv/Tcp.h>
#include <wpinet/uv/Timer.h>
#include <wpinet/uv/util.h>
#include "IConnectionList.h"
#include "Log.h"
#include "net/ClientImpl.h"
#include "net/Message.h"
#include "net/NetworkLoopQueue.h"
#include "net/WebSocketConnection.h"
#include "net3/ClientImpl3.h"
#include "net3/UvStreamConnection3.h"
using namespace nt;
namespace uv = wpi::uv;
@@ -41,97 +29,10 @@ static constexpr uv::Timer::Time kWebsocketHandshakeTimeout{500};
// use a larger max message size for websockets
static constexpr size_t kMaxMessageSize = 2 * 1024 * 1024;
namespace {
class NCImpl {
public:
NCImpl(int inst, std::string_view id, net::ILocalStorage& localStorage,
IConnectionList& connList, wpi::Logger& logger);
virtual ~NCImpl() = default;
// user-facing functions
void SetServers(std::span<const std::pair<std::string, unsigned int>> servers,
unsigned int defaultPort);
void StartDSClient(unsigned int port);
void StopDSClient();
virtual void TcpConnected(uv::Tcp& tcp) = 0;
virtual void ForceDisconnect(std::string_view reason) = 0;
virtual void Disconnect(std::string_view reason);
// invariants
int m_inst;
net::ILocalStorage& m_localStorage;
IConnectionList& m_connList;
wpi::Logger& m_logger;
std::string m_id;
// used only from loop
std::shared_ptr<wpi::ParallelTcpConnector> m_parallelConnect;
std::shared_ptr<uv::Timer> m_readLocalTimer;
std::shared_ptr<uv::Timer> m_sendValuesTimer;
std::shared_ptr<uv::Async<>> m_flushLocal;
std::shared_ptr<uv::Async<>> m_flush;
std::vector<net::ClientMessage> m_localMsgs;
std::vector<std::pair<std::string, unsigned int>> m_servers;
std::pair<std::string, unsigned int> m_dsClientServer{"", 0};
std::shared_ptr<wpi::DsClient> m_dsClient;
// shared with user
std::atomic<uv::Async<>*> m_flushLocalAtomic{nullptr};
std::atomic<uv::Async<>*> m_flushAtomic{nullptr};
net::NetworkLoopQueue m_localQueue;
int m_connHandle = 0;
wpi::EventLoopRunner m_loopRunner;
uv::Loop& m_loop;
};
class NCImpl3 : public NCImpl {
public:
NCImpl3(int inst, std::string_view id, net::ILocalStorage& localStorage,
IConnectionList& connList, wpi::Logger& logger);
~NCImpl3() override;
void HandleLocal();
void TcpConnected(uv::Tcp& tcp) final;
void ForceDisconnect(std::string_view reason) override;
void Disconnect(std::string_view reason) override;
std::shared_ptr<net3::UvStreamConnection3> m_wire;
std::shared_ptr<net3::ClientImpl3> m_clientImpl;
};
class NCImpl4 : public NCImpl {
public:
NCImpl4(
int inst, std::string_view id, net::ILocalStorage& localStorage,
IConnectionList& connList, wpi::Logger& logger,
std::function<void(int64_t serverTimeOffset, int64_t rtt2, bool valid)>
timeSyncUpdated);
~NCImpl4() override;
void HandleLocal();
void TcpConnected(uv::Tcp& tcp) final;
void WsConnected(wpi::WebSocket& ws, uv::Tcp& tcp);
void ForceDisconnect(std::string_view reason) override;
void Disconnect(std::string_view reason) override;
std::function<void(int64_t serverTimeOffset, int64_t rtt2, bool valid)>
m_timeSyncUpdated;
std::shared_ptr<net::WebSocketConnection> m_wire;
std::unique_ptr<net::ClientImpl> m_clientImpl;
};
} // namespace
NCImpl::NCImpl(int inst, std::string_view id, net::ILocalStorage& localStorage,
IConnectionList& connList, wpi::Logger& logger)
NetworkClientBase::NetworkClientBase(int inst, std::string_view id,
net::ILocalStorage& localStorage,
IConnectionList& connList,
wpi::Logger& logger)
: m_inst{inst},
m_localStorage{localStorage},
m_connList{connList},
@@ -144,28 +45,17 @@ NCImpl::NCImpl(int inst, std::string_view id, net::ILocalStorage& localStorage,
INFO("starting network client");
}
void NCImpl::SetServers(
std::span<const std::pair<std::string, unsigned int>> servers,
unsigned int defaultPort) {
std::vector<std::pair<std::string, unsigned int>> serversCopy;
serversCopy.reserve(servers.size());
for (auto&& server : servers) {
serversCopy.emplace_back(wpi::trim(server.first),
server.second == 0 ? defaultPort : server.second);
}
m_loopRunner.ExecAsync(
[this, servers = std::move(serversCopy)](uv::Loop&) mutable {
m_servers = std::move(servers);
if (m_dsClientServer.first.empty()) {
if (m_parallelConnect) {
m_parallelConnect->SetServers(m_servers);
}
}
});
NetworkClientBase::~NetworkClientBase() {
m_localStorage.ClearNetwork();
m_connList.ClearConnections();
}
void NCImpl::StartDSClient(unsigned int port) {
void NetworkClientBase::Disconnect() {
m_loopRunner.ExecAsync(
[this](auto&) { ForceDisconnect("requested by application"); });
}
void NetworkClientBase::StartDSClient(unsigned int port) {
m_loopRunner.ExecAsync([=, this](uv::Loop& loop) {
if (m_dsClient) {
return;
@@ -189,7 +79,7 @@ void NCImpl::StartDSClient(unsigned int port) {
});
}
void NCImpl::StopDSClient() {
void NetworkClientBase::StopDSClient() {
m_loopRunner.ExecAsync([this](uv::Loop& loop) {
if (m_dsClient) {
m_dsClient->Close();
@@ -198,7 +88,40 @@ void NCImpl::StopDSClient() {
});
}
void NCImpl::Disconnect(std::string_view reason) {
void NetworkClientBase::FlushLocal() {
if (auto async = m_flushLocalAtomic.load(std::memory_order_relaxed)) {
async->UnsafeSend();
}
}
void NetworkClientBase::Flush() {
if (auto async = m_flushAtomic.load(std::memory_order_relaxed)) {
async->UnsafeSend();
}
}
void NetworkClientBase::DoSetServers(
std::span<const std::pair<std::string, unsigned int>> servers,
unsigned int defaultPort) {
std::vector<std::pair<std::string, unsigned int>> serversCopy;
serversCopy.reserve(servers.size());
for (auto&& server : servers) {
serversCopy.emplace_back(wpi::trim(server.first),
server.second == 0 ? defaultPort : server.second);
}
m_loopRunner.ExecAsync(
[this, servers = std::move(serversCopy)](uv::Loop&) mutable {
m_servers = std::move(servers);
if (m_dsClientServer.first.empty()) {
if (m_parallelConnect) {
m_parallelConnect->SetServers(m_servers);
}
}
});
}
void NetworkClientBase::DoDisconnect(std::string_view reason) {
if (m_readLocalTimer) {
m_readLocalTimer->Stop();
}
@@ -218,10 +141,10 @@ void NCImpl::Disconnect(std::string_view reason) {
});
}
NCImpl3::NCImpl3(int inst, std::string_view id,
net::ILocalStorage& localStorage, IConnectionList& connList,
wpi::Logger& logger)
: NCImpl{inst, id, localStorage, connList, logger} {
NetworkClient3::NetworkClient3(int inst, std::string_view id,
net::ILocalStorage& localStorage,
IConnectionList& connList, wpi::Logger& logger)
: NetworkClientBase{inst, id, localStorage, connList, logger} {
m_loopRunner.ExecAsync([this](uv::Loop& loop) {
m_parallelConnect = wpi::ParallelTcpConnector::Create(
loop, kReconnectRate, m_logger,
@@ -257,7 +180,7 @@ NCImpl3::NCImpl3(int inst, std::string_view id,
});
}
NCImpl3::~NCImpl3() {
NetworkClient3::~NetworkClient3() {
// must explicitly destroy these on loop
m_loopRunner.ExecSync([&](auto&) {
m_clientImpl.reset();
@@ -267,14 +190,14 @@ NCImpl3::~NCImpl3() {
m_loopRunner.Stop();
}
void NCImpl3::HandleLocal() {
void NetworkClient3::HandleLocal() {
m_localQueue.ReadQueue(&m_localMsgs);
if (m_clientImpl) {
m_clientImpl->HandleLocal(m_localMsgs);
}
}
void NCImpl3::TcpConnected(uv::Tcp& tcp) {
void NetworkClient3::TcpConnected(uv::Tcp& tcp) {
tcp.SetNoDelay(true);
// create as shared_ptr and capture in lambda because there may be multiple
@@ -319,19 +242,19 @@ void NCImpl3::TcpConnected(uv::Tcp& tcp) {
tcp.error.connect([this, &tcp](uv::Error err) {
DEBUG3("NT3 TCP error {}", err.str());
if (!tcp.IsLoopClosing()) {
Disconnect(err.str());
DoDisconnect(err.str());
}
});
tcp.end.connect([this, &tcp] {
DEBUG3("NT3 TCP read ended");
if (!tcp.IsLoopClosing()) {
Disconnect("remote end closed connection");
DoDisconnect("remote end closed connection");
}
});
tcp.closed.connect([this, &tcp] {
DEBUG3("NT3 TCP connection closed");
if (!tcp.IsLoopClosing()) {
Disconnect(m_wire ? m_wire->GetDisconnectReason() : "unknown");
DoDisconnect(m_wire ? m_wire->GetDisconnectReason() : "unknown");
}
});
@@ -349,25 +272,25 @@ void NCImpl3::TcpConnected(uv::Tcp& tcp) {
tcp.StartRead();
}
void NCImpl3::ForceDisconnect(std::string_view reason) {
void NetworkClient3::ForceDisconnect(std::string_view reason) {
if (m_wire) {
m_wire->Disconnect(reason);
}
}
void NCImpl3::Disconnect(std::string_view reason) {
void NetworkClient3::DoDisconnect(std::string_view reason) {
INFO("DISCONNECTED NT3 connection: {}", reason);
m_clientImpl.reset();
m_wire.reset();
NCImpl::Disconnect(reason);
NetworkClientBase::DoDisconnect(reason);
}
NCImpl4::NCImpl4(
NetworkClient::NetworkClient(
int inst, std::string_view id, net::ILocalStorage& localStorage,
IConnectionList& connList, wpi::Logger& logger,
std::function<void(int64_t serverTimeOffset, int64_t rtt2, bool valid)>
timeSyncUpdated)
: NCImpl{inst, id, localStorage, connList, logger},
: NetworkClientBase{inst, id, localStorage, connList, logger},
m_timeSyncUpdated{std::move(timeSyncUpdated)} {
m_loopRunner.ExecAsync([this](uv::Loop& loop) {
m_parallelConnect = wpi::ParallelTcpConnector::Create(
@@ -415,7 +338,7 @@ NCImpl4::NCImpl4(
});
}
NCImpl4::~NCImpl4() {
NetworkClient::~NetworkClient() {
// must explicitly destroy these on loop
m_loopRunner.ExecSync([&](auto&) {
m_clientImpl.reset();
@@ -425,14 +348,14 @@ NCImpl4::~NCImpl4() {
m_loopRunner.Stop();
}
void NCImpl4::HandleLocal() {
void NetworkClient::HandleLocal() {
m_localQueue.ReadQueue(&m_localMsgs);
if (m_clientImpl) {
m_clientImpl->HandleLocal(std::move(m_localMsgs));
}
}
void NCImpl4::TcpConnected(uv::Tcp& tcp) {
void NetworkClient::TcpConnected(uv::Tcp& tcp) {
tcp.SetNoDelay(true);
// Start the WS client
if (m_logger.min_level() >= wpi::WPI_LOG_DEBUG4) {
@@ -457,7 +380,7 @@ void NCImpl4::TcpConnected(uv::Tcp& tcp) {
});
}
void NCImpl4::WsConnected(wpi::WebSocket& ws, uv::Tcp& tcp) {
void NetworkClient::WsConnected(wpi::WebSocket& ws, uv::Tcp& tcp) {
if (m_parallelConnect) {
m_parallelConnect->Succeeded(tcp);
}
@@ -485,7 +408,7 @@ void NCImpl4::WsConnected(wpi::WebSocket& ws, uv::Tcp& tcp) {
m_clientImpl->SendInitial();
ws.closed.connect([this, &ws](uint16_t, std::string_view reason) {
if (!ws.GetStream().IsLoopClosing()) {
Disconnect(reason);
DoDisconnect(reason);
}
});
ws.text.connect([this](std::string_view data, bool) {
@@ -500,13 +423,13 @@ void NCImpl4::WsConnected(wpi::WebSocket& ws, uv::Tcp& tcp) {
});
}
void NCImpl4::ForceDisconnect(std::string_view reason) {
void NetworkClient::ForceDisconnect(std::string_view reason) {
if (m_wire) {
m_wire->Disconnect(reason);
}
}
void NCImpl4::Disconnect(std::string_view reason) {
void NetworkClient::DoDisconnect(std::string_view reason) {
std::string realReason;
if (m_wire) {
realReason = m_wire->GetDisconnectReason();
@@ -515,107 +438,6 @@ void NCImpl4::Disconnect(std::string_view reason) {
realReason.empty() ? reason : realReason);
m_clientImpl.reset();
m_wire.reset();
NCImpl::Disconnect(reason);
NetworkClientBase::DoDisconnect(reason);
m_timeSyncUpdated(0, 0, false);
}
class NetworkClient::Impl final : public NCImpl4 {
public:
Impl(int inst, std::string_view id, net::ILocalStorage& localStorage,
IConnectionList& connList, wpi::Logger& logger,
std::function<void(int64_t serverTimeOffset, int64_t rtt2, bool valid)>
timeSyncUpdated)
: NCImpl4{inst, id, localStorage,
connList, logger, std::move(timeSyncUpdated)} {}
};
NetworkClient::NetworkClient(
int inst, std::string_view id, net::ILocalStorage& localStorage,
IConnectionList& connList, wpi::Logger& logger,
std::function<void(int64_t serverTimeOffset, int64_t rtt2, bool valid)>
timeSyncUpdated)
: m_impl{std::make_unique<Impl>(inst, id, localStorage, connList, logger,
std::move(timeSyncUpdated))} {}
NetworkClient::~NetworkClient() {
m_impl->m_localStorage.ClearNetwork();
m_impl->m_connList.ClearConnections();
}
void NetworkClient::SetServers(
std::span<const std::pair<std::string, unsigned int>> servers) {
m_impl->SetServers(servers, NT_DEFAULT_PORT4);
}
void NetworkClient::Disconnect() {
m_impl->m_loopRunner.ExecAsync(
[this](auto&) { m_impl->ForceDisconnect("requested by application"); });
}
void NetworkClient::StartDSClient(unsigned int port) {
m_impl->StartDSClient(port);
}
void NetworkClient::StopDSClient() {
m_impl->StopDSClient();
}
void NetworkClient::FlushLocal() {
if (auto async = m_impl->m_flushLocalAtomic.load(std::memory_order_relaxed)) {
async->UnsafeSend();
}
}
void NetworkClient::Flush() {
if (auto async = m_impl->m_flushAtomic.load(std::memory_order_relaxed)) {
async->UnsafeSend();
}
}
class NetworkClient3::Impl final : public NCImpl3 {
public:
Impl(int inst, std::string_view id, net::ILocalStorage& localStorage,
IConnectionList& connList, wpi::Logger& logger)
: NCImpl3{inst, id, localStorage, connList, logger} {}
};
NetworkClient3::NetworkClient3(int inst, std::string_view id,
net::ILocalStorage& localStorage,
IConnectionList& connList, wpi::Logger& logger)
: m_impl{std::make_unique<Impl>(inst, id, localStorage, connList, logger)} {
}
NetworkClient3::~NetworkClient3() {
m_impl->m_localStorage.ClearNetwork();
m_impl->m_connList.ClearConnections();
}
void NetworkClient3::SetServers(
std::span<const std::pair<std::string, unsigned int>> servers) {
m_impl->SetServers(servers, NT_DEFAULT_PORT3);
}
void NetworkClient3::Disconnect() {
m_impl->m_loopRunner.ExecAsync(
[this](auto&) { m_impl->ForceDisconnect("requested by application"); });
}
void NetworkClient3::StartDSClient(unsigned int port) {
m_impl->StartDSClient(port);
}
void NetworkClient3::StopDSClient() {
m_impl->StopDSClient();
}
void NetworkClient3::FlushLocal() {
if (auto async = m_impl->m_flushLocalAtomic.load(std::memory_order_relaxed)) {
async->UnsafeSend();
}
}
void NetworkClient3::Flush() {
if (auto async = m_impl->m_flushAtomic.load(std::memory_order_relaxed)) {
async->UnsafeSend();
}
}

View File

@@ -4,6 +4,7 @@
#pragma once
#include <atomic>
#include <functional>
#include <memory>
#include <optional>
@@ -11,8 +12,22 @@
#include <string>
#include <string_view>
#include <utility>
#include <vector>
#include <wpinet/DsClient.h>
#include <wpinet/EventLoopRunner.h>
#include <wpinet/ParallelTcpConnector.h>
#include <wpinet/WebSocket.h>
#include <wpinet/uv/Async.h>
#include <wpinet/uv/Timer.h>
#include "INetworkClient.h"
#include "net/ClientImpl.h"
#include "net/Message.h"
#include "net/NetworkLoopQueue.h"
#include "net/WebSocketConnection.h"
#include "net3/ClientImpl3.h"
#include "net3/UvStreamConnection3.h"
#include "ntcore_cpp.h"
namespace wpi {
@@ -27,7 +42,86 @@ namespace nt {
class IConnectionList;
class NetworkClient final : public INetworkClient {
class NetworkClientBase : public INetworkClient {
public:
NetworkClientBase(int inst, std::string_view id,
net::ILocalStorage& localStorage, IConnectionList& connList,
wpi::Logger& logger);
~NetworkClientBase() override;
void Disconnect() override;
void StartDSClient(unsigned int port) override;
void StopDSClient() override;
void FlushLocal() override;
void Flush() override;
protected:
void DoSetServers(
std::span<const std::pair<std::string, unsigned int>> servers,
unsigned int defaultPort);
virtual void TcpConnected(wpi::uv::Tcp& tcp) = 0;
virtual void ForceDisconnect(std::string_view reason) = 0;
virtual void DoDisconnect(std::string_view reason);
// invariants
int m_inst;
net::ILocalStorage& m_localStorage;
IConnectionList& m_connList;
wpi::Logger& m_logger;
std::string m_id;
// used only from loop
std::shared_ptr<wpi::ParallelTcpConnector> m_parallelConnect;
std::shared_ptr<wpi::uv::Timer> m_readLocalTimer;
std::shared_ptr<wpi::uv::Timer> m_sendValuesTimer;
std::shared_ptr<wpi::uv::Async<>> m_flushLocal;
std::shared_ptr<wpi::uv::Async<>> m_flush;
std::vector<net::ClientMessage> m_localMsgs;
std::vector<std::pair<std::string, unsigned int>> m_servers;
std::pair<std::string, unsigned int> m_dsClientServer{"", 0};
std::shared_ptr<wpi::DsClient> m_dsClient;
// shared with user
std::atomic<wpi::uv::Async<>*> m_flushLocalAtomic{nullptr};
std::atomic<wpi::uv::Async<>*> m_flushAtomic{nullptr};
net::NetworkLoopQueue m_localQueue;
int m_connHandle = 0;
wpi::EventLoopRunner m_loopRunner;
wpi::uv::Loop& m_loop;
};
class NetworkClient3 final : public NetworkClientBase {
public:
NetworkClient3(int inst, std::string_view id,
net::ILocalStorage& localStorage, IConnectionList& connList,
wpi::Logger& logger);
~NetworkClient3() final;
void SetServers(
std::span<const std::pair<std::string, unsigned int>> servers) final {
DoSetServers(servers, NT_DEFAULT_PORT3);
}
private:
void HandleLocal();
void TcpConnected(wpi::uv::Tcp& tcp) final;
void ForceDisconnect(std::string_view reason) override;
void DoDisconnect(std::string_view reason) override;
std::shared_ptr<net3::UvStreamConnection3> m_wire;
std::shared_ptr<net3::ClientImpl3> m_clientImpl;
};
class NetworkClient final : public NetworkClientBase {
public:
NetworkClient(
int inst, std::string_view id, net::ILocalStorage& localStorage,
@@ -37,40 +131,21 @@ class NetworkClient final : public INetworkClient {
~NetworkClient() final;
void SetServers(
std::span<const std::pair<std::string, unsigned int>> servers) final;
void Disconnect() final;
void StartDSClient(unsigned int port) final;
void StopDSClient() final;
void FlushLocal() final;
void Flush() final;
std::span<const std::pair<std::string, unsigned int>> servers) final {
DoSetServers(servers, NT_DEFAULT_PORT4);
}
private:
class Impl;
std::unique_ptr<Impl> m_impl;
};
void HandleLocal();
void TcpConnected(wpi::uv::Tcp& tcp) final;
void WsConnected(wpi::WebSocket& ws, wpi::uv::Tcp& tcp);
void ForceDisconnect(std::string_view reason) override;
void DoDisconnect(std::string_view reason) override;
class NetworkClient3 final : public INetworkClient {
public:
NetworkClient3(int inst, std::string_view id,
net::ILocalStorage& localStorage, IConnectionList& connList,
wpi::Logger& logger);
~NetworkClient3() final;
void SetServers(
std::span<const std::pair<std::string, unsigned int>> servers) final;
void Disconnect() final;
void StartDSClient(unsigned int port) final;
void StopDSClient() final;
void FlushLocal() final;
void Flush() final;
private:
class Impl;
std::unique_ptr<Impl> m_impl;
std::function<void(int64_t serverTimeOffset, int64_t rtt2, bool valid)>
m_timeSyncUpdated;
std::shared_ptr<net::WebSocketConnection> m_wire;
std::unique_ptr<net::ClientImpl> m_clientImpl;
};
} // namespace nt

View File

@@ -17,11 +17,9 @@
#include <wpi/mutex.h>
#include <wpi/raw_istream.h>
#include <wpi/raw_ostream.h>
#include <wpinet/EventLoopRunner.h>
#include <wpinet/HttpUtil.h>
#include <wpinet/HttpWebSocketServerConnection.h>
#include <wpinet/UrlParser.h>
#include <wpinet/uv/Async.h>
#include <wpinet/uv/Tcp.h>
#include <wpinet/uv/Work.h>
#include <wpinet/uv/util.h>
@@ -29,9 +27,6 @@
#include "IConnectionList.h"
#include "InstanceImpl.h"
#include "Log.h"
#include "net/Message.h"
#include "net/NetworkLoopQueue.h"
#include "net/ServerImpl.h"
#include "net/WebSocketConnection.h"
#include "net3/UvStreamConnection3.h"
@@ -41,14 +36,10 @@ namespace uv = wpi::uv;
// use a larger max message size for websockets
static constexpr size_t kMaxMessageSize = 2 * 1024 * 1024;
namespace {
class NSImpl;
class ServerConnection {
class NetworkServer::ServerConnection {
public:
ServerConnection(NSImpl& server, std::string_view addr, unsigned int port,
wpi::Logger& logger)
ServerConnection(NetworkServer& server, std::string_view addr,
unsigned int port, wpi::Logger& logger)
: m_server{server},
m_connInfo{fmt::format("{}:{}", addr, port)},
m_logger{logger} {
@@ -63,7 +54,7 @@ class ServerConnection {
void UpdatePeriodicTimer(uint32_t repeatMs);
void ConnectionClosed();
NSImpl& m_server;
NetworkServer& m_server;
ConnectionInfo m_info;
std::string m_connInfo;
wpi::Logger& m_logger;
@@ -73,11 +64,21 @@ class ServerConnection {
std::shared_ptr<uv::Timer> m_sendValuesTimer;
};
class ServerConnection4 final
class NetworkServer::ServerConnection3 : public ServerConnection {
public:
ServerConnection3(std::shared_ptr<uv::Stream> stream, NetworkServer& server,
std::string_view addr, unsigned int port,
wpi::Logger& logger);
private:
std::shared_ptr<net3::UvStreamConnection3> m_wire;
};
class NetworkServer::ServerConnection4 final
: public ServerConnection,
public wpi::HttpWebSocketServerConnection<ServerConnection4> {
public:
ServerConnection4(std::shared_ptr<uv::Stream> stream, NSImpl& server,
ServerConnection4(std::shared_ptr<uv::Stream> stream, NetworkServer& server,
std::string_view addr, unsigned int port,
wpi::Logger& logger)
: ServerConnection{server, addr, port, logger},
@@ -92,71 +93,7 @@ class ServerConnection4 final
std::shared_ptr<net::WebSocketConnection> m_wire;
};
class ServerConnection3 : public ServerConnection {
public:
ServerConnection3(std::shared_ptr<uv::Stream> stream, NSImpl& server,
std::string_view addr, unsigned int port,
wpi::Logger& logger);
private:
std::shared_ptr<net3::UvStreamConnection3> m_wire;
};
class NSImpl {
public:
NSImpl(std::string_view persistFilename, std::string_view listenAddress,
unsigned int port3, unsigned int port4,
net::ILocalStorage& localStorage, IConnectionList& connList,
wpi::Logger& logger, std::function<void()> initDone);
~NSImpl();
void HandleLocal();
void LoadPersistent();
void SavePersistent(std::string_view filename, std::string_view data);
void Init();
void AddConnection(ServerConnection* conn, const ConnectionInfo& info);
void RemoveConnection(ServerConnection* conn);
net::ILocalStorage& m_localStorage;
IConnectionList& m_connList;
wpi::Logger& m_logger;
std::function<void()> m_initDone;
std::string m_persistentData;
std::string m_persistentFilename;
std::string m_listenAddress;
unsigned int m_port3;
unsigned int m_port4;
// used only from loop
std::shared_ptr<uv::Timer> m_readLocalTimer;
std::shared_ptr<uv::Timer> m_savePersistentTimer;
std::shared_ptr<uv::Async<>> m_flushLocal;
std::shared_ptr<uv::Async<>> m_flush;
bool m_shutdown = false;
std::vector<net::ClientMessage> m_localMsgs;
net::ServerImpl m_serverImpl;
// shared with user (must be atomic or mutex-protected)
std::atomic<uv::Async<>*> m_flushLocalAtomic{nullptr};
std::atomic<uv::Async<>*> m_flushAtomic{nullptr};
mutable wpi::mutex m_mutex;
struct Connection {
ServerConnection* conn;
int connHandle;
};
std::vector<Connection> m_connections;
net::NetworkLoopQueue m_localQueue;
wpi::EventLoopRunner m_loopRunner;
wpi::uv::Loop& m_loop;
};
} // namespace
void ServerConnection::SetupPeriodicTimer() {
void NetworkServer::ServerConnection::SetupPeriodicTimer() {
m_sendValuesTimer = uv::Timer::Create(m_server.m_loop);
m_sendValuesTimer->timeout.connect([this] {
m_server.HandleLocal();
@@ -164,7 +101,7 @@ void ServerConnection::SetupPeriodicTimer() {
});
}
void ServerConnection::UpdatePeriodicTimer(uint32_t repeatMs) {
void NetworkServer::ServerConnection::UpdatePeriodicTimer(uint32_t repeatMs) {
if (repeatMs == UINT32_MAX) {
m_sendValuesTimer->Stop();
} else {
@@ -173,7 +110,7 @@ void ServerConnection::UpdatePeriodicTimer(uint32_t repeatMs) {
}
}
void ServerConnection::ConnectionClosed() {
void NetworkServer::ServerConnection::ConnectionClosed() {
// don't call back into m_server if it's being destroyed
if (!m_sendValuesTimer->IsLoopClosing()) {
m_server.m_serverImpl.RemoveClient(m_clientId);
@@ -182,7 +119,54 @@ void ServerConnection::ConnectionClosed() {
m_sendValuesTimer->Close();
}
void ServerConnection4::ProcessRequest() {
NetworkServer::ServerConnection3::ServerConnection3(
std::shared_ptr<uv::Stream> stream, NetworkServer& server,
std::string_view addr, unsigned int port, wpi::Logger& logger)
: ServerConnection{server, addr, port, logger},
m_wire{std::make_shared<net3::UvStreamConnection3>(*stream)} {
m_info.remote_ip = addr;
m_info.remote_port = port;
// TODO: set local flag appropriately
m_clientId = m_server.m_serverImpl.AddClient3(
m_connInfo, false, *m_wire,
[this](std::string_view name, uint16_t proto) {
m_info.remote_id = name;
m_info.protocol_version = proto;
m_server.AddConnection(this, m_info);
INFO("CONNECTED NT3 client '{}' (from {})", name, m_connInfo);
},
[this](uint32_t repeatMs) { UpdatePeriodicTimer(repeatMs); });
stream->error.connect([this](uv::Error err) {
if (!m_wire->GetDisconnectReason().empty()) {
return;
}
m_wire->Disconnect(fmt::format("stream error: {}", err.name()));
m_wire->GetStream().Shutdown([this] { m_wire->GetStream().Close(); });
});
stream->end.connect([this] {
if (!m_wire->GetDisconnectReason().empty()) {
return;
}
m_wire->Disconnect("remote end closed connection");
m_wire->GetStream().Shutdown([this] { m_wire->GetStream().Close(); });
});
stream->closed.connect([this] {
INFO("DISCONNECTED NT3 client '{}' (from {}): {}", m_info.remote_id,
m_connInfo, m_wire->GetDisconnectReason());
ConnectionClosed();
});
stream->data.connect([this](uv::Buffer& buf, size_t size) {
m_server.m_serverImpl.ProcessIncomingBinary(
m_clientId, {reinterpret_cast<const uint8_t*>(buf.base), size});
});
stream->StartRead();
SetupPeriodicTimer();
}
void NetworkServer::ServerConnection4::ProcessRequest() {
DEBUG1("HTTP request: '{}'", m_request.GetUrl());
wpi::UrlParser url{m_request.GetUrl(),
m_request.GetMethod() == wpi::HTTP_CONNECT};
@@ -219,7 +203,7 @@ void ServerConnection4::ProcessRequest() {
}
}
void ServerConnection4::ProcessWsUpgrade() {
void NetworkServer::ServerConnection4::ProcessWsUpgrade() {
// get name from URL
wpi::UrlParser url{m_request.GetUrl(), false};
std::string_view path;
@@ -271,58 +255,12 @@ void ServerConnection4::ProcessWsUpgrade() {
});
}
ServerConnection3::ServerConnection3(std::shared_ptr<uv::Stream> stream,
NSImpl& server, std::string_view addr,
unsigned int port, wpi::Logger& logger)
: ServerConnection{server, addr, port, logger},
m_wire{std::make_shared<net3::UvStreamConnection3>(*stream)} {
m_info.remote_ip = addr;
m_info.remote_port = port;
// TODO: set local flag appropriately
m_clientId = m_server.m_serverImpl.AddClient3(
m_connInfo, false, *m_wire,
[this](std::string_view name, uint16_t proto) {
m_info.remote_id = name;
m_info.protocol_version = proto;
m_server.AddConnection(this, m_info);
INFO("CONNECTED NT3 client '{}' (from {})", name, m_connInfo);
},
[this](uint32_t repeatMs) { UpdatePeriodicTimer(repeatMs); });
stream->error.connect([this](uv::Error err) {
if (!m_wire->GetDisconnectReason().empty()) {
return;
}
m_wire->Disconnect(fmt::format("stream error: {}", err.name()));
m_wire->GetStream().Shutdown([this] { m_wire->GetStream().Close(); });
});
stream->end.connect([this] {
if (!m_wire->GetDisconnectReason().empty()) {
return;
}
m_wire->Disconnect("remote end closed connection");
m_wire->GetStream().Shutdown([this] { m_wire->GetStream().Close(); });
});
stream->closed.connect([this] {
INFO("DISCONNECTED NT3 client '{}' (from {}): {}", m_info.remote_id,
m_connInfo, m_wire->GetDisconnectReason());
ConnectionClosed();
});
stream->data.connect([this](uv::Buffer& buf, size_t size) {
m_server.m_serverImpl.ProcessIncomingBinary(
m_clientId, {reinterpret_cast<const uint8_t*>(buf.base), size});
});
stream->StartRead();
SetupPeriodicTimer();
}
NSImpl::NSImpl(std::string_view persistentFilename,
std::string_view listenAddress, unsigned int port3,
unsigned int port4, net::ILocalStorage& localStorage,
IConnectionList& connList, wpi::Logger& logger,
std::function<void()> initDone)
NetworkServer::NetworkServer(std::string_view persistentFilename,
std::string_view listenAddress, unsigned int port3,
unsigned int port4,
net::ILocalStorage& localStorage,
IConnectionList& connList, wpi::Logger& logger,
std::function<void()> initDone)
: m_localStorage{localStorage},
m_connList{connList},
m_logger{logger},
@@ -347,16 +285,30 @@ NSImpl::NSImpl(std::string_view persistentFilename,
});
}
NSImpl::~NSImpl() {
NetworkServer::~NetworkServer() {
m_loopRunner.ExecAsync([this](uv::Loop&) { m_shutdown = true; });
m_localStorage.ClearNetwork();
m_connList.ClearConnections();
}
void NSImpl::HandleLocal() {
void NetworkServer::FlushLocal() {
if (auto async = m_flushLocalAtomic.load(std::memory_order_relaxed)) {
async->UnsafeSend();
}
}
void NetworkServer::Flush() {
if (auto async = m_flushAtomic.load(std::memory_order_relaxed)) {
async->UnsafeSend();
}
}
void NetworkServer::HandleLocal() {
m_localQueue.ReadQueue(&m_localMsgs);
m_serverImpl.HandleLocal(m_localMsgs);
}
void NSImpl::LoadPersistent() {
void NetworkServer::LoadPersistent() {
std::error_code ec;
auto size = fs::file_size(m_persistentFilename, ec);
wpi::raw_fd_istream is{m_persistentFilename, ec};
@@ -376,12 +328,13 @@ void NSImpl::LoadPersistent() {
is.readinto(m_persistentData, size);
DEBUG4("read data: {}", m_persistentData);
if (is.has_error()) {
WARNING("error reading persistent file");
WARN("error reading persistent file");
return;
}
}
void NSImpl::SavePersistent(std::string_view filename, std::string_view data) {
void NetworkServer::SavePersistent(std::string_view filename,
std::string_view data) {
// write to temporary file
auto tmp = fmt::format("{}.tmp", filename);
std::error_code ec;
@@ -409,13 +362,13 @@ void NSImpl::SavePersistent(std::string_view filename, std::string_view data) {
}
}
void NSImpl::Init() {
void NetworkServer::Init() {
if (m_shutdown) {
return;
}
auto errs = m_serverImpl.LoadPersistent(m_persistentData);
if (!errs.empty()) {
WARNING("error reading persistent file: {}", errs);
WARN("error reading persistent file: {}", errs);
}
// set up timers
@@ -535,13 +488,14 @@ void NSImpl::Init() {
}
}
void NSImpl::AddConnection(ServerConnection* conn, const ConnectionInfo& info) {
void NetworkServer::AddConnection(ServerConnection* conn,
const ConnectionInfo& info) {
std::scoped_lock lock{m_mutex};
m_connections.emplace_back(Connection{conn, m_connList.AddConnection(info)});
m_serverImpl.ConnectionsChanged(m_connList.GetConnections());
}
void NSImpl::RemoveConnection(ServerConnection* conn) {
void NetworkServer::RemoveConnection(ServerConnection* conn) {
std::scoped_lock lock{m_mutex};
auto it = std::find_if(m_connections.begin(), m_connections.end(),
[=](auto&& c) { return c.conn == conn; });
@@ -551,40 +505,3 @@ void NSImpl::RemoveConnection(ServerConnection* conn) {
m_serverImpl.ConnectionsChanged(m_connList.GetConnections());
}
}
class NetworkServer::Impl final : public NSImpl {
public:
Impl(std::string_view persistFilename, std::string_view listenAddress,
unsigned int port3, unsigned int port4, net::ILocalStorage& localStorage,
IConnectionList& connList, wpi::Logger& logger,
std::function<void()> initDone)
: NSImpl{persistFilename, listenAddress, port3, port4,
localStorage, connList, logger, std::move(initDone)} {}
};
NetworkServer::NetworkServer(std::string_view persistFilename,
std::string_view listenAddress, unsigned int port3,
unsigned int port4,
net::ILocalStorage& localStorage,
IConnectionList& connList, wpi::Logger& logger,
std::function<void()> initDone)
: m_impl{std::make_unique<Impl>(persistFilename, listenAddress, port3,
port4, localStorage, connList, logger,
std::move(initDone))} {}
NetworkServer::~NetworkServer() {
m_impl->m_localStorage.ClearNetwork();
m_impl->m_connList.ClearConnections();
}
void NetworkServer::FlushLocal() {
if (auto async = m_impl->m_flushLocalAtomic.load(std::memory_order_relaxed)) {
async->UnsafeSend();
}
}
void NetworkServer::Flush() {
if (auto async = m_impl->m_flushAtomic.load(std::memory_order_relaxed)) {
async->UnsafeSend();
}
}

View File

@@ -4,10 +4,20 @@
#pragma once
#include <atomic>
#include <functional>
#include <memory>
#include <string>
#include <string_view>
#include <vector>
#include <wpinet/EventLoopRunner.h>
#include <wpinet/uv/Async.h>
#include <wpinet/uv/Timer.h>
#include "net/Message.h"
#include "net/NetworkLoopQueue.h"
#include "net/ServerImpl.h"
#include "ntcore_cpp.h"
namespace wpi {
@@ -35,8 +45,52 @@ class NetworkServer {
void Flush();
private:
class Impl;
std::unique_ptr<Impl> m_impl;
class ServerConnection;
class ServerConnection3;
class ServerConnection4;
void HandleLocal();
void LoadPersistent();
void SavePersistent(std::string_view filename, std::string_view data);
void Init();
void AddConnection(ServerConnection* conn, const ConnectionInfo& info);
void RemoveConnection(ServerConnection* conn);
net::ILocalStorage& m_localStorage;
IConnectionList& m_connList;
wpi::Logger& m_logger;
std::function<void()> m_initDone;
std::string m_persistentData;
std::string m_persistentFilename;
std::string m_listenAddress;
unsigned int m_port3;
unsigned int m_port4;
// used only from loop
std::shared_ptr<wpi::uv::Timer> m_readLocalTimer;
std::shared_ptr<wpi::uv::Timer> m_savePersistentTimer;
std::shared_ptr<wpi::uv::Async<>> m_flushLocal;
std::shared_ptr<wpi::uv::Async<>> m_flush;
bool m_shutdown = false;
std::vector<net::ClientMessage> m_localMsgs;
net::ServerImpl m_serverImpl;
// shared with user (must be atomic or mutex-protected)
std::atomic<wpi::uv::Async<>*> m_flushLocalAtomic{nullptr};
std::atomic<wpi::uv::Async<>*> m_flushAtomic{nullptr};
mutable wpi::mutex m_mutex;
struct Connection {
ServerConnection* conn;
int connHandle;
};
std::vector<Connection> m_connections;
net::NetworkLoopQueue m_localQueue;
wpi::EventLoopRunner m_loopRunner;
wpi::uv::Loop& m_loop;
};
} // namespace nt

View File

@@ -0,0 +1,17 @@
// Copyright (c) FIRST and other WPILib contributors.
// Open Source Software; you can modify and/or share it under the terms of
// the WPILib BSD license file in the root directory of this project.
#include "ValueCircularBuffer.h"
using namespace nt;
std::vector<Value> ValueCircularBuffer::ReadValue() {
std::vector<Value> rv;
rv.reserve(m_storage.size());
for (auto&& val : m_storage) {
rv.emplace_back(std::move(val));
}
m_storage.reset();
return rv;
}

View File

@@ -0,0 +1,49 @@
// Copyright (c) FIRST and other WPILib contributors.
// Open Source Software; you can modify and/or share it under the terms of
// the WPILib BSD license file in the root directory of this project.
#pragma once
#include <utility>
#include <vector>
#include <wpi/circular_buffer.h>
#include "Value_internal.h"
#include "networktables/NetworkTableValue.h"
#include "ntcore_cpp_types.h"
namespace nt {
class ValueCircularBuffer {
public:
explicit ValueCircularBuffer(size_t size) : m_storage{size} {}
template <class... Args>
void emplace_back(Args&&... args) {
m_storage.emplace_back(std::forward<Args...>(args...));
}
std::vector<Value> ReadValue();
template <ValidType T>
std::vector<Timestamped<typename TypeInfo<T>::Value>> Read();
private:
wpi::circular_buffer<Value> m_storage;
};
template <ValidType T>
std::vector<Timestamped<typename TypeInfo<T>::Value>>
ValueCircularBuffer::Read() {
std::vector<Timestamped<typename TypeInfo<T>::Value>> rv;
rv.reserve(m_storage.size());
for (auto&& val : m_storage) {
if (IsNumericConvertibleTo<T>(val) || IsType<T>(val)) {
rv.emplace_back(GetTimestamped<T, true>(val));
}
}
m_storage.reset();
return rv;
}
} // namespace nt

View File

@@ -26,19 +26,19 @@ Value nt::ConvertNumericValue(const Value& value, NT_Type type) {
return newval;
}
case NT_INTEGER_ARRAY: {
Value newval = Value::MakeIntegerArray(GetNumericArrayAs<int64_t>(value),
value.time());
Value newval = Value::MakeIntegerArray(
GetNumericArrayAs<int64_t[]>(value), value.time());
newval.SetServerTime(value.server_time());
return newval;
}
case NT_FLOAT_ARRAY: {
Value newval =
Value::MakeFloatArray(GetNumericArrayAs<float>(value), value.time());
Value newval = Value::MakeFloatArray(GetNumericArrayAs<float[]>(value),
value.time());
newval.SetServerTime(value.server_time());
return newval;
}
case NT_DOUBLE_ARRAY: {
Value newval = Value::MakeDoubleArray(GetNumericArrayAs<double>(value),
Value newval = Value::MakeDoubleArray(GetNumericArrayAs<double[]>(value),
value.time());
newval.SetServerTime(value.server_time());
return newval;

View File

@@ -4,20 +4,413 @@
#pragma once
#include <concepts>
#include <cstring>
#include <memory>
#include <span>
#include <string>
#include <string_view>
#include <type_traits>
#include <vector>
#include <wpi/MemAlloc.h>
#include "networktables/NetworkTableValue.h"
#include "ntcore_c.h"
#include "ntcore_cpp_types.h"
namespace nt {
class Value;
template <typename T>
struct TypeInfo {};
template <>
struct TypeInfo<bool> {
static constexpr NT_Type kType = NT_BOOLEAN;
using Value = bool;
using View = bool;
};
template <>
struct TypeInfo<int64_t> {
static constexpr NT_Type kType = NT_INTEGER;
using Value = int64_t;
using View = int64_t;
};
template <>
struct TypeInfo<float> {
static constexpr NT_Type kType = NT_FLOAT;
using Value = float;
using View = float;
};
template <>
struct TypeInfo<double> {
static constexpr NT_Type kType = NT_DOUBLE;
using Value = double;
using View = double;
};
template <>
struct TypeInfo<std::string> {
static constexpr NT_Type kType = NT_STRING;
using Value = std::string;
using View = std::string_view;
using SmallRet = std::string_view;
using SmallElem = char;
};
template <>
struct TypeInfo<std::string_view> : public TypeInfo<std::string> {};
template <>
struct TypeInfo<uint8_t[]> {
static constexpr NT_Type kType = NT_RAW;
using Value = std::vector<uint8_t>;
using View = std::span<const uint8_t>;
using SmallRet = std::span<uint8_t>;
using SmallElem = uint8_t;
};
template <>
struct TypeInfo<std::vector<uint8_t>> : public TypeInfo<uint8_t[]> {};
template <>
struct TypeInfo<std::span<const uint8_t>> : public TypeInfo<uint8_t[]> {};
template <>
struct TypeInfo<bool[]> {
static constexpr NT_Type kType = NT_BOOLEAN_ARRAY;
using ElementType = bool;
using Value = std::vector<int>;
using View = std::span<const int>;
using SmallRet = std::span<int>;
using SmallElem = int;
};
template <>
struct TypeInfo<int64_t[]> {
static constexpr NT_Type kType = NT_INTEGER_ARRAY;
using ElementType = int64_t;
using Value = std::vector<int64_t>;
using View = std::span<const int64_t>;
using SmallRet = std::span<int64_t>;
using SmallElem = int64_t;
};
template <>
struct TypeInfo<std::vector<int64_t>> : public TypeInfo<int64_t[]> {};
template <>
struct TypeInfo<std::span<const int64_t>> : public TypeInfo<int64_t[]> {};
template <>
struct TypeInfo<float[]> {
static constexpr NT_Type kType = NT_FLOAT_ARRAY;
using ElementType = float;
using Value = std::vector<float>;
using View = std::span<const float>;
using SmallRet = std::span<float>;
using SmallElem = float;
};
template <>
struct TypeInfo<std::vector<float>> : public TypeInfo<float[]> {};
template <>
struct TypeInfo<std::span<const float>> : public TypeInfo<float[]> {};
template <>
struct TypeInfo<double[]> {
static constexpr NT_Type kType = NT_DOUBLE_ARRAY;
using ElementType = double;
using Value = std::vector<double>;
using View = std::span<const double>;
using SmallRet = std::span<double>;
using SmallElem = double;
};
template <>
struct TypeInfo<std::vector<double>> : public TypeInfo<double[]> {};
template <>
struct TypeInfo<std::span<const double>> : public TypeInfo<double[]> {};
template <>
struct TypeInfo<std::string[]> {
static constexpr NT_Type kType = NT_STRING_ARRAY;
using ElementType = std::string;
using Value = std::vector<std::string>;
using View = std::span<const std::string>;
};
template <>
struct TypeInfo<std::vector<std::string>> : public TypeInfo<std::string[]> {};
template <>
struct TypeInfo<std::span<const std::string>> : public TypeInfo<std::string[]> {
};
template <typename T>
concept ValidType = requires {
{ TypeInfo<std::remove_cvref_t<T>>::kType } -> std::convertible_to<NT_Type>;
typename TypeInfo<std::remove_cvref_t<T>>::Value;
typename TypeInfo<std::remove_cvref_t<T>>::View;
};
static_assert(ValidType<bool>);
static_assert(!ValidType<uint8_t>);
static_assert(ValidType<uint8_t[]>);
static_assert(ValidType<std::vector<uint8_t>>);
template <ValidType T, NT_Type type>
constexpr bool IsNTType = TypeInfo<std::remove_cvref_t<T>>::kType == type;
static_assert(IsNTType<bool, NT_BOOLEAN>);
static_assert(!IsNTType<bool, NT_DOUBLE>);
template <typename T>
concept ArrayType =
requires { typename TypeInfo<std::remove_cvref_t<T>>::ElementType; };
static_assert(ArrayType<std::string[]>);
static_assert(!ArrayType<uint8_t[]>);
template <typename T>
concept SmallArrayType = requires {
typename TypeInfo<std::remove_cvref_t<T>>::SmallRet;
typename TypeInfo<std::remove_cvref_t<T>>::SmallElem;
};
static_assert(SmallArrayType<float[]>);
static_assert(!SmallArrayType<std::string[]>);
template <typename T>
concept NumericType =
IsNTType<T, NT_INTEGER> || IsNTType<T, NT_FLOAT> || IsNTType<T, NT_DOUBLE>;
static_assert(NumericType<int64_t>);
static_assert(NumericType<float>);
static_assert(NumericType<double>);
static_assert(!NumericType<bool>);
static_assert(!NumericType<std::string>);
static_assert(!NumericType<int64_t[]>);
static_assert(!NumericType<double[]>);
static_assert(!NumericType<bool[]>);
static_assert(!NumericType<uint8_t[]>);
template <typename T>
concept NumericArrayType =
ArrayType<T> &&
NumericType<typename TypeInfo<std::remove_cvref_t<T>>::ElementType>;
static_assert(NumericArrayType<int64_t[]>);
static_assert(NumericArrayType<float[]>);
static_assert(NumericArrayType<double[]>);
static_assert(!NumericArrayType<bool[]>);
template <NumericType T>
inline typename TypeInfo<T>::Value GetNumericAs(const Value& value) {
if (value.IsInteger()) {
return static_cast<typename TypeInfo<T>::Value>(value.GetInteger());
} else if (value.IsFloat()) {
return static_cast<typename TypeInfo<T>::Value>(value.GetFloat());
} else if (value.IsDouble()) {
return static_cast<typename TypeInfo<T>::Value>(value.GetDouble());
} else {
return {};
}
}
template <NumericArrayType T>
typename TypeInfo<T>::Value GetNumericArrayAs(const Value& value) {
if (value.IsIntegerArray()) {
auto arr = value.GetIntegerArray();
return {arr.begin(), arr.end()};
} else if (value.IsFloatArray()) {
auto arr = value.GetFloatArray();
return {arr.begin(), arr.end()};
} else if (value.IsDoubleArray()) {
auto arr = value.GetDoubleArray();
return {arr.begin(), arr.end()};
} else {
return {};
}
}
template <ValidType T>
inline bool IsType(const Value& value) {
return value.type() == TypeInfo<T>::kType;
}
template <ValidType T>
inline bool IsNumericConvertibleTo(const Value& value) {
if constexpr (NumericType<T>) {
return value.IsInteger() || value.IsFloat() || value.IsDouble();
} else if constexpr (NumericArrayType<T>) {
return value.IsIntegerArray() || value.IsFloatArray() ||
value.IsDoubleArray();
} else {
return false;
}
}
template <ValidType T>
inline typename TypeInfo<T>::View GetValueView(const Value& value) {
if constexpr (IsNTType<T, NT_BOOLEAN>) {
return value.GetBoolean();
} else if constexpr (IsNTType<T, NT_INTEGER>) {
return value.GetInteger();
} else if constexpr (IsNTType<T, NT_FLOAT>) {
return value.GetFloat();
} else if constexpr (IsNTType<T, NT_DOUBLE>) {
return value.GetDouble();
} else if constexpr (IsNTType<T, NT_STRING>) {
return value.GetString();
} else if constexpr (IsNTType<T, NT_RAW>) {
return value.GetRaw();
} else if constexpr (IsNTType<T, NT_BOOLEAN_ARRAY>) {
return value.GetBooleanArray();
} else if constexpr (IsNTType<T, NT_INTEGER_ARRAY>) {
return value.GetIntegerArray();
} else if constexpr (IsNTType<T, NT_FLOAT_ARRAY>) {
return value.GetFloatArray();
} else if constexpr (IsNTType<T, NT_DOUBLE_ARRAY>) {
return value.GetDoubleArray();
} else if constexpr (IsNTType<T, NT_STRING_ARRAY>) {
return value.GetStringArray();
}
}
template <ValidType T>
inline Value MakeValue(typename TypeInfo<T>::View value, int64_t time) {
if constexpr (IsNTType<T, NT_BOOLEAN>) {
return Value::MakeBoolean(value, time);
} else if constexpr (IsNTType<T, NT_INTEGER>) {
return Value::MakeInteger(value, time);
} else if constexpr (IsNTType<T, NT_FLOAT>) {
return Value::MakeFloat(value, time);
} else if constexpr (IsNTType<T, NT_DOUBLE>) {
return Value::MakeDouble(value, time);
} else if constexpr (IsNTType<T, NT_STRING>) {
return Value::MakeString(value, time);
} else if constexpr (IsNTType<T, NT_RAW>) {
return Value::MakeRaw(value, time);
} else if constexpr (IsNTType<T, NT_BOOLEAN_ARRAY>) {
return Value::MakeBooleanArray(value, time);
} else if constexpr (IsNTType<T, NT_INTEGER_ARRAY>) {
return Value::MakeIntegerArray(value, time);
} else if constexpr (IsNTType<T, NT_FLOAT_ARRAY>) {
return Value::MakeFloatArray(value, time);
} else if constexpr (IsNTType<T, NT_DOUBLE_ARRAY>) {
return Value::MakeDoubleArray(value, time);
} else if constexpr (IsNTType<T, NT_STRING_ARRAY>) {
return Value::MakeStringArray(value, time);
}
}
template <ValidType T>
requires ArrayType<T> || IsNTType<T, NT_STRING> || IsNTType<T, NT_RAW>
inline Value MakeValue(typename TypeInfo<T>::Value&& value, int64_t time) {
if constexpr (IsNTType<T, NT_STRING>) {
return Value::MakeString(value, time);
} else if constexpr (IsNTType<T, NT_RAW>) {
return Value::MakeRaw(value, time);
} else if constexpr (IsNTType<T, NT_BOOLEAN_ARRAY>) {
return Value::MakeBooleanArray(value, time);
} else if constexpr (IsNTType<T, NT_INTEGER_ARRAY>) {
return Value::MakeIntegerArray(value, time);
} else if constexpr (IsNTType<T, NT_FLOAT_ARRAY>) {
return Value::MakeFloatArray(value, time);
} else if constexpr (IsNTType<T, NT_DOUBLE_ARRAY>) {
return Value::MakeDoubleArray(value, time);
} else if constexpr (IsNTType<T, NT_STRING_ARRAY>) {
return Value::MakeStringArray(value, time);
}
}
template <ValidType T>
inline typename TypeInfo<T>::Value CopyValue(typename TypeInfo<T>::View value) {
if constexpr (ArrayType<T> || IsNTType<T, NT_RAW>) {
return {value.begin(), value.end()};
} else if constexpr (IsNTType<T, NT_STRING>) {
return std::string{value};
} else {
return value;
}
}
template <SmallArrayType T>
inline typename TypeInfo<T>::SmallRet CopyValue(
typename TypeInfo<T>::View arr,
wpi::SmallVectorImpl<typename TypeInfo<T>::SmallElem>& buf) {
buf.assign(arr.begin(), arr.end());
return {buf.data(), buf.size()};
}
template <ValidType T, bool ConvertNumeric>
inline typename TypeInfo<T>::Value GetValueCopy(const Value& value) {
if constexpr (ConvertNumeric && NumericType<T>) {
return GetNumericAs<T>(value);
} else if constexpr (ConvertNumeric && NumericArrayType<T>) {
return GetNumericArrayAs<T>(value);
} else {
return CopyValue<T>(GetValueView<T>(value));
}
}
template <SmallArrayType T, bool ConvertNumeric>
inline typename TypeInfo<T>::SmallRet GetValueCopy(
const Value& value,
wpi::SmallVectorImpl<typename TypeInfo<T>::SmallElem>& buf) {
if constexpr (ConvertNumeric && NumericArrayType<T>) {
if (value.IsIntegerArray()) {
auto arr = value.GetIntegerArray();
buf.assign(arr.begin(), arr.end());
return {buf.data(), buf.size()};
} else if (value.IsFloatArray()) {
auto arr = value.GetFloatArray();
buf.assign(arr.begin(), arr.end());
return {buf.data(), buf.size()};
} else if (value.IsDoubleArray()) {
auto arr = value.GetDoubleArray();
buf.assign(arr.begin(), arr.end());
return {buf.data(), buf.size()};
} else {
return {};
}
} else {
return CopyValue<T>(GetValueView<T>(value), buf);
}
}
template <ValidType T, bool ConvertNumeric>
inline Timestamped<typename TypeInfo<T>::Value> GetTimestamped(
const Value& value) {
return {value.time(), value.server_time(),
GetValueCopy<T, ConvertNumeric>(value)};
}
template <SmallArrayType T, bool ConvertNumeric>
inline Timestamped<typename TypeInfo<T>::SmallRet> GetTimestamped(
const Value& value,
wpi::SmallVectorImpl<typename TypeInfo<T>::SmallElem>& buf) {
return {value.time(), value.server_time(),
GetValueCopy<T, ConvertNumeric>(value, buf)};
}
template <typename T>
inline void ConvertToC(const T& in, T* out) {
@@ -57,35 +450,6 @@ O* ConvertToC(const std::basic_string<I>& in, size_t* out_len) {
return out;
}
template <typename T>
T GetNumericAs(const Value& value) {
if (value.IsInteger()) {
return static_cast<T>(value.GetInteger());
} else if (value.IsFloat()) {
return static_cast<T>(value.GetFloat());
} else if (value.IsDouble()) {
return static_cast<T>(value.GetDouble());
} else {
return {};
}
}
template <typename T>
std::vector<T> GetNumericArrayAs(const Value& value) {
if (value.IsIntegerArray()) {
auto arr = value.GetIntegerArray();
return {arr.begin(), arr.end()};
} else if (value.IsFloatArray()) {
auto arr = value.GetFloatArray();
return {arr.begin(), arr.end()};
} else if (value.IsDoubleArray()) {
auto arr = value.GetDoubleArray();
return {arr.begin(), arr.end()};
} else {
return {};
}
}
Value ConvertNumericValue(const Value& value, NT_Type type);
} // namespace nt

View File

@@ -0,0 +1,21 @@
// Copyright (c) FIRST and other WPILib contributors.
// Open Source Software; you can modify and/or share it under the terms of
// the WPILib BSD license file in the root directory of this project.
#pragma once
#include <vector>
namespace nt {
// Utility wrapper for making a set-like vector
template <typename T>
class VectorSet : public std::vector<T> {
public:
using iterator = typename std::vector<T>::iterator;
void Add(T value) { this->push_back(value); }
// returns true if element was present
bool Remove(T value) { return std::erase(*this, value) != 0; }
};
} // namespace nt

View File

@@ -10,7 +10,6 @@
#include <variant>
#include <fmt/format.h>
#include <wpi/DenseMap.h>
#include <wpi/Logger.h>
#include <wpi/raw_ostream.h>
#include <wpi/timestamp.h>
@@ -19,9 +18,7 @@
#include "Log.h"
#include "Message.h"
#include "NetworkInterface.h"
#include "PubSubOptions.h"
#include "WireConnection.h"
#include "WireDecoder.h"
#include "WireEncoder.h"
#include "networktables/NetworkTableValue.h"
@@ -34,79 +31,7 @@ static constexpr uint32_t kMinPeriodMs = 5;
// transmission before we close the connection
static constexpr uint32_t kWireMaxNotReadyUs = 1000000;
namespace {
struct PublisherData {
NT_Publisher handle;
PubSubOptionsImpl options;
// in options as double, but copy here as integer; rounded to the nearest
// 10 ms
uint32_t periodMs;
uint64_t nextSendMs{0};
std::vector<Value> outValues; // outgoing values
};
class CImpl : public ServerMessageHandler {
public:
CImpl(uint64_t curTimeMs, int inst, WireConnection& wire, wpi::Logger& logger,
std::function<void(int64_t serverTimeOffset, int64_t rtt2, bool valid)>
timeSyncUpdated,
std::function<void(uint32_t repeatMs)> setPeriodic);
void ProcessIncomingBinary(uint64_t curTimeMs, std::span<const uint8_t> data);
void HandleLocal(std::vector<ClientMessage>&& msgs);
bool SendControl(uint64_t curTimeMs);
void SendValues(uint64_t curTimeMs, bool flush);
void SendInitialValues();
bool CheckNetworkReady(uint64_t curTimeMs);
// ServerMessageHandler interface
void ServerAnnounce(std::string_view name, int64_t id,
std::string_view typeStr, const wpi::json& properties,
std::optional<int64_t> pubuid) final;
void ServerUnannounce(std::string_view name, int64_t id) final;
void ServerPropertiesUpdate(std::string_view name, const wpi::json& update,
bool ack) final;
void Publish(NT_Publisher pubHandle, NT_Topic topicHandle,
std::string_view name, std::string_view typeStr,
const wpi::json& properties, const PubSubOptionsImpl& options);
bool Unpublish(NT_Publisher pubHandle, NT_Topic topicHandle);
void SetValue(NT_Publisher pubHandle, const Value& value);
int m_inst;
WireConnection& m_wire;
wpi::Logger& m_logger;
LocalInterface* m_local{nullptr};
std::function<void(int64_t serverTimeOffset, int64_t rtt2, bool valid)>
m_timeSyncUpdated;
std::function<void(uint32_t repeatMs)> m_setPeriodic;
// indexed by publisher index
std::vector<std::unique_ptr<PublisherData>> m_publishers;
// indexed by server-provided topic id
wpi::DenseMap<int64_t, NT_Topic> m_topicMap;
// timestamp handling
static constexpr uint32_t kPingIntervalMs = 3000;
uint64_t m_nextPingTimeMs{0};
uint64_t m_pongTimeMs{0};
uint32_t m_rtt2Us{UINT32_MAX};
bool m_haveTimeOffset{false};
int64_t m_serverTimeOffsetUs{0};
// periodic sweep handling
uint32_t m_periodMs{kPingIntervalMs + 10};
uint64_t m_lastSendMs{0};
// outgoing queue
std::vector<ClientMessage> m_outgoing;
};
} // namespace
CImpl::CImpl(
ClientImpl::ClientImpl(
uint64_t curTimeMs, int inst, WireConnection& wire, wpi::Logger& logger,
std::function<void(int64_t serverTimeOffset, int64_t rtt2, bool valid)>
timeSyncUpdated,
@@ -126,8 +51,8 @@ CImpl::CImpl(
m_setPeriodic(m_periodMs);
}
void CImpl::ProcessIncomingBinary(uint64_t curTimeMs,
std::span<const uint8_t> data) {
void ClientImpl::ProcessIncomingBinary(uint64_t curTimeMs,
std::span<const uint8_t> data) {
for (;;) {
if (data.empty()) {
break;
@@ -138,7 +63,7 @@ void CImpl::ProcessIncomingBinary(uint64_t curTimeMs,
Value value;
std::string error;
if (!WireDecodeBinary(&data, &id, &value, &error, -m_serverTimeOffsetUs)) {
ERROR("binary decode error: {}", error);
ERR("binary decode error: {}", error);
break; // FIXME
}
DEBUG4("BinaryMessage({})", id);
@@ -146,8 +71,8 @@ void CImpl::ProcessIncomingBinary(uint64_t curTimeMs,
// handle RTT ping response
if (id == -1) {
if (!value.IsInteger()) {
WARNING("RTT ping response with non-integer type {}",
static_cast<int>(value.type()));
WARN("RTT ping response with non-integer type {}",
static_cast<int>(value.type()));
continue;
}
DEBUG4("RTT ping response time {} value {}", value.time(),
@@ -168,7 +93,7 @@ void CImpl::ProcessIncomingBinary(uint64_t curTimeMs,
// otherwise it's a value message, get the local topic handle for it
auto topicIt = m_topicMap.find(id);
if (topicIt == m_topicMap.end()) {
WARNING("received unknown id {}", id);
WARN("received unknown id {}", id);
continue;
}
@@ -179,7 +104,7 @@ void CImpl::ProcessIncomingBinary(uint64_t curTimeMs,
}
}
void CImpl::HandleLocal(std::vector<ClientMessage>&& msgs) {
void ClientImpl::HandleLocal(std::vector<ClientMessage>&& msgs) {
DEBUG4("HandleLocal()");
for (auto&& elem : msgs) {
// common case is value
@@ -200,7 +125,7 @@ void CImpl::HandleLocal(std::vector<ClientMessage>&& msgs) {
}
}
bool CImpl::SendControl(uint64_t curTimeMs) {
bool ClientImpl::DoSendControl(uint64_t curTimeMs) {
DEBUG4("SendControl({})", curTimeMs);
// rate limit sends
@@ -246,7 +171,7 @@ bool CImpl::SendControl(uint64_t curTimeMs) {
return true;
}
void CImpl::SendValues(uint64_t curTimeMs, bool flush) {
void ClientImpl::DoSendValues(uint64_t curTimeMs, bool flush) {
DEBUG4("SendValues({})", curTimeMs);
// can't send value updates until we have a RTT
@@ -255,7 +180,7 @@ void CImpl::SendValues(uint64_t curTimeMs, bool flush) {
}
// ensure all control messages are sent ahead of value updates
if (!SendControl(curTimeMs)) {
if (!DoSendControl(curTimeMs)) {
return;
}
@@ -291,11 +216,11 @@ void CImpl::SendValues(uint64_t curTimeMs, bool flush) {
}
}
void CImpl::SendInitialValues() {
void ClientImpl::SendInitialValues() {
DEBUG4("SendInitialValues()");
// ensure all control messages are sent ahead of value updates
if (!SendControl(0)) {
if (!DoSendControl(0)) {
return;
}
@@ -321,7 +246,7 @@ void CImpl::SendInitialValues() {
}
}
bool CImpl::CheckNetworkReady(uint64_t curTimeMs) {
bool ClientImpl::CheckNetworkReady(uint64_t curTimeMs) {
if (!m_wire.Ready()) {
uint64_t lastFlushTime = m_wire.GetLastFlushTime();
uint64_t now = wpi::Now();
@@ -333,10 +258,10 @@ bool CImpl::CheckNetworkReady(uint64_t curTimeMs) {
return true;
}
void CImpl::Publish(NT_Publisher pubHandle, NT_Topic topicHandle,
std::string_view name, std::string_view typeStr,
const wpi::json& properties,
const PubSubOptionsImpl& options) {
void ClientImpl::Publish(NT_Publisher pubHandle, NT_Topic topicHandle,
std::string_view name, std::string_view typeStr,
const wpi::json& properties,
const PubSubOptionsImpl& options) {
unsigned int index = Handle{pubHandle}.GetIndex();
if (index >= m_publishers.size()) {
m_publishers.resize(index + 1);
@@ -360,7 +285,7 @@ void CImpl::Publish(NT_Publisher pubHandle, NT_Topic topicHandle,
m_setPeriodic(m_periodMs);
}
bool CImpl::Unpublish(NT_Publisher pubHandle, NT_Topic topicHandle) {
bool ClientImpl::Unpublish(NT_Publisher pubHandle, NT_Topic topicHandle) {
unsigned int index = Handle{pubHandle}.GetIndex();
if (index >= m_publishers.size()) {
return false;
@@ -400,7 +325,7 @@ bool CImpl::Unpublish(NT_Publisher pubHandle, NT_Topic topicHandle) {
return doSend;
}
void CImpl::SetValue(NT_Publisher pubHandle, const Value& value) {
void ClientImpl::SetValue(NT_Publisher pubHandle, const Value& value) {
DEBUG4("SetValue({}, time={}, server_time={}, st_off={})", pubHandle,
value.time(), value.server_time(), m_serverTimeOffsetUs);
unsigned int index = Handle{pubHandle}.GetIndex();
@@ -415,10 +340,10 @@ void CImpl::SetValue(NT_Publisher pubHandle, const Value& value) {
}
}
void CImpl::ServerAnnounce(std::string_view name, int64_t id,
std::string_view typeStr,
const wpi::json& properties,
std::optional<int64_t> pubuid) {
void ClientImpl::ServerAnnounce(std::string_view name, int64_t id,
std::string_view typeStr,
const wpi::json& properties,
std::optional<int64_t> pubuid) {
DEBUG4("ServerAnnounce({}, {}, {})", name, id, typeStr);
assert(m_local);
NT_Publisher pubHandle{0};
@@ -429,76 +354,38 @@ void CImpl::ServerAnnounce(std::string_view name, int64_t id,
m_local->NetworkAnnounce(name, typeStr, properties, pubHandle);
}
void CImpl::ServerUnannounce(std::string_view name, int64_t id) {
void ClientImpl::ServerUnannounce(std::string_view name, int64_t id) {
DEBUG4("ServerUnannounce({}, {})", name, id);
assert(m_local);
m_local->NetworkUnannounce(name);
m_topicMap.erase(id);
}
void CImpl::ServerPropertiesUpdate(std::string_view name,
const wpi::json& update, bool ack) {
void ClientImpl::ServerPropertiesUpdate(std::string_view name,
const wpi::json& update, bool ack) {
DEBUG4("ServerProperties({}, {}, {})", name, update.dump(), ack);
assert(m_local);
m_local->NetworkPropertiesUpdate(name, update, ack);
}
class ClientImpl::Impl final : public CImpl {
public:
Impl(uint64_t curTimeMs, int inst, WireConnection& wire, wpi::Logger& logger,
std::function<void(int64_t serverTimeOffset, int64_t rtt2, bool valid)>
timeSyncUpdated,
std::function<void(uint32_t repeatMs)> setPeriodic)
: CImpl{curTimeMs,
inst,
wire,
logger,
std::move(timeSyncUpdated),
std::move(setPeriodic)} {}
};
ClientImpl::ClientImpl(
uint64_t curTimeMs, int inst, WireConnection& wire, wpi::Logger& logger,
std::function<void(int64_t serverTimeOffset, int64_t rtt2, bool valid)>
timeSyncUpdated,
std::function<void(uint32_t repeatMs)> setPeriodic)
: m_impl{std::make_unique<Impl>(curTimeMs, inst, wire, logger,
std::move(timeSyncUpdated),
std::move(setPeriodic))} {}
ClientImpl::~ClientImpl() = default;
void ClientImpl::ProcessIncomingText(std::string_view data) {
if (!m_impl->m_local) {
if (!m_local) {
return;
}
WireDecodeText(data, *m_impl, m_impl->m_logger);
}
void ClientImpl::ProcessIncomingBinary(uint64_t curTimeMs,
std::span<const uint8_t> data) {
m_impl->ProcessIncomingBinary(curTimeMs, data);
}
void ClientImpl::HandleLocal(std::vector<ClientMessage>&& msgs) {
m_impl->HandleLocal(std::move(msgs));
WireDecodeText(data, *this, m_logger);
}
void ClientImpl::SendControl(uint64_t curTimeMs) {
m_impl->SendControl(curTimeMs);
m_impl->m_wire.Flush();
DoSendControl(curTimeMs);
m_wire.Flush();
}
void ClientImpl::SendValues(uint64_t curTimeMs, bool flush) {
m_impl->SendValues(curTimeMs, flush);
m_impl->m_wire.Flush();
}
void ClientImpl::SetLocal(LocalInterface* local) {
m_impl->m_local = local;
DoSendValues(curTimeMs, flush);
m_wire.Flush();
}
void ClientImpl::SendInitial() {
m_impl->SendInitialValues();
m_impl->m_wire.Flush();
SendInitialValues();
m_wire.Flush();
}

View File

@@ -13,8 +13,12 @@
#include <string_view>
#include <vector>
#include <wpi/DenseMap.h>
#include "NetworkInterface.h"
#include "PubSubOptions.h"
#include "WireConnection.h"
#include "WireDecoder.h"
namespace wpi {
class Logger;
@@ -30,14 +34,13 @@ namespace nt::net {
struct ClientMessage;
class WireConnection;
class ClientImpl {
class ClientImpl final : private ServerMessageHandler {
public:
ClientImpl(
uint64_t curTimeMs, int inst, WireConnection& wire, wpi::Logger& logger,
std::function<void(int64_t serverTimeOffset, int64_t rtt2, bool valid)>
timeSyncUpdated,
std::function<void(uint32_t repeatMs)> setPeriodic);
~ClientImpl();
void ProcessIncomingText(std::string_view data);
void ProcessIncomingBinary(uint64_t curTimeMs, std::span<const uint8_t> data);
@@ -46,12 +49,67 @@ class ClientImpl {
void SendControl(uint64_t curTimeMs);
void SendValues(uint64_t curTimeMs, bool flush);
void SetLocal(LocalInterface* local);
void SetLocal(LocalInterface* local) { m_local = local; }
void SendInitial();
private:
class Impl;
std::unique_ptr<Impl> m_impl;
struct PublisherData {
NT_Publisher handle;
PubSubOptionsImpl options;
// in options as double, but copy here as integer; rounded to the nearest
// 10 ms
uint32_t periodMs;
uint64_t nextSendMs{0};
std::vector<Value> outValues; // outgoing values
};
bool DoSendControl(uint64_t curTimeMs);
void DoSendValues(uint64_t curTimeMs, bool flush);
void SendInitialValues();
bool CheckNetworkReady(uint64_t curTimeMs);
// ServerMessageHandler interface
void ServerAnnounce(std::string_view name, int64_t id,
std::string_view typeStr, const wpi::json& properties,
std::optional<int64_t> pubuid) final;
void ServerUnannounce(std::string_view name, int64_t id) final;
void ServerPropertiesUpdate(std::string_view name, const wpi::json& update,
bool ack) final;
void Publish(NT_Publisher pubHandle, NT_Topic topicHandle,
std::string_view name, std::string_view typeStr,
const wpi::json& properties, const PubSubOptionsImpl& options);
bool Unpublish(NT_Publisher pubHandle, NT_Topic topicHandle);
void SetValue(NT_Publisher pubHandle, const Value& value);
int m_inst;
WireConnection& m_wire;
wpi::Logger& m_logger;
LocalInterface* m_local{nullptr};
std::function<void(int64_t serverTimeOffset, int64_t rtt2, bool valid)>
m_timeSyncUpdated;
std::function<void(uint32_t repeatMs)> m_setPeriodic;
// indexed by publisher index
std::vector<std::unique_ptr<PublisherData>> m_publishers;
// indexed by server-provided topic id
wpi::DenseMap<int64_t, NT_Topic> m_topicMap;
// timestamp handling
static constexpr uint32_t kPingIntervalMs = 3000;
uint64_t m_nextPingTimeMs{0};
uint64_t m_pongTimeMs{0};
uint32_t m_rtt2Us{UINT32_MAX};
bool m_haveTimeOffset{false};
int64_t m_serverTimeOffsetUs{0};
// periodic sweep handling
uint32_t m_periodMs{kPingIntervalMs + 10};
uint64_t m_lastSendMs{0};
// outgoing queue
std::vector<ClientMessage> m_outgoing;
};
} // namespace nt::net

File diff suppressed because it is too large Load Diff

View File

@@ -6,6 +6,7 @@
#include <stdint.h>
#include <cmath>
#include <functional>
#include <memory>
#include <span>
@@ -14,11 +15,28 @@
#include <utility>
#include <vector>
#include <wpi/DenseMap.h>
#include <wpi/StringMap.h>
#include <wpi/UidVector.h>
#include <wpi/json.h>
#include "Message.h"
#include "NetworkInterface.h"
#include "PubSubOptions.h"
#include "VectorSet.h"
#include "WireConnection.h"
#include "WireDecoder.h"
#include "WireEncoder.h"
#include "net3/Message3.h"
#include "net3/SequenceNumber.h"
#include "net3/WireConnection3.h"
#include "net3/WireDecoder3.h"
namespace wpi {
class Logger;
template <typename T>
class SmallVectorImpl;
class raw_ostream;
} // namespace wpi
namespace nt::net3 {
@@ -38,7 +56,6 @@ class ServerImpl final {
std::function<void(std::string_view name, uint16_t proto)>;
explicit ServerImpl(wpi::Logger& logger);
~ServerImpl();
void SendControl(uint64_t curTimeMs);
void SendValues(int clientId, uint64_t curTimeMs);
@@ -69,8 +86,357 @@ class ServerImpl final {
std::string LoadPersistent(std::string_view in);
private:
class Impl;
std::unique_ptr<Impl> m_impl;
static constexpr uint32_t kMinPeriodMs = 5;
struct PublisherData;
struct SubscriberData;
struct TopicData;
class ClientData {
public:
ClientData(std::string_view name, std::string_view connInfo, bool local,
ServerImpl::SetPeriodicFunc setPeriodic, ServerImpl& server,
int id, wpi::Logger& logger)
: m_name{name},
m_connInfo{connInfo},
m_local{local},
m_setPeriodic{std::move(setPeriodic)},
m_server{server},
m_id{id},
m_logger{logger} {}
virtual ~ClientData() = default;
virtual void ProcessIncomingText(std::string_view data) = 0;
virtual void ProcessIncomingBinary(std::span<const uint8_t> data) = 0;
enum SendMode { kSendDisabled = 0, kSendAll, kSendNormal, kSendImmNoFlush };
virtual void SendValue(TopicData* topic, const Value& value,
SendMode mode) = 0;
virtual void SendAnnounce(TopicData* topic,
std::optional<int64_t> pubuid) = 0;
virtual void SendUnannounce(TopicData* topic) = 0;
virtual void SendPropertiesUpdate(TopicData* topic, const wpi::json& update,
bool ack) = 0;
virtual void SendOutgoing(uint64_t curTimeMs) = 0;
virtual void Flush() = 0;
void UpdateMetaClientPub();
void UpdateMetaClientSub();
std::span<SubscriberData*> GetSubscribers(
std::string_view name, bool special,
wpi::SmallVectorImpl<SubscriberData*>& buf);
std::string_view GetName() const { return m_name; }
int GetId() const { return m_id; }
protected:
std::string m_name;
std::string m_connInfo;
bool m_local; // local to machine
ServerImpl::SetPeriodicFunc m_setPeriodic;
// TODO: make this per-topic?
uint32_t m_periodMs{UINT32_MAX};
uint64_t m_lastSendMs{0};
ServerImpl& m_server;
int m_id;
wpi::Logger& m_logger;
wpi::DenseMap<int64_t, std::unique_ptr<PublisherData>> m_publishers;
wpi::DenseMap<int64_t, std::unique_ptr<SubscriberData>> m_subscribers;
public:
// meta topics
TopicData* m_metaPub = nullptr;
TopicData* m_metaSub = nullptr;
};
class ClientData4Base : public ClientData, protected ClientMessageHandler {
public:
ClientData4Base(std::string_view name, std::string_view connInfo,
bool local, ServerImpl::SetPeriodicFunc setPeriodic,
ServerImpl& server, int id, wpi::Logger& logger)
: ClientData{name, connInfo, local, setPeriodic, server, id, logger} {}
protected:
// ClientMessageHandler interface
void ClientPublish(int64_t pubuid, std::string_view name,
std::string_view typeStr,
const wpi::json& properties) final;
void ClientUnpublish(int64_t pubuid) final;
void ClientSetProperties(std::string_view name,
const wpi::json& update) final;
void ClientSubscribe(int64_t subuid,
std::span<const std::string> topicNames,
const PubSubOptionsImpl& options) final;
void ClientUnsubscribe(int64_t subuid) final;
void ClientSetValue(int64_t pubuid, const Value& value);
wpi::DenseMap<TopicData*, bool> m_announceSent;
};
class ClientDataLocal final : public ClientData4Base {
public:
ClientDataLocal(ServerImpl& server, int id, wpi::Logger& logger)
: ClientData4Base{"", "", true, [](uint32_t) {}, server, id, logger} {}
void ProcessIncomingText(std::string_view data) final {}
void ProcessIncomingBinary(std::span<const uint8_t> data) final {}
void SendValue(TopicData* topic, const Value& value, SendMode mode) final;
void SendAnnounce(TopicData* topic, std::optional<int64_t> pubuid) final;
void SendUnannounce(TopicData* topic) final;
void SendPropertiesUpdate(TopicData* topic, const wpi::json& update,
bool ack) final;
void SendOutgoing(uint64_t curTimeMs) final {}
void Flush() final {}
void HandleLocal(std::span<const ClientMessage> msgs);
};
class ClientData4 final : public ClientData4Base {
public:
ClientData4(std::string_view name, std::string_view connInfo, bool local,
WireConnection& wire, ServerImpl::SetPeriodicFunc setPeriodic,
ServerImpl& server, int id, wpi::Logger& logger)
: ClientData4Base{name, connInfo, local, setPeriodic,
server, id, logger},
m_wire{wire} {}
void ProcessIncomingText(std::string_view data) final;
void ProcessIncomingBinary(std::span<const uint8_t> data) final;
void SendValue(TopicData* topic, const Value& value, SendMode mode) final;
void SendAnnounce(TopicData* topic, std::optional<int64_t> pubuid) final;
void SendUnannounce(TopicData* topic) final;
void SendPropertiesUpdate(TopicData* topic, const wpi::json& update,
bool ack) final;
void SendOutgoing(uint64_t curTimeMs) final;
void Flush() final;
public:
WireConnection& m_wire;
private:
std::vector<ServerMessage> m_outgoing;
wpi::DenseMap<NT_Topic, size_t> m_outgoingValueMap;
bool WriteBinary(int64_t id, int64_t time, const Value& value) {
return WireEncodeBinary(SendBinary().Add(), id, time, value);
}
TextWriter& SendText() {
m_outBinary.reset(); // ensure proper interleaving of text and binary
if (!m_outText) {
m_outText = m_wire.SendText();
}
return *m_outText;
}
BinaryWriter& SendBinary() {
m_outText.reset(); // ensure proper interleaving of text and binary
if (!m_outBinary) {
m_outBinary = m_wire.SendBinary();
}
return *m_outBinary;
}
// valid when we are actively writing to this client
std::optional<TextWriter> m_outText;
std::optional<BinaryWriter> m_outBinary;
};
class ClientData3 final : public ClientData, private net3::MessageHandler3 {
public:
ClientData3(std::string_view connInfo, bool local,
net3::WireConnection3& wire,
ServerImpl::Connected3Func connected,
ServerImpl::SetPeriodicFunc setPeriodic, ServerImpl& server,
int id, wpi::Logger& logger)
: ClientData{"", connInfo, local, setPeriodic, server, id, logger},
m_connected{std::move(connected)},
m_wire{wire},
m_decoder{*this} {}
void ProcessIncomingText(std::string_view data) final {}
void ProcessIncomingBinary(std::span<const uint8_t> data) final;
void SendValue(TopicData* topic, const Value& value, SendMode mode) final;
void SendAnnounce(TopicData* topic, std::optional<int64_t> pubuid) final;
void SendUnannounce(TopicData* topic) final;
void SendPropertiesUpdate(TopicData* topic, const wpi::json& update,
bool ack) final;
void SendOutgoing(uint64_t curTimeMs) final;
void Flush() final { m_wire.Flush(); }
private:
// MessageHandler3 interface
void KeepAlive() final;
void ServerHelloDone() final;
void ClientHelloDone() final;
void ClearEntries() final;
void ProtoUnsup(unsigned int proto_rev) final;
void ClientHello(std::string_view self_id, unsigned int proto_rev) final;
void ServerHello(unsigned int flags, std::string_view self_id) final;
void EntryAssign(std::string_view name, unsigned int id,
unsigned int seq_num, const Value& value,
unsigned int flags) final;
void EntryUpdate(unsigned int id, unsigned int seq_num,
const Value& value) final;
void FlagsUpdate(unsigned int id, unsigned int flags) final;
void EntryDelete(unsigned int id) final;
void ExecuteRpc(unsigned int id, unsigned int uid,
std::span<const uint8_t> params) final {}
void RpcResponse(unsigned int id, unsigned int uid,
std::span<const uint8_t> result) final {}
ServerImpl::Connected3Func m_connected;
net3::WireConnection3& m_wire;
enum State { kStateInitial, kStateServerHelloComplete, kStateRunning };
State m_state{kStateInitial};
net3::WireDecoder3 m_decoder;
std::vector<net3::Message3> m_outgoing;
wpi::DenseMap<NT_Topic, size_t> m_outgoingValueMap;
int64_t m_nextPubUid{1};
struct TopicData3 {
explicit TopicData3(TopicData* topic) { UpdateFlags(topic); }
unsigned int flags{0};
net3::SequenceNumber seqNum;
bool sentAssign{false};
bool published{false};
int64_t pubuid{0};
bool UpdateFlags(TopicData* topic);
};
wpi::DenseMap<TopicData*, TopicData3> m_topics3;
TopicData3* GetTopic3(TopicData* topic) {
return &m_topics3.try_emplace(topic, topic).first->second;
}
};
struct TopicData {
TopicData(std::string_view name, std::string_view typeStr)
: name{name}, typeStr{typeStr} {}
TopicData(std::string_view name, std::string_view typeStr,
wpi::json properties)
: name{name}, typeStr{typeStr}, properties(std::move(properties)) {
RefreshProperties();
}
bool IsPublished() const {
return persistent || retained || !publishers.empty();
}
// returns true if properties changed
bool SetProperties(const wpi::json& update);
void RefreshProperties();
bool SetFlags(unsigned int flags_);
std::string name;
unsigned int id;
Value lastValue;
ClientData* lastValueClient = nullptr;
std::string typeStr;
wpi::json properties = wpi::json::object();
bool persistent{false};
bool retained{false};
bool special{false};
NT_Topic localHandle{0};
VectorSet<PublisherData*> publishers;
VectorSet<SubscriberData*> subscribers;
// meta topics
TopicData* metaPub = nullptr;
TopicData* metaSub = nullptr;
};
struct PublisherData {
PublisherData(ClientData* client, TopicData* topic, int64_t pubuid)
: client{client}, topic{topic}, pubuid{pubuid} {}
ClientData* client;
TopicData* topic;
int64_t pubuid;
};
struct SubscriberData {
SubscriberData(ClientData* client, std::span<const std::string> topicNames,
int64_t subuid, const PubSubOptionsImpl& options)
: client{client},
topicNames{topicNames.begin(), topicNames.end()},
subuid{subuid},
options{options},
periodMs(std::lround(options.periodicMs / 10.0) * 10) {
if (periodMs < kMinPeriodMs) {
periodMs = kMinPeriodMs;
}
}
void Update(std::span<const std::string> topicNames_,
const PubSubOptionsImpl& options_) {
topicNames = {topicNames_.begin(), topicNames_.end()};
options = options_;
periodMs = std::lround(options_.periodicMs / 10.0) * 10;
if (periodMs < kMinPeriodMs) {
periodMs = kMinPeriodMs;
}
}
bool Matches(std::string_view name, bool special);
ClientData* client;
std::vector<std::string> topicNames;
int64_t subuid;
PubSubOptionsImpl options;
// in options as double, but copy here as integer; rounded to the nearest
// 10 ms
uint32_t periodMs;
};
wpi::Logger& m_logger;
LocalInterface* m_local{nullptr};
bool m_controlReady{false};
ClientDataLocal* m_localClient;
std::vector<std::unique_ptr<ClientData>> m_clients;
wpi::UidVector<std::unique_ptr<TopicData>, 16> m_topics;
wpi::StringMap<TopicData*> m_nameTopics;
bool m_persistentChanged{false};
// global meta topics (other meta topics are linked to from the specific
// client or topic)
TopicData* m_metaClients;
void DumpPersistent(wpi::raw_ostream& os);
// helper functions
TopicData* CreateTopic(ClientData* client, std::string_view name,
std::string_view typeStr, const wpi::json& properties,
bool special = false);
TopicData* CreateMetaTopic(std::string_view name);
void DeleteTopic(TopicData* topic);
void SetProperties(ClientData* client, TopicData* topic,
const wpi::json& update);
void SetFlags(ClientData* client, TopicData* topic, unsigned int flags);
void SetValue(ClientData* client, TopicData* topic, const Value& value);
// update meta topic values from data structures
void UpdateMetaClients(const std::vector<ConnectionInfo>& conns);
void UpdateMetaTopicPub(TopicData* topic);
void UpdateMetaTopicSub(TopicData* topic);
void PropertiesChanged(ClientData* client, TopicData* topic,
const wpi::json& update);
};
} // namespace nt::net

View File

@@ -12,7 +12,6 @@
#include <fmt/format.h>
#include <wpi/DenseMap.h>
#include <wpi/StringMap.h>
#include <wpi/json.h>
#include <wpi/timestamp.h>
#include "Handle.h"
@@ -20,10 +19,6 @@
#include "Types_internal.h"
#include "net/Message.h"
#include "net/NetworkInterface.h"
#include "net3/Message3.h"
#include "net3/SequenceNumber.h"
#include "net3/WireConnection3.h"
#include "net3/WireDecoder3.h"
#include "net3/WireEncoder3.h"
#include "networktables/NetworkTableValue.h"
@@ -36,145 +31,7 @@ static constexpr uint32_t kMinPeriodMs = 5;
// transmission before we close the connection
static constexpr uint32_t kWireMaxNotReadyUs = 1000000;
namespace {
struct Entry;
struct PublisherData {
explicit PublisherData(Entry* entry) : entry{entry} {}
Entry* entry;
NT_Publisher handle;
PubSubOptionsImpl options;
// in options as double, but copy here as integer; rounded to the nearest
// 10 ms
uint32_t periodMs;
uint64_t nextSendMs{0};
std::vector<Value> outValues; // outgoing values
};
// data for each entry
struct Entry {
explicit Entry(std::string_view name_) : name(name_) {}
bool IsPersistent() const { return (flags & NT_PERSISTENT) != 0; }
wpi::json SetFlags(unsigned int flags_);
std::string name;
std::string typeStr;
NT_Type type{NT_UNASSIGNED};
wpi::json properties = wpi::json::object();
// The current value and flags
Value value;
unsigned int flags{0};
// Unique ID used in network messages; this is 0xffff until assigned
// by the server.
unsigned int id{0xffff};
// Sequence number for update resolution
SequenceNumber seqNum;
// Local topic handle
NT_Topic topic{0};
// Local publishers
std::vector<PublisherData*> publishers;
};
class CImpl : public MessageHandler3 {
public:
CImpl(uint64_t curTimeMs, int inst, WireConnection3& wire,
wpi::Logger& logger,
std::function<void(uint32_t repeatMs)> setPeriodic);
void ProcessIncoming(std::span<const uint8_t> data);
void HandleLocal(std::span<const net::ClientMessage> msgs);
void SendPeriodic(uint64_t curTimeMs, bool initial, bool flush);
void SendValue(Writer& out, Entry* entry, const Value& value);
bool CheckNetworkReady(uint64_t curTimeMs);
// Outgoing handlers
void Publish(NT_Publisher pubHandle, NT_Topic topicHandle,
std::string_view name, std::string_view typeStr,
const wpi::json& properties, const PubSubOptionsImpl& options);
void Unpublish(NT_Publisher pubHandle, NT_Topic topicHandle);
void SetProperties(NT_Topic topicHandle, std::string_view name,
const wpi::json& update);
void SetValue(NT_Publisher pubHandle, const Value& value);
// MessageHandler interface
void KeepAlive() final;
void ServerHelloDone() final;
void ClientHelloDone() final;
void ClearEntries() final;
void ProtoUnsup(unsigned int proto_rev) final;
void ClientHello(std::string_view self_id, unsigned int proto_rev) final;
void ServerHello(unsigned int flags, std::string_view self_id) final;
void EntryAssign(std::string_view name, unsigned int id, unsigned int seq_num,
const Value& value, unsigned int flags) final;
void EntryUpdate(unsigned int id, unsigned int seq_num,
const Value& value) final;
void FlagsUpdate(unsigned int id, unsigned int flags) final;
void EntryDelete(unsigned int id) final;
void ExecuteRpc(unsigned int id, unsigned int uid,
std::span<const uint8_t> params) final {}
void RpcResponse(unsigned int id, unsigned int uid,
std::span<const uint8_t> result) final {}
enum State {
kStateInitial,
kStateHelloSent,
kStateInitialAssignments,
kStateRunning
};
int m_inst;
WireConnection3& m_wire;
wpi::Logger& m_logger;
net::LocalInterface* m_local{nullptr};
std::function<void(uint32_t repeatMs)> m_setPeriodic;
uint64_t m_initTimeMs;
// periodic sweep handling
static constexpr uint32_t kKeepAliveIntervalMs = 1000;
uint32_t m_periodMs{kKeepAliveIntervalMs + 10};
uint64_t m_lastSendMs{0};
uint64_t m_nextKeepAliveTimeMs;
// indexed by publisher index
std::vector<std::unique_ptr<PublisherData>> m_publishers;
State m_state{kStateInitial};
WireDecoder3 m_decoder;
std::string m_remoteId;
std::function<void()> m_handshakeSucceeded;
std::vector<std::pair<unsigned int, unsigned int>> m_outgoingFlags;
using NameMap = wpi::StringMap<std::unique_ptr<Entry>>;
using IdMap = std::vector<Entry*>;
NameMap m_nameMap;
IdMap m_idMap;
Entry* GetOrNewEntry(std::string_view name) {
auto& entry = m_nameMap[name];
if (!entry) {
entry = std::make_unique<Entry>(name);
}
return entry.get();
}
Entry* LookupId(unsigned int id) {
return id < m_idMap.size() ? m_idMap[id] : nullptr;
}
};
} // namespace
wpi::json Entry::SetFlags(unsigned int flags_) {
wpi::json ClientImpl3::Entry::SetFlags(unsigned int flags_) {
bool wasPersistent = IsPersistent();
flags = flags_;
bool isPersistent = IsPersistent();
@@ -189,25 +46,28 @@ wpi::json Entry::SetFlags(unsigned int flags_) {
}
}
CImpl::CImpl(uint64_t curTimeMs, int inst, WireConnection3& wire,
wpi::Logger& logger,
std::function<void(uint32_t repeatMs)> setPeriodic)
: m_inst{inst},
m_wire{wire},
ClientImpl3::ClientImpl3(uint64_t curTimeMs, int inst, WireConnection3& wire,
wpi::Logger& logger,
std::function<void(uint32_t repeatMs)> setPeriodic)
: m_wire{wire},
m_logger{logger},
m_setPeriodic{std::move(setPeriodic)},
m_initTimeMs{curTimeMs},
m_nextKeepAliveTimeMs{curTimeMs + kKeepAliveIntervalMs},
m_decoder{*this} {}
void CImpl::ProcessIncoming(std::span<const uint8_t> data) {
ClientImpl3::~ClientImpl3() {
DEBUG4("NT3 ClientImpl destroyed");
}
void ClientImpl3::ProcessIncoming(std::span<const uint8_t> data) {
DEBUG4("received {} bytes", data.size());
if (!m_decoder.Execute(&data)) {
m_wire.Disconnect(m_decoder.GetError());
}
}
void CImpl::HandleLocal(std::span<const net::ClientMessage> msgs) {
void ClientImpl3::HandleLocal(std::span<const net::ClientMessage> msgs) {
for (const auto& elem : msgs) { // NOLINT
// common case is value
if (auto msg = std::get_if<net::ClientValueMsg>(&elem.contents)) {
@@ -223,7 +83,7 @@ void CImpl::HandleLocal(std::span<const net::ClientMessage> msgs) {
}
}
void CImpl::SendPeriodic(uint64_t curTimeMs, bool initial, bool flush) {
void ClientImpl3::DoSendPeriodic(uint64_t curTimeMs, bool initial, bool flush) {
DEBUG4("SendPeriodic({})", curTimeMs);
// rate limit sends
@@ -283,7 +143,7 @@ void CImpl::SendPeriodic(uint64_t curTimeMs, bool initial, bool flush) {
m_lastSendMs = curTimeMs;
}
void CImpl::SendValue(Writer& out, Entry* entry, const Value& value) {
void ClientImpl3::SendValue(Writer& out, Entry* entry, const Value& value) {
DEBUG4("sending value for '{}', seqnum {}", entry->name,
entry->seqNum.value());
@@ -302,7 +162,7 @@ void CImpl::SendValue(Writer& out, Entry* entry, const Value& value) {
}
}
bool CImpl::CheckNetworkReady(uint64_t curTimeMs) {
bool ClientImpl3::CheckNetworkReady(uint64_t curTimeMs) {
if (!m_wire.Ready()) {
uint64_t lastFlushTime = m_wire.GetLastFlushTime();
uint64_t now = wpi::Now();
@@ -314,10 +174,10 @@ bool CImpl::CheckNetworkReady(uint64_t curTimeMs) {
return true;
}
void CImpl::Publish(NT_Publisher pubHandle, NT_Topic topicHandle,
std::string_view name, std::string_view typeStr,
const wpi::json& properties,
const PubSubOptionsImpl& options) {
void ClientImpl3::Publish(NT_Publisher pubHandle, NT_Topic topicHandle,
std::string_view name, std::string_view typeStr,
const wpi::json& properties,
const PubSubOptionsImpl& options) {
DEBUG4("Publish('{}', '{}')", name, typeStr);
unsigned int index = Handle{pubHandle}.GetIndex();
if (index >= m_publishers.size()) {
@@ -342,7 +202,7 @@ void CImpl::Publish(NT_Publisher pubHandle, NT_Topic topicHandle,
m_setPeriodic(m_periodMs);
}
void CImpl::Unpublish(NT_Publisher pubHandle, NT_Topic topicHandle) {
void ClientImpl3::Unpublish(NT_Publisher pubHandle, NT_Topic topicHandle) {
DEBUG4("Unpublish({}, {})", pubHandle, topicHandle);
unsigned int index = Handle{pubHandle}.GetIndex();
if (index >= m_publishers.size()) {
@@ -365,8 +225,8 @@ void CImpl::Unpublish(NT_Publisher pubHandle, NT_Topic topicHandle) {
m_setPeriodic(m_periodMs);
}
void CImpl::SetProperties(NT_Topic topicHandle, std::string_view name,
const wpi::json& update) {
void ClientImpl3::SetProperties(NT_Topic topicHandle, std::string_view name,
const wpi::json& update) {
DEBUG4("SetProperties({}, {}, {})", topicHandle, name, update.dump());
auto entry = GetOrNewEntry(name);
bool updated = false;
@@ -388,7 +248,7 @@ void CImpl::SetProperties(NT_Topic topicHandle, std::string_view name,
}
}
void CImpl::SetValue(NT_Publisher pubHandle, const Value& value) {
void ClientImpl3::SetValue(NT_Publisher pubHandle, const Value& value) {
DEBUG4("SetValue({})", pubHandle);
unsigned int index = Handle{pubHandle}.GetIndex();
assert(index < m_publishers.size() && m_publishers[index]);
@@ -404,7 +264,7 @@ void CImpl::SetValue(NT_Publisher pubHandle, const Value& value) {
}
}
void CImpl::KeepAlive() {
void ClientImpl3::KeepAlive() {
DEBUG4("KeepAlive()");
if (m_state != kStateRunning && m_state != kStateInitialAssignments) {
m_decoder.SetError("received unexpected KeepAlive message");
@@ -413,7 +273,7 @@ void CImpl::KeepAlive() {
// ignore
}
void CImpl::ServerHelloDone() {
void ClientImpl3::ServerHelloDone() {
DEBUG4("ServerHelloDone()");
if (m_state != kStateInitialAssignments) {
m_decoder.SetError("received unexpected ServerHelloDone message");
@@ -421,28 +281,29 @@ void CImpl::ServerHelloDone() {
}
// send initial assignments
SendPeriodic(m_initTimeMs, true, true);
DoSendPeriodic(m_initTimeMs, true, true);
m_state = kStateRunning;
m_setPeriodic(m_periodMs);
}
void CImpl::ClientHelloDone() {
void ClientImpl3::ClientHelloDone() {
DEBUG4("ClientHelloDone()");
m_decoder.SetError("received unexpected ClientHelloDone message");
}
void CImpl::ProtoUnsup(unsigned int proto_rev) {
void ClientImpl3::ProtoUnsup(unsigned int proto_rev) {
DEBUG4("ProtoUnsup({})", proto_rev);
m_decoder.SetError(fmt::format("received ProtoUnsup(version={})", proto_rev));
}
void CImpl::ClientHello(std::string_view self_id, unsigned int proto_rev) {
void ClientImpl3::ClientHello(std::string_view self_id,
unsigned int proto_rev) {
DEBUG4("ClientHello({}, {})", self_id, proto_rev);
m_decoder.SetError("received unexpected ClientHello message");
}
void CImpl::ServerHello(unsigned int flags, std::string_view self_id) {
void ClientImpl3::ServerHello(unsigned int flags, std::string_view self_id) {
DEBUG4("ServerHello({}, {})", flags, self_id);
if (m_state != kStateHelloSent) {
m_decoder.SetError("received unexpected ServerHello message");
@@ -454,9 +315,9 @@ void CImpl::ServerHello(unsigned int flags, std::string_view self_id) {
m_handshakeSucceeded = nullptr; // no longer required
}
void CImpl::EntryAssign(std::string_view name, unsigned int id,
unsigned int seq_num, const Value& value,
unsigned int flags) {
void ClientImpl3::EntryAssign(std::string_view name, unsigned int id,
unsigned int seq_num, const Value& value,
unsigned int flags) {
DEBUG4("EntryAssign({}, {}, {}, value, {})", name, id, seq_num, flags);
if (m_state != kStateInitialAssignments && m_state != kStateRunning) {
m_decoder.SetError("received unexpected EntryAssign message");
@@ -513,8 +374,8 @@ void CImpl::EntryAssign(std::string_view name, unsigned int id,
}
}
void CImpl::EntryUpdate(unsigned int id, unsigned int seq_num,
const Value& value) {
void ClientImpl3::EntryUpdate(unsigned int id, unsigned int seq_num,
const Value& value) {
DEBUG4("EntryUpdate({}, {}, value)", id, seq_num);
if (m_state != kStateRunning) {
m_decoder.SetError("received EntryUpdate message before ServerHelloDone");
@@ -528,7 +389,7 @@ void CImpl::EntryUpdate(unsigned int id, unsigned int seq_num,
}
}
void CImpl::FlagsUpdate(unsigned int id, unsigned int flags) {
void ClientImpl3::FlagsUpdate(unsigned int id, unsigned int flags) {
DEBUG4("FlagsUpdate({}, {})", id, flags);
if (m_state != kStateRunning) {
m_decoder.SetError("received FlagsUpdate message before ServerHelloDone");
@@ -548,7 +409,7 @@ void CImpl::FlagsUpdate(unsigned int id, unsigned int flags) {
m_outgoingFlags.end());
}
void CImpl::EntryDelete(unsigned int id) {
void ClientImpl3::EntryDelete(unsigned int id) {
DEBUG4("EntryDelete({})", id);
if (m_state != kStateRunning) {
m_decoder.SetError("received EntryDelete message before ServerHelloDone");
@@ -573,7 +434,7 @@ void CImpl::EntryDelete(unsigned int id) {
m_outgoingFlags.end());
}
void CImpl::ClearEntries() {
void ClientImpl3::ClearEntries() {
DEBUG4("ClearEntries()");
if (m_state != kStateRunning) {
m_decoder.SetError("received ClearEntries message before ServerHelloDone");
@@ -597,47 +458,14 @@ void CImpl::ClearEntries() {
m_outgoingFlags.resize(0);
}
class ClientImpl3::Impl final : public CImpl {
public:
Impl(uint64_t curTimeMs, int inst, WireConnection3& wire, wpi::Logger& logger,
std::function<void(uint32_t repeatMs)> setPeriodic)
: CImpl{curTimeMs, inst, wire, logger, std::move(setPeriodic)} {}
};
ClientImpl3::ClientImpl3(uint64_t curTimeMs, int inst, WireConnection3& wire,
wpi::Logger& logger,
std::function<void(uint32_t repeatMs)> setPeriodic)
: m_impl{std::make_unique<Impl>(curTimeMs, inst, wire, logger,
std::move(setPeriodic))} {}
ClientImpl3::~ClientImpl3() {
WPI_DEBUG4(m_impl->m_logger, "NT3 ClientImpl destroyed");
}
void ClientImpl3::Start(std::string_view selfId,
std::function<void()> succeeded) {
if (m_impl->m_state != CImpl::kStateInitial) {
if (m_state != kStateInitial) {
return;
}
m_impl->m_handshakeSucceeded = std::move(succeeded);
auto writer = m_impl->m_wire.Send();
m_handshakeSucceeded = std::move(succeeded);
auto writer = m_wire.Send();
WireEncodeClientHello(writer.stream(), selfId, 0x0300);
m_impl->m_wire.Flush();
m_impl->m_state = CImpl::kStateHelloSent;
}
void ClientImpl3::ProcessIncoming(std::span<const uint8_t> data) {
m_impl->ProcessIncoming(data);
}
void ClientImpl3::HandleLocal(std::span<const net::ClientMessage> msgs) {
m_impl->HandleLocal(msgs);
}
void ClientImpl3::SendPeriodic(uint64_t curTimeMs, bool flush) {
m_impl->SendPeriodic(curTimeMs, false, flush);
}
void ClientImpl3::SetLocal(net::LocalInterface* local) {
m_impl->m_local = local;
m_wire.Flush();
m_state = kStateHelloSent;
}

View File

@@ -11,8 +11,17 @@
#include <span>
#include <string>
#include <string_view>
#include <utility>
#include <vector>
#include <wpi/json.h>
#include "PubSubOptions.h"
#include "net/NetworkInterface.h"
#include "net3/Message3.h"
#include "net3/SequenceNumber.h"
#include "net3/WireConnection3.h"
#include "net3/WireDecoder3.h"
namespace wpi {
class Logger;
@@ -27,24 +36,147 @@ namespace nt::net3 {
class WireConnection3;
class ClientImpl3 {
class ClientImpl3 final : private MessageHandler3 {
public:
explicit ClientImpl3(uint64_t curTimeMs, int inst, WireConnection3& wire,
wpi::Logger& logger,
std::function<void(uint32_t repeatMs)> setPeriodic);
~ClientImpl3();
~ClientImpl3() final;
void Start(std::string_view selfId, std::function<void()> succeeded);
void ProcessIncoming(std::span<const uint8_t> data);
void HandleLocal(std::span<const net::ClientMessage> msgs);
void SendPeriodic(uint64_t curTimeMs, bool flush);
void SendPeriodic(uint64_t curTimeMs, bool flush) {
DoSendPeriodic(curTimeMs, false, flush);
}
void SetLocal(net::LocalInterface* local);
void SetLocal(net::LocalInterface* local) { m_local = local; }
private:
class Impl;
std::unique_ptr<Impl> m_impl;
struct Entry;
struct PublisherData {
explicit PublisherData(Entry* entry) : entry{entry} {}
Entry* entry;
NT_Publisher handle;
PubSubOptionsImpl options;
// in options as double, but copy here as integer; rounded to the nearest
// 10 ms
uint32_t periodMs;
uint64_t nextSendMs{0};
std::vector<Value> outValues; // outgoing values
};
// data for each entry
struct Entry {
explicit Entry(std::string_view name_) : name(name_) {}
bool IsPersistent() const { return (flags & NT_PERSISTENT) != 0; }
wpi::json SetFlags(unsigned int flags_);
std::string name;
std::string typeStr;
NT_Type type{NT_UNASSIGNED};
wpi::json properties = wpi::json::object();
// The current value and flags
Value value;
unsigned int flags{0};
// Unique ID used in network messages; this is 0xffff until assigned
// by the server.
unsigned int id{0xffff};
// Sequence number for update resolution
SequenceNumber seqNum;
// Local topic handle
NT_Topic topic{0};
// Local publishers
std::vector<PublisherData*> publishers;
};
void DoSendPeriodic(uint64_t curTimeMs, bool initial, bool flush);
void SendValue(Writer& out, Entry* entry, const Value& value);
bool CheckNetworkReady(uint64_t curTimeMs);
// Outgoing handlers
void Publish(NT_Publisher pubHandle, NT_Topic topicHandle,
std::string_view name, std::string_view typeStr,
const wpi::json& properties, const PubSubOptionsImpl& options);
void Unpublish(NT_Publisher pubHandle, NT_Topic topicHandle);
void SetProperties(NT_Topic topicHandle, std::string_view name,
const wpi::json& update);
void SetValue(NT_Publisher pubHandle, const Value& value);
// MessageHandler interface
void KeepAlive() final;
void ServerHelloDone() final;
void ClientHelloDone() final;
void ClearEntries() final;
void ProtoUnsup(unsigned int proto_rev) final;
void ClientHello(std::string_view self_id, unsigned int proto_rev) final;
void ServerHello(unsigned int flags, std::string_view self_id) final;
void EntryAssign(std::string_view name, unsigned int id, unsigned int seq_num,
const Value& value, unsigned int flags) final;
void EntryUpdate(unsigned int id, unsigned int seq_num,
const Value& value) final;
void FlagsUpdate(unsigned int id, unsigned int flags) final;
void EntryDelete(unsigned int id) final;
void ExecuteRpc(unsigned int id, unsigned int uid,
std::span<const uint8_t> params) final {}
void RpcResponse(unsigned int id, unsigned int uid,
std::span<const uint8_t> result) final {}
enum State {
kStateInitial,
kStateHelloSent,
kStateInitialAssignments,
kStateRunning
};
WireConnection3& m_wire;
wpi::Logger& m_logger;
net::LocalInterface* m_local{nullptr};
std::function<void(uint32_t repeatMs)> m_setPeriodic;
uint64_t m_initTimeMs;
// periodic sweep handling
static constexpr uint32_t kKeepAliveIntervalMs = 1000;
uint32_t m_periodMs{kKeepAliveIntervalMs + 10};
uint64_t m_lastSendMs{0};
uint64_t m_nextKeepAliveTimeMs;
// indexed by publisher index
std::vector<std::unique_ptr<PublisherData>> m_publishers;
State m_state{kStateInitial};
WireDecoder3 m_decoder;
std::string m_remoteId;
std::function<void()> m_handshakeSucceeded;
std::vector<std::pair<unsigned int, unsigned int>> m_outgoingFlags;
using NameMap = wpi::StringMap<std::unique_ptr<Entry>>;
using IdMap = std::vector<Entry*>;
NameMap m_nameMap;
IdMap m_idMap;
Entry* GetOrNewEntry(std::string_view name) {
auto& entry = m_nameMap[name];
if (!entry) {
entry = std::make_unique<Entry>(name);
}
return entry.get();
}
Entry* LookupId(unsigned int id) {
return id < m_idMap.size() ? m_idMap[id] : nullptr;
}
};
} // namespace nt::net3

View File

@@ -5,147 +5,24 @@
#include "WireDecoder3.h"
#include <algorithm>
#include <optional>
#include <string>
#include <vector>
#include <fmt/format.h>
#include <wpi/MathExtras.h>
#include <wpi/SpanExtras.h>
#include <wpi/leb128.h>
#include "Message3.h"
using namespace nt;
using namespace nt::net3;
namespace {
class SimpleValueReader {
public:
std::optional<uint16_t> Read16(std::span<const uint8_t>* in);
std::optional<uint32_t> Read32(std::span<const uint8_t>* in);
std::optional<uint64_t> Read64(std::span<const uint8_t>* in);
std::optional<double> ReadDouble(std::span<const uint8_t>* in);
private:
uint64_t m_value = 0;
int m_count = 0;
};
struct StringReader {
void SetLen(uint64_t len_) {
len = len_;
buf.clear();
}
std::optional<uint64_t> len;
std::string buf;
};
struct RawReader {
void SetLen(uint64_t len_) {
len = len_;
buf.clear();
}
std::optional<uint64_t> len;
std::vector<uint8_t> buf;
};
struct ValueReader {
ValueReader() = default;
explicit ValueReader(NT_Type type_) : type{type_} {}
void SetSize(uint32_t size_) {
haveSize = true;
size = size_;
ints.clear();
doubles.clear();
strings.clear();
}
NT_Type type = NT_UNASSIGNED;
bool haveSize = false;
uint32_t size = 0;
std::vector<int> ints;
std::vector<double> doubles;
std::vector<std::string> strings;
};
struct WDImpl {
explicit WDImpl(MessageHandler3& out) : m_out{out} {}
MessageHandler3& m_out;
// primary (message) decode state
enum {
kStart,
kClientHello_1ProtoRev,
kClientHello_2Id,
kProtoUnsup_1ProtoRev,
kServerHello_1Flags,
kServerHello_2Id,
kEntryAssign_1Name,
kEntryAssign_2Type,
kEntryAssign_3Id,
kEntryAssign_4SeqNum,
kEntryAssign_5Flags,
kEntryAssign_6Value,
kEntryUpdate_1Id,
kEntryUpdate_2SeqNum,
kEntryUpdate_3Type,
kEntryUpdate_4Value,
kFlagsUpdate_1Id,
kFlagsUpdate_2Flags,
kEntryDelete_1Id,
kClearEntries_1Magic,
kExecuteRpc_1Id,
kExecuteRpc_2Uid,
kExecuteRpc_3Params,
kRpcResponse_1Id,
kRpcResponse_2Uid,
kRpcResponse_3Result,
kError
} m_state = kStart;
// detail decoders
wpi::Uleb128Reader m_ulebReader;
SimpleValueReader m_simpleReader;
StringReader m_stringReader;
RawReader m_rawReader;
ValueReader m_valueReader;
std::string m_error;
std::string m_str;
unsigned int m_id{0}; // also used for proto_rev
unsigned int m_flags{0};
unsigned int m_seq_num_uid{0};
void Execute(std::span<const uint8_t>* in);
std::nullopt_t EmitError(std::string_view msg) {
m_state = kError;
m_error = msg;
return std::nullopt;
}
std::optional<std::string> ReadString(std::span<const uint8_t>* in);
std::optional<std::vector<uint8_t>> ReadRaw(std::span<const uint8_t>* in);
std::optional<NT_Type> ReadType(std::span<const uint8_t>* in);
std::optional<Value> ReadValue(std::span<const uint8_t>* in);
};
} // namespace
static uint8_t Read8(std::span<const uint8_t>* in) {
uint8_t val = in->front();
*in = wpi::drop_front(*in);
return val;
}
std::optional<uint16_t> SimpleValueReader::Read16(
std::optional<uint16_t> WireDecoder3::SimpleValueReader::Read16(
std::span<const uint8_t>* in) {
while (!in->empty()) {
m_value <<= 8;
@@ -161,7 +38,7 @@ std::optional<uint16_t> SimpleValueReader::Read16(
return std::nullopt;
}
std::optional<uint32_t> SimpleValueReader::Read32(
std::optional<uint32_t> WireDecoder3::SimpleValueReader::Read32(
std::span<const uint8_t>* in) {
while (!in->empty()) {
m_value <<= 8;
@@ -177,7 +54,7 @@ std::optional<uint32_t> SimpleValueReader::Read32(
return std::nullopt;
}
std::optional<uint64_t> SimpleValueReader::Read64(
std::optional<uint64_t> WireDecoder3::SimpleValueReader::Read64(
std::span<const uint8_t>* in) {
while (!in->empty()) {
m_value <<= 8;
@@ -193,7 +70,7 @@ std::optional<uint64_t> SimpleValueReader::Read64(
return std::nullopt;
}
std::optional<double> SimpleValueReader::ReadDouble(
std::optional<double> WireDecoder3::SimpleValueReader::ReadDouble(
std::span<const uint8_t>* in) {
if (auto val = Read64(in)) {
return wpi::BitsToDouble(val.value());
@@ -202,7 +79,7 @@ std::optional<double> SimpleValueReader::ReadDouble(
}
}
void WDImpl::Execute(std::span<const uint8_t>* in) {
void WireDecoder3::DoExecute(std::span<const uint8_t>* in) {
while (!in->empty()) {
switch (m_state) {
case kStart: {
@@ -417,7 +294,8 @@ void WDImpl::Execute(std::span<const uint8_t>* in) {
}
}
std::optional<std::string> WDImpl::ReadString(std::span<const uint8_t>* in) {
std::optional<std::string> WireDecoder3::ReadString(
std::span<const uint8_t>* in) {
// string length
if (!m_stringReader.len) {
if (auto val = m_ulebReader.ReadOne(in)) {
@@ -442,7 +320,7 @@ std::optional<std::string> WDImpl::ReadString(std::span<const uint8_t>* in) {
return std::nullopt;
}
std::optional<std::vector<uint8_t>> WDImpl::ReadRaw(
std::optional<std::vector<uint8_t>> WireDecoder3::ReadRaw(
std::span<const uint8_t>* in) {
// string length
if (!m_rawReader.len) {
@@ -468,7 +346,7 @@ std::optional<std::vector<uint8_t>> WDImpl::ReadRaw(
return std::nullopt;
}
std::optional<NT_Type> WDImpl::ReadType(std::span<const uint8_t>* in) {
std::optional<NT_Type> WireDecoder3::ReadType(std::span<const uint8_t>* in) {
// Convert from byte value to enum
switch (Read8(in)) {
case Message3::kBoolean:
@@ -492,7 +370,7 @@ std::optional<NT_Type> WDImpl::ReadType(std::span<const uint8_t>* in) {
}
}
std::optional<Value> WDImpl::ReadValue(std::span<const uint8_t>* in) {
std::optional<Value> WireDecoder3::ReadValue(std::span<const uint8_t>* in) {
while (!in->empty()) {
switch (m_valueReader.type) {
case NT_BOOLEAN:
@@ -577,24 +455,3 @@ std::optional<Value> WDImpl::ReadValue(std::span<const uint8_t>* in) {
}
return std::nullopt;
}
struct WireDecoder3::Impl : public WDImpl {
explicit Impl(MessageHandler3& out) : WDImpl{out} {}
};
WireDecoder3::WireDecoder3(MessageHandler3& out) : m_impl{new Impl{out}} {}
WireDecoder3::~WireDecoder3() = default;
bool WireDecoder3::Execute(std::span<const uint8_t>* in) {
m_impl->Execute(in);
return m_impl->m_state != Impl::kError;
}
void WireDecoder3::SetError(std::string_view message) {
m_impl->EmitError(message);
}
std::string WireDecoder3::GetError() const {
return m_impl->m_error;
}

View File

@@ -7,8 +7,14 @@
#include <stdint.h>
#include <memory>
#include <optional>
#include <span>
#include <string>
#include <vector>
#include <wpi/leb128.h>
#include "ntcore_c.h"
namespace nt {
class Value;
@@ -18,6 +24,8 @@ namespace nt::net3 {
class MessageHandler3 {
public:
virtual ~MessageHandler3() = default;
virtual void KeepAlive() = 0;
virtual void ServerHelloDone() = 0;
virtual void ClientHelloDone() = 0;
@@ -42,8 +50,7 @@ class MessageHandler3 {
/* Decodes NT3 protocol into native representation. */
class WireDecoder3 {
public:
explicit WireDecoder3(MessageHandler3& out);
~WireDecoder3();
explicit WireDecoder3(MessageHandler3& out) : m_out{out} {}
/**
* Executes the decoder. All input data will be consumed unless an error
@@ -51,14 +58,126 @@ class WireDecoder3 {
* @param in input data (updated during parse)
* @return false if error occurred
*/
bool Execute(std::span<const uint8_t>* in);
bool Execute(std::span<const uint8_t>* in) {
DoExecute(in);
return m_state != kError;
}
void SetError(std::string_view message);
std::string GetError() const;
void SetError(std::string_view message) { EmitError(message); }
std::string GetError() const { return m_error; }
private:
struct Impl;
std::unique_ptr<Impl> m_impl;
class SimpleValueReader {
public:
std::optional<uint16_t> Read16(std::span<const uint8_t>* in);
std::optional<uint32_t> Read32(std::span<const uint8_t>* in);
std::optional<uint64_t> Read64(std::span<const uint8_t>* in);
std::optional<double> ReadDouble(std::span<const uint8_t>* in);
private:
uint64_t m_value = 0;
int m_count = 0;
};
struct StringReader {
void SetLen(uint64_t len_) {
len = len_;
buf.clear();
}
std::optional<uint64_t> len;
std::string buf;
};
struct RawReader {
void SetLen(uint64_t len_) {
len = len_;
buf.clear();
}
std::optional<uint64_t> len;
std::vector<uint8_t> buf;
};
struct ValueReader {
ValueReader() = default;
explicit ValueReader(NT_Type type_) : type{type_} {}
void SetSize(uint32_t size_) {
haveSize = true;
size = size_;
ints.clear();
doubles.clear();
strings.clear();
}
NT_Type type = NT_UNASSIGNED;
bool haveSize = false;
uint32_t size = 0;
std::vector<int> ints;
std::vector<double> doubles;
std::vector<std::string> strings;
};
MessageHandler3& m_out;
// primary (message) decode state
enum {
kStart,
kClientHello_1ProtoRev,
kClientHello_2Id,
kProtoUnsup_1ProtoRev,
kServerHello_1Flags,
kServerHello_2Id,
kEntryAssign_1Name,
kEntryAssign_2Type,
kEntryAssign_3Id,
kEntryAssign_4SeqNum,
kEntryAssign_5Flags,
kEntryAssign_6Value,
kEntryUpdate_1Id,
kEntryUpdate_2SeqNum,
kEntryUpdate_3Type,
kEntryUpdate_4Value,
kFlagsUpdate_1Id,
kFlagsUpdate_2Flags,
kEntryDelete_1Id,
kClearEntries_1Magic,
kExecuteRpc_1Id,
kExecuteRpc_2Uid,
kExecuteRpc_3Params,
kRpcResponse_1Id,
kRpcResponse_2Uid,
kRpcResponse_3Result,
kError
} m_state = kStart;
// detail decoders
wpi::Uleb128Reader m_ulebReader;
SimpleValueReader m_simpleReader;
StringReader m_stringReader;
RawReader m_rawReader;
ValueReader m_valueReader;
std::string m_error;
std::string m_str;
unsigned int m_id{0}; // also used for proto_rev
unsigned int m_flags{0};
unsigned int m_seq_num_uid{0};
void DoExecute(std::span<const uint8_t>* in);
std::nullopt_t EmitError(std::string_view msg) {
m_state = kError;
m_error = msg;
return std::nullopt;
}
std::optional<std::string> ReadString(std::span<const uint8_t>* in);
std::optional<std::vector<uint8_t>> ReadRaw(std::span<const uint8_t>* in);
std::optional<NT_Type> ReadType(std::span<const uint8_t>* in);
std::optional<Value> ReadValue(std::span<const uint8_t>* in);
};
} // namespace nt::net3