[ntcore] Split ServerImpl implementation into separate files

This commit is contained in:
Peter Johnson
2024-10-15 23:06:43 -07:00
parent f738fc92f0
commit 0921054a28
24 changed files with 2686 additions and 2295 deletions

View File

@@ -0,0 +1,13 @@
// Copyright (c) FIRST and other WPILib contributors.
// Open Source Software; you can modify and/or share it under the terms of
// the WPILib BSD license file in the root directory of this project.
#pragma once
#include <stdint.h>
namespace nt::server {
inline constexpr uint32_t kMinPeriodMs = 5;
} // namespace nt::server

View File

@@ -0,0 +1,16 @@
// Copyright (c) FIRST and other WPILib contributors.
// Open Source Software; you can modify and/or share it under the terms of
// the WPILib BSD license file in the root directory of this project.
#pragma once
#include <functional>
#include <string_view>
namespace nt::server {
using SetPeriodicFunc = std::function<void(uint32_t repeatMs)>;
using Connected3Func =
std::function<void(std::string_view name, uint16_t proto)>;
} // namespace nt::server

View File

@@ -0,0 +1,31 @@
// Copyright (c) FIRST and other WPILib contributors.
// Open Source Software; you can modify and/or share it under the terms of
// the WPILib BSD license file in the root directory of this project.
#pragma once
#include <stdint.h>
#include <vector>
#include <wpi/MessagePack.h>
#include <wpi/raw_ostream.h>
namespace nt::server {
struct Writer : public mpack::mpack_writer_t {
Writer() {
mpack::mpack_writer_init(this, buf, sizeof(buf));
mpack::mpack_writer_set_context(this, &os);
mpack::mpack_writer_set_flush(
this, [](mpack::mpack_writer_t* w, const char* buffer, size_t count) {
static_cast<wpi::raw_ostream*>(w->context)->write(buffer, count);
});
}
std::vector<uint8_t> bytes;
wpi::raw_uvector_ostream os{bytes};
char buf[128];
};
} // namespace nt::server

View File

@@ -0,0 +1,59 @@
// Copyright (c) FIRST and other WPILib contributors.
// Open Source Software; you can modify and/or share it under the terms of
// the WPILib BSD license file in the root directory of this project.
#include "ServerClient.h"
#include <utility>
#include <wpi/MessagePack.h>
#include "server/MessagePackWriter.h"
#include "server/ServerPublisher.h"
#include "server/ServerStorage.h"
using namespace nt::server;
using namespace mpack;
void ServerClient::UpdateMetaClientPub() {
if (!m_metaPub) {
return;
}
Writer w;
mpack_start_array(&w, m_publishers.size());
for (auto&& pub : m_publishers) {
mpack_write_object_bytes(&w, pub.second->GetMetaClientData());
}
mpack_finish_array(&w);
if (mpack_writer_destroy(&w) == mpack_ok) {
m_storage.SetValue(nullptr, m_metaPub, Value::MakeRaw(std::move(w.bytes)));
}
}
void ServerClient::UpdateMetaClientSub() {
if (!m_metaSub) {
return;
}
Writer w;
mpack_start_array(&w, m_subscribers.size());
for (auto&& sub : m_subscribers) {
mpack_write_object_bytes(&w, sub.second->GetMetaClientData());
}
mpack_finish_array(&w);
if (mpack_writer_destroy(&w) == mpack_ok) {
m_storage.SetValue(nullptr, m_metaSub, Value::MakeRaw(std::move(w.bytes)));
}
}
std::span<ServerSubscriber*> ServerClient::GetSubscribers(
std::string_view name, bool special,
wpi::SmallVectorImpl<ServerSubscriber*>& buf) {
buf.resize(0);
for (auto&& subPair : m_subscribers) {
ServerSubscriber* subscriber = subPair.getSecond().get();
if (subscriber->Matches(name, special)) {
buf.emplace_back(subscriber);
}
}
return {buf.data(), buf.size()};
}

View File

@@ -0,0 +1,100 @@
// Copyright (c) FIRST and other WPILib contributors.
// Open Source Software; you can modify and/or share it under the terms of
// the WPILib BSD license file in the root directory of this project.
#pragma once
#include <stdint.h>
#include <memory>
#include <optional>
#include <span>
#include <string>
#include <string_view>
#include <utility>
#include <wpi/json_fwd.h>
#include "net/NetworkOutgoingQueue.h"
#include "server/Functions.h"
#include "server/ServerPublisher.h"
#include "server/ServerSubscriber.h"
namespace wpi {
class Logger;
template <typename T>
class SmallVectorImpl;
} // namespace wpi
namespace nt::server {
struct ServerTopic;
class ServerStorage;
struct TopicClientData;
class ServerClient {
public:
ServerClient(std::string_view name, std::string_view connInfo, bool local,
SetPeriodicFunc setPeriodic, ServerStorage& storage, int id,
wpi::Logger& logger)
: m_name{name},
m_connInfo{connInfo},
m_local{local},
m_setPeriodic{std::move(setPeriodic)},
m_storage{storage},
m_id{id},
m_logger{logger} {}
ServerClient(const ServerClient&) = delete;
ServerClient& operator=(const ServerClient&) = delete;
virtual ~ServerClient() = default;
// these return true if any messages have been queued for later processing
virtual bool ProcessIncomingText(std::string_view data) = 0;
virtual bool ProcessIncomingBinary(std::span<const uint8_t> data) = 0;
virtual void SendValue(ServerTopic* topic, const Value& value,
net::ValueSendMode mode) = 0;
virtual void SendAnnounce(ServerTopic* topic, std::optional<int> pubuid) = 0;
virtual void SendUnannounce(ServerTopic* topic) = 0;
virtual void SendPropertiesUpdate(ServerTopic* topic, const wpi::json& update,
bool ack) = 0;
virtual void SendOutgoing(uint64_t curTimeMs, bool flush) = 0;
virtual void Flush() = 0;
// later processing -- returns true if more to process
virtual bool ProcessIncomingMessages(size_t max) = 0;
void UpdateMetaClientPub();
void UpdateMetaClientSub();
std::span<ServerSubscriber*> GetSubscribers(
std::string_view name, bool special,
wpi::SmallVectorImpl<ServerSubscriber*>& buf);
std::string_view GetName() const { return m_name; }
int GetId() const { return m_id; }
virtual void UpdatePeriod(TopicClientData& tcd, ServerTopic* topic) {}
protected:
std::string m_name;
std::string m_connInfo;
bool m_local; // local to machine
SetPeriodicFunc m_setPeriodic;
// TODO: make this per-topic?
uint32_t m_periodMs{UINT32_MAX};
ServerStorage& m_storage;
int m_id;
wpi::Logger& m_logger;
wpi::DenseMap<int, std::unique_ptr<ServerPublisher>> m_publishers;
wpi::DenseMap<int, std::unique_ptr<ServerSubscriber>> m_subscribers;
public:
// meta topics
ServerTopic* m_metaPub = nullptr;
ServerTopic* m_metaSub = nullptr;
};
} // namespace nt::server

View File

@@ -0,0 +1,482 @@
// Copyright (c) FIRST and other WPILib contributors.
// Open Source Software; you can modify and/or share it under the terms of
// the WPILib BSD license file in the root directory of this project.
#include "ServerClient3.h"
#include <memory>
#include <string>
#include <wpi/timestamp.h>
#include "Log.h"
#include "Types_internal.h"
#include "net3/WireEncoder3.h"
#include "server/ServerImpl.h"
#include "server/ServerPublisher.h"
#include "server/ServerTopic.h"
using namespace nt::server;
// maximum amount of time the wire can be not ready to send another
// transmission before we close the connection
static constexpr uint32_t kWireMaxNotReadyUs = 1000000;
bool ServerClient3::TopicData3::UpdateFlags(ServerTopic* topic) {
unsigned int newFlags = topic->persistent ? NT_PERSISTENT : 0;
bool updated = flags != newFlags;
flags = newFlags;
return updated;
}
bool ServerClient3::ProcessIncomingBinary(std::span<const uint8_t> data) {
if (!m_decoder.Execute(&data)) {
m_wire.Disconnect(m_decoder.GetError());
}
return false;
}
void ServerClient3::SendValue(ServerTopic* topic, const Value& value,
net::ValueSendMode mode) {
if (m_state != kStateRunning) {
if (mode == net::ValueSendMode::kImm) {
mode = net::ValueSendMode::kAll;
}
} else if (m_local) {
mode = net::ValueSendMode::kImm; // always send local immediately
}
TopicData3* topic3 = GetTopic3(topic);
bool added = false;
switch (mode) {
case net::ValueSendMode::kDisabled: // do nothing
break;
case net::ValueSendMode::kImm: // send immediately
++topic3->seqNum;
if (topic3->sentAssign) {
net3::WireEncodeEntryUpdate(m_wire.Send().stream(), topic->id,
topic3->seqNum.value(), value);
} else {
net3::WireEncodeEntryAssign(m_wire.Send().stream(), topic->name,
topic->id, topic3->seqNum.value(), value,
topic3->flags);
topic3->sentAssign = true;
}
if (m_local) {
Flush();
}
break;
case net::ValueSendMode::kNormal: {
// replace, or append if not present
wpi::DenseMap<NT_Topic, size_t>::iterator it;
std::tie(it, added) =
m_outgoingValueMap.try_emplace(topic->id, m_outgoing.size());
if (!added && it->second < m_outgoing.size()) {
auto& msg = m_outgoing[it->second];
if (msg.Is(net3::Message3::kEntryUpdate) ||
msg.Is(net3::Message3::kEntryAssign)) {
if (msg.id() == topic->id) { // should always be true
msg.SetValue(value);
break;
}
}
}
}
// fallthrough
case net::ValueSendMode::kAll: // append to outgoing
if (!added) {
m_outgoingValueMap[topic->id] = m_outgoing.size();
}
++topic3->seqNum;
if (topic3->sentAssign) {
m_outgoing.emplace_back(net3::Message3::EntryUpdate(
topic->id, topic3->seqNum.value(), value));
} else {
m_outgoing.emplace_back(net3::Message3::EntryAssign(
topic->name, topic->id, topic3->seqNum.value(), value,
topic3->flags));
topic3->sentAssign = true;
}
break;
}
}
void ServerClient3::SendAnnounce(ServerTopic* topic,
std::optional<int> pubuid) {
// ignore if we've not yet built the subscriber
if (m_subscribers.empty()) {
return;
}
// subscribe to all non-special topics
if (!topic->special) {
topic->clients[this].AddSubscriber(m_subscribers[0].get());
m_storage.UpdateMetaTopicSub(topic);
}
// NT3 requires a value to send the assign message, so the assign message
// will get sent when the first value is sent (by SendValue).
}
void ServerClient3::SendUnannounce(ServerTopic* topic) {
auto it = m_topics3.find(topic);
if (it == m_topics3.end()) {
return; // never sent to client
}
bool sentAssign = it->second.sentAssign;
m_topics3.erase(it);
if (!sentAssign) {
return; // never sent to client
}
// map to NT3 delete message
if (m_local && m_state == kStateRunning) {
net3::WireEncodeEntryDelete(m_wire.Send().stream(), topic->id);
Flush();
} else {
m_outgoing.emplace_back(net3::Message3::EntryDelete(topic->id));
}
}
void ServerClient3::SendPropertiesUpdate(ServerTopic* topic,
const wpi::json& update, bool ack) {
if (ack) {
return; // we don't ack in NT3
}
auto it = m_topics3.find(topic);
if (it == m_topics3.end()) {
return; // never sent to client
}
TopicData3* topic3 = &it->second;
// Don't send flags update unless we've already sent an assign message.
// The assign message will contain the updated flags when we eventually
// send it.
if (topic3->UpdateFlags(topic) && topic3->sentAssign) {
if (m_local && m_state == kStateRunning) {
net3::WireEncodeFlagsUpdate(m_wire.Send().stream(), topic->id,
topic3->flags);
Flush();
} else {
m_outgoing.emplace_back(
net3::Message3::FlagsUpdate(topic->id, topic3->flags));
}
}
}
void ServerClient3::SendOutgoing(uint64_t curTimeMs, bool flush) {
if (m_outgoing.empty() || m_state != kStateRunning) {
return; // nothing to do
}
// rate limit frequency of transmissions
if (curTimeMs < (m_lastSendMs + kMinPeriodMs)) {
return;
}
if (!m_wire.Ready()) {
uint64_t lastFlushTime = m_wire.GetLastFlushTime();
uint64_t now = wpi::Now();
if (lastFlushTime != 0 && now > (lastFlushTime + kWireMaxNotReadyUs)) {
m_wire.Disconnect("transmit stalled");
}
return;
}
auto out = m_wire.Send();
for (auto&& msg : m_outgoing) {
net3::WireEncode(out.stream(), msg);
}
m_wire.Flush();
m_outgoing.resize(0);
m_outgoingValueMap.clear();
m_lastSendMs = curTimeMs;
}
void ServerClient3::KeepAlive() {
DEBUG4("KeepAlive({})", m_id);
if (m_state != kStateRunning) {
m_decoder.SetError("received unexpected KeepAlive message");
return;
}
// ignore
}
void ServerClient3::ServerHelloDone() {
DEBUG4("ServerHelloDone({})", m_id);
m_decoder.SetError("received unexpected ServerHelloDone message");
}
void ServerClient3::ClientHelloDone() {
DEBUG4("ClientHelloDone({})", m_id);
if (m_state != kStateServerHelloComplete) {
m_decoder.SetError("received unexpected ClientHelloDone message");
return;
}
m_state = kStateRunning;
}
void ServerClient3::ClearEntries() {
DEBUG4("ClearEntries({})", m_id);
if (m_state != kStateRunning) {
m_decoder.SetError("received unexpected ClearEntries message");
return;
}
for (auto topic3it : m_topics3) {
ServerTopic* topic = topic3it.first;
// make sure we send assign the next time
topic3it.second.sentAssign = false;
// unpublish from this client (if it was previously published)
if (topic3it.second.published) {
topic3it.second.published = false;
auto publisherIt = m_publishers.find(topic3it.second.pubuid);
if (publisherIt != m_publishers.end()) {
// remove publisher from topic
topic->RemovePublisher(this, publisherIt->second.get());
// remove publisher from client
m_publishers.erase(publisherIt);
// update meta data
m_storage.UpdateMetaTopicPub(topic);
UpdateMetaClientPub();
}
}
// set retained=false
m_storage.SetProperties(this, topic, {{"retained", false}});
}
}
void ServerClient3::ProtoUnsup(unsigned int proto_rev) {
DEBUG4("ProtoUnsup({})", m_id);
m_decoder.SetError("received unexpected ProtoUnsup message");
}
void ServerClient3::ClientHello(std::string_view self_id,
unsigned int proto_rev) {
DEBUG4("ClientHello({}, '{}', {:04x})", m_id, self_id, proto_rev);
if (m_state != kStateInitial) {
m_decoder.SetError("received unexpected ClientHello message");
return;
}
if (proto_rev != 0x0300) {
net3::WireEncodeProtoUnsup(m_wire.Send().stream(), 0x0300);
Flush();
m_decoder.SetError(
fmt::format("unsupported protocol version {:04x}", proto_rev));
return;
}
// create a unique name including client id
m_name = fmt::format("{}-NT3@{}", self_id, m_connInfo);
m_connected(m_name, 0x0300);
m_connected = nullptr; // no longer required
// create client meta topics
m_metaPub = m_storage.CreateMetaTopic(fmt::format("$clientpub${}", m_name));
m_metaSub = m_storage.CreateMetaTopic(fmt::format("$clientsub${}", m_name));
// subscribe and send initial assignments
auto& sub = m_subscribers[0];
std::string prefix;
PubSubOptions options;
options.prefixMatch = true;
sub = std::make_unique<ServerSubscriber>(
GetName(), std::span<const std::string>{{prefix}}, 0, options);
m_periodMs = net::UpdatePeriodCalc(m_periodMs, sub->GetPeriodMs());
m_setPeriodic(m_periodMs);
{
auto out = m_wire.Send();
net3::WireEncodeServerHello(out.stream(), 0, "server");
m_storage.ForEachTopic([&](ServerTopic* topic) {
if (topic && !topic->special && topic->IsPublished() &&
topic->lastValue) {
DEBUG4("client {}: initial announce of '{}' (id {})", m_id, topic->name,
topic->id);
topic->clients[this].AddSubscriber(sub.get());
m_storage.UpdateMetaTopicSub(topic);
TopicData3* topic3 = GetTopic3(topic);
++topic3->seqNum;
net3::WireEncodeEntryAssign(out.stream(), topic->name, topic->id,
topic3->seqNum.value(), topic->lastValue,
topic3->flags);
topic3->sentAssign = true;
}
});
net3::WireEncodeServerHelloDone(out.stream());
}
Flush();
m_state = kStateServerHelloComplete;
// update meta topics
UpdateMetaClientPub();
UpdateMetaClientSub();
}
void ServerClient3::ServerHello(unsigned int flags, std::string_view self_id) {
DEBUG4("ServerHello({}, {}, {})", m_id, flags, self_id);
m_decoder.SetError("received unexpected ServerHello message");
}
void ServerClient3::EntryAssign(std::string_view name, unsigned int id,
unsigned int seq_num, const Value& value,
unsigned int flags) {
DEBUG4("EntryAssign({}, {}, {}, {}, {})", m_id, id, seq_num,
static_cast<int>(value.type()), flags);
if (id != 0xffff) {
DEBUG3("ignored EntryAssign from {} with non-0xffff id {}", m_id, id);
return;
}
// convert from NT3 info
auto typeStr = TypeToString(value.type());
wpi::json properties = wpi::json::object();
properties["retained"] = true; // treat all NT3 published topics as retained
properties["cached"] = true; // treat all NT3 published topics as cached
if ((flags & NT_PERSISTENT) != 0) {
properties["persistent"] = true;
}
// create topic
auto topic = m_storage.CreateTopic(this, name, typeStr, properties);
TopicData3* topic3 = GetTopic3(topic);
if (topic3->published || topic3->sentAssign) {
WARN("ignoring client {} duplicate publish of '{}'", m_id, name);
return;
}
++topic3->seqNum;
topic3->published = true;
topic3->pubuid = m_nextPubUid++;
topic3->sentAssign = true;
// create publisher
auto [publisherIt, isNew] = m_publishers.try_emplace(
topic3->pubuid,
std::make_unique<ServerPublisher>(GetName(), topic, topic3->pubuid));
if (!isNew) {
return; // shouldn't happen, but just in case...
}
// add publisher to topic
topic->AddPublisher(this, publisherIt->getSecond().get());
// update meta data
m_storage.UpdateMetaTopicPub(topic);
UpdateMetaClientPub();
// acts as an announce + data update
SendAnnounce(topic, topic3->pubuid);
m_storage.SetValue(this, topic, value);
// respond with assign message with assigned topic ID
if (m_local && m_state == kStateRunning) {
net3::WireEncodeEntryAssign(m_wire.Send().stream(), topic->name, topic->id,
topic3->seqNum.value(), value, topic3->flags);
} else {
m_outgoing.emplace_back(net3::Message3::EntryAssign(
topic->name, topic->id, topic3->seqNum.value(), value, topic3->flags));
}
}
void ServerClient3::EntryUpdate(unsigned int id, unsigned int seq_num,
const Value& value) {
DEBUG4("EntryUpdate({}, {}, {}, {})", m_id, id, seq_num,
static_cast<int>(value.type()));
if (m_state != kStateRunning) {
m_decoder.SetError("received unexpected EntryUpdate message");
return;
}
ServerTopic* topic = m_storage.GetTopic(id);
if (!topic || !topic->IsPublished()) {
DEBUG3("ignored EntryUpdate from {} on non-existent topic {}", m_id, id);
return;
}
TopicData3* topic3 = GetTopic3(topic);
if (!topic3->published) {
topic3->published = true;
topic3->pubuid = m_nextPubUid++;
// create publisher
auto [publisherIt, isNew] = m_publishers.try_emplace(
topic3->pubuid,
std::make_unique<ServerPublisher>(GetName(), topic, topic3->pubuid));
if (isNew) {
// add publisher to topic
topic->AddPublisher(this, publisherIt->getSecond().get());
// update meta data
m_storage.UpdateMetaTopicPub(topic);
UpdateMetaClientPub();
}
}
topic3->seqNum = net3::SequenceNumber{seq_num};
m_storage.SetValue(this, topic, value);
}
void ServerClient3::FlagsUpdate(unsigned int id, unsigned int flags) {
DEBUG4("FlagsUpdate({}, {}, {})", m_id, id, flags);
if (m_state != kStateRunning) {
m_decoder.SetError("received unexpected FlagsUpdate message");
return;
}
ServerTopic* topic = m_storage.GetTopic(id);
if (!topic || !topic->IsPublished()) {
DEBUG3("ignored FlagsUpdate from {} on non-existent topic {}", m_id, id);
return;
}
if (topic->special) {
DEBUG3("ignored FlagsUpdate from {} on special topic {}", m_id, id);
return;
}
m_storage.SetFlags(this, topic, flags);
}
void ServerClient3::EntryDelete(unsigned int id) {
DEBUG4("EntryDelete({}, {})", m_id, id);
if (m_state != kStateRunning) {
m_decoder.SetError("received unexpected EntryDelete message");
return;
}
ServerTopic* topic = m_storage.GetTopic(id);
if (!topic || !topic->IsPublished()) {
DEBUG3("ignored EntryDelete from {} on non-existent topic {}", m_id, id);
return;
}
if (topic->special) {
DEBUG3("ignored EntryDelete from {} on special topic {}", m_id, id);
return;
}
auto topic3it = m_topics3.find(topic);
if (topic3it != m_topics3.end()) {
// make sure we send assign the next time
topic3it->second.sentAssign = false;
// unpublish from this client (if it was previously published)
if (topic3it->second.published) {
topic3it->second.published = false;
auto publisherIt = m_publishers.find(topic3it->second.pubuid);
if (publisherIt != m_publishers.end()) {
// remove publisher from topic
topic->RemovePublisher(this, publisherIt->second.get());
// remove publisher from client
m_publishers.erase(publisherIt);
// update meta data
m_storage.UpdateMetaTopicPub(topic);
UpdateMetaClientPub();
}
}
}
// set retained=false
m_storage.SetProperties(this, topic, {{"retained", false}});
}

View File

@@ -0,0 +1,97 @@
// Copyright (c) FIRST and other WPILib contributors.
// Open Source Software; you can modify and/or share it under the terms of
// the WPILib BSD license file in the root directory of this project.
#pragma once
#include <utility>
#include <vector>
#include "ServerClient.h"
#include "net/ClientMessageQueue.h"
#include "net3/Message3.h"
#include "net3/SequenceNumber.h"
#include "net3/WireConnection3.h"
#include "net3/WireDecoder3.h"
#include "server/Functions.h"
namespace nt::server {
class ServerClient3 final : public ServerClient, private net3::MessageHandler3 {
public:
ServerClient3(std::string_view connInfo, bool local,
net3::WireConnection3& wire, Connected3Func connected,
SetPeriodicFunc setPeriodic, ServerStorage& storage, int id,
wpi::Logger& logger)
: ServerClient{"", connInfo, local, setPeriodic, storage, id, logger},
m_connected{std::move(connected)},
m_wire{wire},
m_decoder{*this},
m_incoming{logger} {}
bool ProcessIncomingText(std::string_view data) final { return false; }
bool ProcessIncomingBinary(std::span<const uint8_t> data) final;
bool ProcessIncomingMessages(size_t max) final { return false; }
void SendValue(ServerTopic* topic, const Value& value,
net::ValueSendMode mode) final;
void SendAnnounce(ServerTopic* topic, std::optional<int> pubuid) final;
void SendUnannounce(ServerTopic* topic) final;
void SendPropertiesUpdate(ServerTopic* topic, const wpi::json& update,
bool ack) final;
void SendOutgoing(uint64_t curTimeMs, bool flush) final;
void Flush() final { m_wire.Flush(); }
private:
// MessageHandler3 interface
void KeepAlive() final;
void ServerHelloDone() final;
void ClientHelloDone() final;
void ClearEntries() final;
void ProtoUnsup(unsigned int proto_rev) final;
void ClientHello(std::string_view self_id, unsigned int proto_rev) final;
void ServerHello(unsigned int flags, std::string_view self_id) final;
void EntryAssign(std::string_view name, unsigned int id, unsigned int seq_num,
const Value& value, unsigned int flags) final;
void EntryUpdate(unsigned int id, unsigned int seq_num,
const Value& value) final;
void FlagsUpdate(unsigned int id, unsigned int flags) final;
void EntryDelete(unsigned int id) final;
void ExecuteRpc(unsigned int id, unsigned int uid,
std::span<const uint8_t> params) final {}
void RpcResponse(unsigned int id, unsigned int uid,
std::span<const uint8_t> result) final {}
Connected3Func m_connected;
net3::WireConnection3& m_wire;
enum State { kStateInitial, kStateServerHelloComplete, kStateRunning };
State m_state{kStateInitial};
net3::WireDecoder3 m_decoder;
net::NetworkIncomingClientQueue m_incoming;
std::vector<net3::Message3> m_outgoing;
wpi::DenseMap<NT_Topic, size_t> m_outgoingValueMap;
int64_t m_nextPubUid{1};
uint64_t m_lastSendMs{0};
struct TopicData3 {
explicit TopicData3(ServerTopic* topic) { UpdateFlags(topic); }
unsigned int flags{0};
net3::SequenceNumber seqNum;
bool sentAssign{false};
bool published{false};
int64_t pubuid{0};
bool UpdateFlags(ServerTopic* topic);
};
wpi::DenseMap<ServerTopic*, TopicData3> m_topics3;
TopicData3* GetTopic3(ServerTopic* topic) {
return &m_topics3.try_emplace(topic, topic).first->second;
}
};
} // namespace nt::server

View File

@@ -0,0 +1,180 @@
// Copyright (c) FIRST and other WPILib contributors.
// Open Source Software; you can modify and/or share it under the terms of
// the WPILib BSD license file in the root directory of this project.
#include "ServerClient4.h"
#include <string>
#include <wpi/timestamp.h>
#include "Log.h"
#include "net/WireDecoder.h"
#include "server/ServerStorage.h"
#include "server/ServerTopic.h"
using namespace nt::server;
ServerClient4::ServerClient4(std::string_view name, std::string_view connInfo,
bool local, net::WireConnection& wire,
SetPeriodicFunc setPeriodic,
ServerStorage& storage, int id,
wpi::Logger& logger)
: ServerClient4Base{name, connInfo, local, setPeriodic,
storage, id, logger},
m_wire{wire},
m_ping{wire},
m_incoming{logger},
m_outgoing{wire, local} {
// create client meta topics
m_metaPub = storage.CreateMetaTopic(fmt::format("$clientpub${}", name));
m_metaSub = storage.CreateMetaTopic(fmt::format("$clientsub${}", name));
// update meta topics
UpdateMetaClientPub();
UpdateMetaClientSub();
}
bool ServerClient4::ProcessIncomingText(std::string_view data) {
constexpr int kMaxImmProcessing = 10;
bool queueWasEmpty = m_incoming.empty();
// can't directly process, because we don't know how big it is
WireDecodeText(data, m_incoming, m_logger);
if (queueWasEmpty &&
DoProcessIncomingMessages(m_incoming, kMaxImmProcessing)) {
m_wire.StopRead();
return true;
}
return false;
}
bool ServerClient4::ProcessIncomingBinary(std::span<const uint8_t> data) {
constexpr int kMaxImmProcessing = 10;
// if we've already queued, keep queuing
int count = m_incoming.empty() ? 0 : kMaxImmProcessing;
for (;;) {
if (data.empty()) {
break;
}
// decode message
int pubuid;
Value value;
std::string error;
if (!net::WireDecodeBinary(&data, &pubuid, &value, &error, 0)) {
m_wire.Disconnect(fmt::format("binary decode error: {}", error));
break;
}
// respond to RTT ping
if (pubuid == -1) {
auto now = wpi::Now();
DEBUG4("RTT ping from {}, responding with time={}", m_id, now);
m_wire.SendBinary(
[&](auto& os) { net::WireEncodeBinary(os, -1, now, value); });
continue;
}
// handle value set
if (++count < kMaxImmProcessing) {
ClientSetValue(pubuid, value);
} else {
m_incoming.ClientSetValue(pubuid, value);
}
}
if (count >= kMaxImmProcessing) {
m_wire.StopRead();
return true;
}
return false;
}
void ServerClient4::SendValue(ServerTopic* topic, const Value& value,
net::ValueSendMode mode) {
m_outgoing.SendValue(topic->id, value, mode);
}
void ServerClient4::SendAnnounce(ServerTopic* topic,
std::optional<int> pubuid) {
auto& sent = m_announceSent[topic];
if (sent) {
return;
}
sent = true;
if (m_local) {
int unsent = m_wire.WriteText([&](auto& os) {
net::WireEncodeAnnounce(os, topic->name, topic->id, topic->typeStr,
topic->properties, pubuid);
});
if (unsent < 0) {
return; // error
}
if (unsent == 0 && m_wire.Flush() == 0) {
return;
}
}
m_outgoing.SendMessage(
topic->id, net::AnnounceMsg{topic->name, static_cast<int>(topic->id),
topic->typeStr, pubuid, topic->properties});
}
void ServerClient4::SendUnannounce(ServerTopic* topic) {
auto& sent = m_announceSent[topic];
if (!sent) {
return;
}
sent = false;
if (m_local) {
int unsent = m_wire.WriteText([&](auto& os) {
net::WireEncodeUnannounce(os, topic->name, topic->id);
});
if (unsent < 0) {
return; // error
}
if (unsent == 0 && m_wire.Flush() == 0) {
return;
}
}
m_outgoing.SendMessage(
topic->id, net::UnannounceMsg{topic->name, static_cast<int>(topic->id)});
m_outgoing.EraseId(topic->id);
}
void ServerClient4::SendPropertiesUpdate(ServerTopic* topic,
const wpi::json& update, bool ack) {
if (!m_announceSent.lookup(topic)) {
return;
}
if (m_local) {
int unsent = m_wire.WriteText([&](auto& os) {
net::WireEncodePropertiesUpdate(os, topic->name, update, ack);
});
if (unsent < 0) {
return; // error
}
if (unsent == 0 && m_wire.Flush() == 0) {
return;
}
}
m_outgoing.SendMessage(topic->id,
net::PropertiesUpdateMsg{topic->name, update, ack});
}
void ServerClient4::SendOutgoing(uint64_t curTimeMs, bool flush) {
if (m_wire.GetVersion() >= 0x0401) {
if (!m_ping.Send(curTimeMs)) {
return;
}
}
m_outgoing.SendOutgoing(curTimeMs, flush);
}
void ServerClient4::UpdatePeriod(TopicClientData& tcd, ServerTopic* topic) {
uint32_t period = net::CalculatePeriod(
tcd.subscribers, [](auto& x) { return x->GetPeriodMs(); });
DEBUG4("updating {} period to {} ms", topic->name, period);
m_outgoing.SetPeriod(topic->id, period);
}

View File

@@ -0,0 +1,54 @@
// Copyright (c) FIRST and other WPILib contributors.
// Open Source Software; you can modify and/or share it under the terms of
// the WPILib BSD license file in the root directory of this project.
#pragma once
#include <string_view>
#include "net/NetworkPing.h"
#include "net/WireConnection.h"
#include "server/Functions.h"
#include "server/ServerClient4Base.h"
namespace nt::server {
class ServerClient4 final : public ServerClient4Base {
public:
ServerClient4(std::string_view name, std::string_view connInfo, bool local,
net::WireConnection& wire, SetPeriodicFunc setPeriodic,
ServerStorage& storage, int id, wpi::Logger& logger);
bool ProcessIncomingText(std::string_view data) final;
bool ProcessIncomingBinary(std::span<const uint8_t> data) final;
bool ProcessIncomingMessages(size_t max) final {
if (!DoProcessIncomingMessages(m_incoming, max)) {
m_wire.StartRead();
return false;
}
return true;
}
void SendValue(ServerTopic* topic, const Value& value,
net::ValueSendMode mode) final;
void SendAnnounce(ServerTopic* topic, std::optional<int> pubuid) final;
void SendUnannounce(ServerTopic* topic) final;
void SendPropertiesUpdate(ServerTopic* topic, const wpi::json& update,
bool ack) final;
void SendOutgoing(uint64_t curTimeMs, bool flush) final;
void Flush() final {}
void UpdatePeriod(TopicClientData& tcd, ServerTopic* topic) final;
public:
net::WireConnection& m_wire;
private:
net::NetworkPing m_ping;
net::NetworkIncomingClientQueue m_incoming;
net::NetworkOutgoingQueue<net::ServerMessage> m_outgoing;
};
} // namespace nt::server

View File

@@ -0,0 +1,242 @@
// Copyright (c) FIRST and other WPILib contributors.
// Open Source Software; you can modify and/or share it under the terms of
// the WPILib BSD license file in the root directory of this project.
#include "ServerClient4Base.h"
#include <memory>
#include <string>
#include <vector>
#include <fmt/ranges.h>
#include <wpi/SpanExtras.h>
#include "Log.h"
#include "server/ServerImpl.h"
#include "server/ServerPublisher.h"
using namespace nt::server;
void ServerClient4Base::ClientPublish(int pubuid, std::string_view name,
std::string_view typeStr,
const wpi::json& properties,
const PubSubOptionsImpl& options) {
DEBUG3("ClientPublish({}, {}, {}, {})", m_id, name, pubuid, typeStr);
auto topic = m_storage.CreateTopic(this, name, typeStr, properties);
// create publisher
auto [publisherIt, isNew] = m_publishers.try_emplace(
pubuid, std::make_unique<ServerPublisher>(GetName(), topic, pubuid));
if (!isNew) {
WARN("client {} duplicate publish of pubuid {}", m_id, pubuid);
} else {
// add publisher to topic
topic->AddPublisher(this, publisherIt->getSecond().get());
// update meta data
m_storage.UpdateMetaTopicPub(topic);
}
// respond with announce with pubuid to client
DEBUG4("client {}: announce {} pubuid {}", m_id, topic->name, pubuid);
SendAnnounce(topic, pubuid);
}
void ServerClient4Base::ClientUnpublish(int pubuid) {
DEBUG3("ClientUnpublish({}, {})", m_id, pubuid);
auto publisherIt = m_publishers.find(pubuid);
if (publisherIt == m_publishers.end()) {
return; // nothing to do
}
auto publisher = publisherIt->getSecond().get();
auto topic = publisher->GetTopic();
// remove publisher from topic
topic->RemovePublisher(this, publisher);
// remove publisher from client
m_publishers.erase(publisherIt);
// update meta data
m_storage.UpdateMetaTopicPub(topic);
// delete topic if no longer published
if (!topic->IsPublished()) {
m_storage.DeleteTopic(topic);
}
}
void ServerClient4Base::ClientSetProperties(std::string_view name,
const wpi::json& update) {
DEBUG4("ClientSetProperties({}, {}, {})", m_id, name, update.dump());
ServerTopic* topic = m_storage.GetTopic(name);
if (!topic || !topic->IsPublished()) {
WARN(
"server ignoring SetProperties({}) from client {} on unpublished topic "
"'{}'; publish or set a value first",
update.dump(), m_id, name);
return; // nothing to do
}
if (topic->special) {
WARN("server ignoring SetProperties({}) from client {} on meta topic '{}'",
update.dump(), m_id, name);
return; // nothing to do
}
m_storage.SetProperties(nullptr, topic, update);
}
void ServerClient4Base::ClientSubscribe(int subuid,
std::span<const std::string> topicNames,
const PubSubOptionsImpl& options) {
DEBUG4("ClientSubscribe({}, ({}), {})", m_id, fmt::join(topicNames, ","),
subuid);
auto& sub = m_subscribers[subuid];
bool replace = false;
if (sub) {
// replace subscription
sub->Update(topicNames, options);
replace = true;
} else {
// create
sub = std::make_unique<ServerSubscriber>(GetName(), topicNames, subuid,
options);
}
// update periodic sender (if not local)
if (!m_local) {
m_periodMs = net::UpdatePeriodCalc(m_periodMs, sub->GetPeriodMs());
m_setPeriodic(m_periodMs);
}
// see if this immediately subscribes to any topics
// for transmit efficiency, we want to batch announcements and values, so
// send announcements in first loop and remember what we want to send in
// second loop.
std::vector<ServerTopic*> dataToSend;
dataToSend.reserve(m_storage.GetNumTopics());
m_storage.ForEachTopic([&](ServerTopic* topic) {
auto tcdIt = topic->clients.find(this);
bool removed = tcdIt != topic->clients.end() && replace &&
tcdIt->second.subscribers.erase(sub.get());
// is client already subscribed?
bool wasSubscribed =
tcdIt != topic->clients.end() && !tcdIt->second.subscribers.empty();
bool wasSubscribedValue =
wasSubscribed ? tcdIt->second.sendMode != net::ValueSendMode::kDisabled
: false;
bool added = false;
if (sub->Matches(topic->name, topic->special)) {
if (tcdIt == topic->clients.end()) {
tcdIt = topic->clients.try_emplace(this).first;
}
tcdIt->second.AddSubscriber(sub.get());
added = true;
}
if (added ^ removed) {
UpdatePeriod(tcdIt->second, topic);
m_storage.UpdateMetaTopicSub(topic);
}
// announce topic to client if not previously announced
if (added && !removed && !wasSubscribed) {
DEBUG4("client {}: announce {}", m_id, topic->name);
SendAnnounce(topic, std::nullopt);
}
// send last value
if (added && !sub->GetOptions().topicsOnly && !wasSubscribedValue &&
topic->lastValue) {
dataToSend.emplace_back(topic);
}
});
for (auto topic : dataToSend) {
DEBUG4("send last value for {} to client {}", topic->name, m_id);
SendValue(topic, topic->lastValue, net::ValueSendMode::kAll);
}
}
void ServerClient4Base::ClientUnsubscribe(int subuid) {
DEBUG3("ClientUnsubscribe({}, {})", m_id, subuid);
auto subIt = m_subscribers.find(subuid);
if (subIt == m_subscribers.end() || !subIt->getSecond()) {
return; // nothing to do
}
auto sub = subIt->getSecond().get();
// remove from topics
m_storage.ForEachTopic([&](ServerTopic* topic) {
auto tcdIt = topic->clients.find(this);
if (tcdIt != topic->clients.end()) {
if (tcdIt->second.subscribers.erase(sub)) {
UpdatePeriod(tcdIt->second, topic);
m_storage.UpdateMetaTopicSub(topic);
}
}
});
// delete it from client (future value sets will be ignored)
m_subscribers.erase(subIt);
// loop over all subscribers to update period
if (!m_local) {
m_periodMs = net::CalculatePeriod(
m_subscribers, [](auto& x) { return x.getSecond()->GetPeriodMs(); });
m_setPeriodic(m_periodMs);
}
}
void ServerClient4Base::ClientSetValue(int pubuid, const Value& value) {
DEBUG4("ClientSetValue({}, {})", m_id, pubuid);
auto publisherIt = m_publishers.find(pubuid);
if (publisherIt == m_publishers.end()) {
WARN("unrecognized client {} pubuid {}, ignoring set", m_id, pubuid);
return; // ignore unrecognized pubuids
}
auto topic = publisherIt->getSecond().get()->GetTopic();
m_storage.SetValue(this, topic, value);
}
bool ServerClient4Base::DoProcessIncomingMessages(
net::ClientMessageQueue& queue, size_t max) {
DEBUG4("ProcessIncomingMessage()");
max = (std::min)(m_msgsBuf.size(), max);
std::span<net::ClientMessage> msgs =
queue.ReadQueue(wpi::take_front(std::span{m_msgsBuf}, max));
// just map as a normal client into client=0 calls
bool updatepub = false;
bool updatesub = false;
for (const auto& elem : msgs) { // NOLINT
// common case is value, so check that first
if (auto msg = std::get_if<net::ClientValueMsg>(&elem.contents)) {
ClientSetValue(msg->pubuid, msg->value);
} else if (auto msg = std::get_if<net::PublishMsg>(&elem.contents)) {
ClientPublish(msg->pubuid, msg->name, msg->typeStr, msg->properties,
msg->options);
updatepub = true;
} else if (auto msg = std::get_if<net::UnpublishMsg>(&elem.contents)) {
ClientUnpublish(msg->pubuid);
updatepub = true;
} else if (auto msg = std::get_if<net::SetPropertiesMsg>(&elem.contents)) {
ClientSetProperties(msg->name, msg->update);
} else if (auto msg = std::get_if<net::SubscribeMsg>(&elem.contents)) {
ClientSubscribe(msg->subuid, msg->topicNames, msg->options);
updatesub = true;
} else if (auto msg = std::get_if<net::UnsubscribeMsg>(&elem.contents)) {
ClientUnsubscribe(msg->subuid);
updatesub = true;
}
}
if (updatepub) {
UpdateMetaClientPub();
}
if (updatesub) {
UpdateMetaClientSub();
}
return msgs.size() == max; // don't know for sure, but there might be more
}

View File

@@ -0,0 +1,48 @@
// Copyright (c) FIRST and other WPILib contributors.
// Open Source Software; you can modify and/or share it under the terms of
// the WPILib BSD license file in the root directory of this project.
#pragma once
#include <string>
#include <string_view>
#include <wpi/DenseMap.h>
#include "net/ClientMessageQueue.h"
#include "server/Functions.h"
#include "server/ServerClient.h"
namespace nt::server {
class ServerClient4Base : public ServerClient,
protected net::ClientMessageHandler {
public:
ServerClient4Base(std::string_view name, std::string_view connInfo,
bool local, SetPeriodicFunc setPeriodic,
ServerStorage& storage, int id, wpi::Logger& logger)
: ServerClient{name, connInfo, local, setPeriodic, storage, id, logger} {}
protected:
// ClientMessageHandler interface
void ClientPublish(int pubuid, std::string_view name,
std::string_view typeStr, const wpi::json& properties,
const PubSubOptionsImpl& options) final;
void ClientUnpublish(int pubuid) final;
void ClientSetProperties(std::string_view name,
const wpi::json& update) final;
void ClientSubscribe(int subuid, std::span<const std::string> topicNames,
const PubSubOptionsImpl& options) final;
void ClientUnsubscribe(int subuid) final;
void ClientSetValue(int pubuid, const Value& value) final;
bool DoProcessIncomingMessages(net::ClientMessageQueue& queue, size_t max);
wpi::DenseMap<ServerTopic*, bool> m_announceSent;
private:
std::array<net::ClientMessage, 16> m_msgsBuf;
};
} // namespace nt::server

View File

@@ -0,0 +1,64 @@
// Copyright (c) FIRST and other WPILib contributors.
// Open Source Software; you can modify and/or share it under the terms of
// the WPILib BSD license file in the root directory of this project.
#include "ServerClientLocal.h"
#include "server/ServerImpl.h"
using namespace nt::server;
ServerClientLocal::ServerClientLocal(ServerStorage& storage, int id,
wpi::Logger& logger)
: ServerClient4Base{"", "", true, [](uint32_t) {}, storage, id, logger} {
// create local client meta topics
m_metaPub = storage.CreateMetaTopic("$serverpub");
m_metaSub = storage.CreateMetaTopic("$serversub");
// update meta topics
UpdateMetaClientPub();
UpdateMetaClientSub();
}
void ServerClientLocal::SendValue(ServerTopic* topic, const Value& value,
net::ValueSendMode mode) {
if (m_local) {
m_local->ServerSetValue(topic->localTopic, value);
}
}
void ServerClientLocal::SendAnnounce(ServerTopic* topic,
std::optional<int> pubuid) {
if (m_local) {
auto& sent = m_announceSent[topic];
if (sent) {
return;
}
sent = true;
topic->localTopic = m_local->ServerAnnounce(topic->name, 0, topic->typeStr,
topic->properties, pubuid);
}
}
void ServerClientLocal::SendUnannounce(ServerTopic* topic) {
if (m_local) {
auto& sent = m_announceSent[topic];
if (!sent) {
return;
}
sent = false;
m_local->ServerUnannounce(topic->name, topic->localTopic);
}
}
void ServerClientLocal::SendPropertiesUpdate(ServerTopic* topic,
const wpi::json& update,
bool ack) {
if (m_local) {
if (!m_announceSent.lookup(topic)) {
return;
}
m_local->ServerPropertiesUpdate(topic->name, update, ack);
}
}

View File

@@ -0,0 +1,50 @@
// Copyright (c) FIRST and other WPILib contributors.
// Open Source Software; you can modify and/or share it under the terms of
// the WPILib BSD license file in the root directory of this project.
#pragma once
#include <span>
#include <string_view>
#include "server/ServerClient4Base.h"
namespace nt::server {
class ServerClientLocal final : public ServerClient4Base {
public:
ServerClientLocal(ServerStorage& storage, int id, wpi::Logger& logger);
bool ProcessIncomingText(std::string_view data) final { return false; }
bool ProcessIncomingBinary(std::span<const uint8_t> data) final {
return false;
}
bool ProcessIncomingMessages(size_t max) final {
if (!m_queue) {
return false;
}
return DoProcessIncomingMessages(*m_queue, max);
}
void SendValue(ServerTopic* topic, const Value& value,
net::ValueSendMode mode) final;
void SendAnnounce(ServerTopic* topic, std::optional<int> pubuid) final;
void SendUnannounce(ServerTopic* topic) final;
void SendPropertiesUpdate(ServerTopic* topic, const wpi::json& update,
bool ack) final;
void SendOutgoing(uint64_t curTimeMs, bool flush) final {}
void Flush() final {}
void SetLocal(net::ServerMessageHandler* local,
net::ClientMessageQueue* queue) {
m_local = local;
m_queue = queue;
}
private:
net::ServerMessageHandler* m_local = nullptr;
net::ClientMessageQueue* m_queue = nullptr;
};
} // namespace nt::server

File diff suppressed because it is too large Load Diff

View File

@@ -6,8 +6,6 @@
#include <stdint.h>
#include <cmath>
#include <functional>
#include <memory>
#include <span>
#include <string>
@@ -15,35 +13,18 @@
#include <utility>
#include <vector>
#include <wpi/DenseMap.h>
#include <wpi/SmallPtrSet.h>
#include <wpi/StringMap.h>
#include <wpi/UidVector.h>
#include <wpi/json.h>
#include "PubSubOptions.h"
#include "net/ClientMessageQueue.h"
#include "net/Message.h"
#include "net/NetworkOutgoingQueue.h"
#include "net/NetworkPing.h"
#include "net/WireConnection.h"
#include "net/WireDecoder.h"
#include "net/WireEncoder.h"
#include "net3/Message3.h"
#include "net3/SequenceNumber.h"
#include "net3/WireConnection3.h"
#include "net3/WireDecoder3.h"
#include "server/Functions.h"
#include "server/ServerClient.h"
#include "server/ServerStorage.h"
namespace wpi {
class Logger;
template <typename T>
class SmallVectorImpl;
class raw_ostream;
} // namespace wpi
namespace nt::net {
struct ClientMessage;
class ClientMessageQueue;
class LocalInterface;
class ServerMessageHandler;
class WireConnection;
} // namespace nt::net
@@ -53,12 +34,11 @@ class WireConnection3;
namespace nt::server {
class ServerClientLocal;
struct ServerTopic;
class ServerImpl final {
public:
using SetPeriodicFunc = std::function<void(uint32_t repeatMs)>;
using Connected3Func =
std::function<void(std::string_view name, uint16_t proto)>;
explicit ServerImpl(wpi::Logger& logger);
void SendAllOutgoing(uint64_t curTimeMs, bool flush);
@@ -86,433 +66,34 @@ class ServerImpl final {
SetPeriodicFunc setPeriodic);
std::shared_ptr<void> RemoveClient(int clientId);
void ConnectionsChanged(const std::vector<ConnectionInfo>& conns);
void ConnectionsChanged(const std::vector<ConnectionInfo>& conns) {
UpdateMetaClients(conns);
}
// if any persistent values changed since the last call to this function
bool PersistentChanged();
bool PersistentChanged() { return m_storage.PersistentChanged(); }
std::string DumpPersistent();
// returns newline-separated errors
std::string LoadPersistent(std::string_view in);
std::string LoadPersistent(std::string_view in) {
return m_storage.LoadPersistent(in);
}
private:
static constexpr uint32_t kMinPeriodMs = 5;
class ClientData;
struct PublisherData;
struct SubscriberData;
struct TopicData {
TopicData(wpi::Logger& logger, std::string_view name,
std::string_view typeStr)
: m_logger{logger}, name{name}, typeStr{typeStr} {}
TopicData(wpi::Logger& logger, std::string_view name,
std::string_view typeStr, wpi::json properties)
: m_logger{logger},
name{name},
typeStr{typeStr},
properties(std::move(properties)) {
RefreshProperties();
}
bool IsPublished() const {
return persistent || retained || publisherCount != 0;
}
// returns true if properties changed
bool SetProperties(const wpi::json& update);
void RefreshProperties();
bool SetFlags(unsigned int flags_);
wpi::Logger& m_logger; // Must be m_logger for WARN macro to work
std::string name;
unsigned int id;
Value lastValue;
ClientData* lastValueClient = nullptr;
std::string typeStr;
wpi::json properties = wpi::json::object();
unsigned int publisherCount{0};
bool persistent{false};
bool retained{false};
bool cached{true};
bool special{false};
int localTopic{0};
void AddPublisher(ClientData* client, PublisherData* pub) {
if (clients[client].publishers.insert(pub).second) {
++publisherCount;
}
}
void RemovePublisher(ClientData* client, PublisherData* pub) {
if (clients[client].publishers.erase(pub)) {
--publisherCount;
}
}
struct TopicClientData {
wpi::SmallPtrSet<PublisherData*, 2> publishers;
wpi::SmallPtrSet<SubscriberData*, 2> subscribers;
net::ValueSendMode sendMode = net::ValueSendMode::kDisabled;
bool AddSubscriber(SubscriberData* sub) {
bool added = subscribers.insert(sub).second;
if (!sub->options.topicsOnly) {
if (sub->options.sendAll) {
sendMode = net::ValueSendMode::kAll;
} else if (sendMode == net::ValueSendMode::kDisabled) {
sendMode = net::ValueSendMode::kNormal;
}
}
return added;
}
};
wpi::SmallDenseMap<ClientData*, TopicClientData, 4> clients;
// meta topics
TopicData* metaPub = nullptr;
TopicData* metaSub = nullptr;
};
class ClientData {
public:
ClientData(std::string_view name, std::string_view connInfo, bool local,
ServerImpl::SetPeriodicFunc setPeriodic, ServerImpl& server,
int id, wpi::Logger& logger)
: m_name{name},
m_connInfo{connInfo},
m_local{local},
m_setPeriodic{std::move(setPeriodic)},
m_server{server},
m_id{id},
m_logger{logger} {}
virtual ~ClientData() = default;
// these return true if any messages have been queued for later processing
virtual bool ProcessIncomingText(std::string_view data) = 0;
virtual bool ProcessIncomingBinary(std::span<const uint8_t> data) = 0;
virtual void SendValue(TopicData* topic, const Value& value,
net::ValueSendMode mode) = 0;
virtual void SendAnnounce(TopicData* topic, std::optional<int> pubuid) = 0;
virtual void SendUnannounce(TopicData* topic) = 0;
virtual void SendPropertiesUpdate(TopicData* topic, const wpi::json& update,
bool ack) = 0;
virtual void SendOutgoing(uint64_t curTimeMs, bool flush) = 0;
virtual void Flush() = 0;
// later processing -- returns true if more to process
virtual bool ProcessIncomingMessages(size_t max) = 0;
void UpdateMetaClientPub();
void UpdateMetaClientSub();
std::span<SubscriberData*> GetSubscribers(
std::string_view name, bool special,
wpi::SmallVectorImpl<SubscriberData*>& buf);
std::string_view GetName() const { return m_name; }
int GetId() const { return m_id; }
virtual void UpdatePeriod(TopicData::TopicClientData& tcd,
TopicData* topic) {}
protected:
std::string m_name;
std::string m_connInfo;
bool m_local; // local to machine
ServerImpl::SetPeriodicFunc m_setPeriodic;
// TODO: make this per-topic?
uint32_t m_periodMs{UINT32_MAX};
ServerImpl& m_server;
int m_id;
wpi::Logger& m_logger;
wpi::DenseMap<int, std::unique_ptr<PublisherData>> m_publishers;
wpi::DenseMap<int, std::unique_ptr<SubscriberData>> m_subscribers;
public:
// meta topics
TopicData* m_metaPub = nullptr;
TopicData* m_metaSub = nullptr;
};
class ClientData4Base : public ClientData,
protected net::ClientMessageHandler {
public:
ClientData4Base(std::string_view name, std::string_view connInfo,
bool local, ServerImpl::SetPeriodicFunc setPeriodic,
ServerImpl& server, int id, wpi::Logger& logger)
: ClientData{name, connInfo, local, setPeriodic, server, id, logger} {}
protected:
// ClientMessageHandler interface
void ClientPublish(int pubuid, std::string_view name,
std::string_view typeStr, const wpi::json& properties,
const PubSubOptionsImpl& options) final;
void ClientUnpublish(int pubuid) final;
void ClientSetProperties(std::string_view name,
const wpi::json& update) final;
void ClientSubscribe(int subuid, std::span<const std::string> topicNames,
const PubSubOptionsImpl& options) final;
void ClientUnsubscribe(int subuid) final;
void ClientSetValue(int pubuid, const Value& value) final;
bool DoProcessIncomingMessages(net::ClientMessageQueue& queue, size_t max);
wpi::DenseMap<TopicData*, bool> m_announceSent;
private:
std::array<net::ClientMessage, 16> m_msgsBuf;
};
class ClientDataLocal final : public ClientData4Base {
public:
ClientDataLocal(ServerImpl& server, int id, wpi::Logger& logger)
: ClientData4Base{"", "", true, [](uint32_t) {}, server, id, logger} {}
bool ProcessIncomingText(std::string_view data) final { return false; }
bool ProcessIncomingBinary(std::span<const uint8_t> data) final {
return false;
}
bool ProcessIncomingMessages(size_t max) final {
if (!m_queue) {
return false;
}
return DoProcessIncomingMessages(*m_queue, max);
}
void SendValue(TopicData* topic, const Value& value,
net::ValueSendMode mode) final;
void SendAnnounce(TopicData* topic, std::optional<int> pubuid) final;
void SendUnannounce(TopicData* topic) final;
void SendPropertiesUpdate(TopicData* topic, const wpi::json& update,
bool ack) final;
void SendOutgoing(uint64_t curTimeMs, bool flush) final {}
void Flush() final {}
void SetQueue(net::ClientMessageQueue* queue) { m_queue = queue; }
private:
net::ClientMessageQueue* m_queue = nullptr;
};
class ClientData4 final : public ClientData4Base {
public:
ClientData4(std::string_view name, std::string_view connInfo, bool local,
net::WireConnection& wire,
ServerImpl::SetPeriodicFunc setPeriodic, ServerImpl& server,
int id, wpi::Logger& logger)
: ClientData4Base{name, connInfo, local, setPeriodic,
server, id, logger},
m_wire{wire},
m_ping{wire},
m_incoming{logger},
m_outgoing{wire, local} {}
bool ProcessIncomingText(std::string_view data) final;
bool ProcessIncomingBinary(std::span<const uint8_t> data) final;
bool ProcessIncomingMessages(size_t max) final {
if (!DoProcessIncomingMessages(m_incoming, max)) {
m_wire.StartRead();
return false;
}
return true;
}
void SendValue(TopicData* topic, const Value& value,
net::ValueSendMode mode) final;
void SendAnnounce(TopicData* topic, std::optional<int> pubuid) final;
void SendUnannounce(TopicData* topic) final;
void SendPropertiesUpdate(TopicData* topic, const wpi::json& update,
bool ack) final;
void SendOutgoing(uint64_t curTimeMs, bool flush) final;
void Flush() final {}
void UpdatePeriod(TopicData::TopicClientData& tcd, TopicData* topic) final;
public:
net::WireConnection& m_wire;
private:
net::NetworkPing m_ping;
net::NetworkIncomingClientQueue m_incoming;
net::NetworkOutgoingQueue<net::ServerMessage> m_outgoing;
};
class ClientData3 final : public ClientData, private net3::MessageHandler3 {
public:
ClientData3(std::string_view connInfo, bool local,
net3::WireConnection3& wire,
ServerImpl::Connected3Func connected,
ServerImpl::SetPeriodicFunc setPeriodic, ServerImpl& server,
int id, wpi::Logger& logger)
: ClientData{"", connInfo, local, setPeriodic, server, id, logger},
m_connected{std::move(connected)},
m_wire{wire},
m_decoder{*this},
m_incoming{logger} {}
bool ProcessIncomingText(std::string_view data) final { return false; }
bool ProcessIncomingBinary(std::span<const uint8_t> data) final;
bool ProcessIncomingMessages(size_t max) final { return false; }
void SendValue(TopicData* topic, const Value& value,
net::ValueSendMode mode) final;
void SendAnnounce(TopicData* topic, std::optional<int> pubuid) final;
void SendUnannounce(TopicData* topic) final;
void SendPropertiesUpdate(TopicData* topic, const wpi::json& update,
bool ack) final;
void SendOutgoing(uint64_t curTimeMs, bool flush) final;
void Flush() final { m_wire.Flush(); }
private:
// MessageHandler3 interface
void KeepAlive() final;
void ServerHelloDone() final;
void ClientHelloDone() final;
void ClearEntries() final;
void ProtoUnsup(unsigned int proto_rev) final;
void ClientHello(std::string_view self_id, unsigned int proto_rev) final;
void ServerHello(unsigned int flags, std::string_view self_id) final;
void EntryAssign(std::string_view name, unsigned int id,
unsigned int seq_num, const Value& value,
unsigned int flags) final;
void EntryUpdate(unsigned int id, unsigned int seq_num,
const Value& value) final;
void FlagsUpdate(unsigned int id, unsigned int flags) final;
void EntryDelete(unsigned int id) final;
void ExecuteRpc(unsigned int id, unsigned int uid,
std::span<const uint8_t> params) final {}
void RpcResponse(unsigned int id, unsigned int uid,
std::span<const uint8_t> result) final {}
ServerImpl::Connected3Func m_connected;
net3::WireConnection3& m_wire;
enum State { kStateInitial, kStateServerHelloComplete, kStateRunning };
State m_state{kStateInitial};
net3::WireDecoder3 m_decoder;
net::NetworkIncomingClientQueue m_incoming;
std::vector<net3::Message3> m_outgoing;
wpi::DenseMap<NT_Topic, size_t> m_outgoingValueMap;
int64_t m_nextPubUid{1};
uint64_t m_lastSendMs{0};
struct TopicData3 {
explicit TopicData3(TopicData* topic) { UpdateFlags(topic); }
unsigned int flags{0};
net3::SequenceNumber seqNum;
bool sentAssign{false};
bool published{false};
int64_t pubuid{0};
bool UpdateFlags(TopicData* topic);
};
wpi::DenseMap<TopicData*, TopicData3> m_topics3;
TopicData3* GetTopic3(TopicData* topic) {
return &m_topics3.try_emplace(topic, topic).first->second;
}
};
struct PublisherData {
PublisherData(ClientData* client, TopicData* topic, int64_t pubuid)
: client{client}, topic{topic}, pubuid{pubuid} {
UpdateMeta();
}
void UpdateMeta();
ClientData* client;
TopicData* topic;
int64_t pubuid;
std::vector<uint8_t> metaClient;
std::vector<uint8_t> metaTopic;
};
struct SubscriberData {
SubscriberData(ClientData* client, std::span<const std::string> topicNames,
int64_t subuid, const PubSubOptionsImpl& options)
: client{client},
topicNames{topicNames.begin(), topicNames.end()},
subuid{subuid},
options{options},
periodMs(std::lround(options.periodicMs / 10.0) * 10) {
UpdateMeta();
if (periodMs < kMinPeriodMs) {
periodMs = kMinPeriodMs;
}
}
void Update(std::span<const std::string> topicNames_,
const PubSubOptionsImpl& options_) {
topicNames = {topicNames_.begin(), topicNames_.end()};
options = options_;
UpdateMeta();
periodMs = std::lround(options_.periodicMs / 10.0) * 10;
if (periodMs < kMinPeriodMs) {
periodMs = kMinPeriodMs;
}
}
bool Matches(std::string_view name, bool special);
void UpdateMeta();
ClientData* client;
std::vector<std::string> topicNames;
int64_t subuid;
PubSubOptionsImpl options;
std::vector<uint8_t> metaClient;
std::vector<uint8_t> metaTopic;
wpi::DenseMap<TopicData*, bool> topics;
// in options as double, but copy here as integer; rounded to the nearest
// 10 ms
uint32_t periodMs;
};
wpi::Logger& m_logger;
net::ServerMessageHandler* m_local{nullptr};
bool m_controlReady{false};
ClientDataLocal* m_localClient;
std::vector<std::unique_ptr<ClientData>> m_clients;
wpi::UidVector<std::unique_ptr<TopicData>, 16> m_topics;
wpi::StringMap<TopicData*> m_nameTopics;
bool m_persistentChanged{false};
ServerClientLocal* m_localClient;
std::vector<std::unique_ptr<ServerClient>> m_clients;
ServerStorage m_storage;
// global meta topics (other meta topics are linked to from the specific
// client or topic)
TopicData* m_metaClients;
ServerTopic* m_metaClients;
void DumpPersistent(wpi::raw_ostream& os);
// helper functions
TopicData* CreateTopic(ClientData* client, std::string_view name,
std::string_view typeStr, const wpi::json& properties,
bool special = false);
TopicData* CreateMetaTopic(std::string_view name);
void DeleteTopic(TopicData* topic);
void SetProperties(ClientData* client, TopicData* topic,
const wpi::json& update);
void SetFlags(ClientData* client, TopicData* topic, unsigned int flags);
void SetValue(ClientData* client, TopicData* topic, const Value& value);
// update meta topic values from data structures
size_t GetEmptyClientSlot();
void SendAnnounce(ServerTopic* topic, ServerClient* client);
void UpdateMetaClients(const std::vector<ConnectionInfo>& conns);
void UpdateMetaTopicPub(TopicData* topic);
void UpdateMetaTopicSub(TopicData* topic);
void PropertiesChanged(ClientData* client, TopicData* topic,
const wpi::json& update);
};
} // namespace nt::server

View File

@@ -0,0 +1,42 @@
// Copyright (c) FIRST and other WPILib contributors.
// Open Source Software; you can modify and/or share it under the terms of
// the WPILib BSD license file in the root directory of this project.
#include "ServerPublisher.h"
#include <utility>
#include <wpi/MessagePack.h>
#include "server/MessagePackWriter.h"
#include "server/ServerTopic.h"
using namespace nt::server;
using namespace mpack;
void ServerPublisher::UpdateMeta() {
{
Writer w;
mpack_start_map(&w, 2);
mpack_write_str(&w, "uid");
mpack_write_int(&w, m_pubuid);
mpack_write_str(&w, "topic");
mpack_write_str(&w, m_topic->name);
mpack_finish_map(&w);
if (mpack_writer_destroy(&w) == mpack_ok) {
m_metaClient = std::move(w.bytes);
}
}
{
Writer w;
mpack_start_map(&w, 2);
mpack_write_str(&w, "client");
mpack_write_str(&w, m_clientName);
mpack_write_str(&w, "pubuid");
mpack_write_int(&w, m_pubuid);
mpack_finish_map(&w);
if (mpack_writer_destroy(&w) == mpack_ok) {
m_metaTopic = std::move(w.bytes);
}
}
}

View File

@@ -0,0 +1,43 @@
// Copyright (c) FIRST and other WPILib contributors.
// Open Source Software; you can modify and/or share it under the terms of
// the WPILib BSD license file in the root directory of this project.
#pragma once
#include <stdint.h>
#include <span>
#include <string>
#include <string_view>
#include <vector>
namespace nt::server {
class ServerClient;
struct ServerTopic;
class ServerPublisher {
public:
ServerPublisher(std::string_view clientName, ServerTopic* topic,
int64_t pubuid)
: m_clientName{clientName}, m_topic{topic}, m_pubuid{pubuid} {
UpdateMeta();
}
ServerPublisher(const ServerPublisher&) = delete;
ServerPublisher& operator=(const ServerPublisher&) = delete;
ServerTopic* GetTopic() const { return m_topic; }
std::span<const uint8_t> GetMetaClientData() const { return m_metaClient; }
std::span<const uint8_t> GetMetaTopicData() const { return m_metaTopic; }
private:
void UpdateMeta();
std::string m_clientName;
ServerTopic* m_topic;
int64_t m_pubuid;
std::vector<uint8_t> m_metaClient;
std::vector<uint8_t> m_metaTopic;
};
} // namespace nt::server

View File

@@ -0,0 +1,608 @@
// Copyright (c) FIRST and other WPILib contributors.
// Open Source Software; you can modify and/or share it under the terms of
// the WPILib BSD license file in the root directory of this project.
#include "ServerStorage.h"
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include <fmt/format.h>
#include <wpi/Base64.h>
#include <wpi/MessagePack.h>
#include <wpi/json.h>
#include "Log.h"
#include "server/MessagePackWriter.h"
#include "server/ServerClient.h"
using namespace nt;
using namespace nt::server;
using namespace mpack;
ServerTopic* ServerStorage::CreateTopic(ServerClient* client,
std::string_view name,
std::string_view typeStr,
const wpi::json& properties,
bool special) {
auto& topic = m_nameTopics[name];
if (topic) {
if (typeStr != topic->typeStr) {
if (client) {
WARN("client {} publish '{}' conflicting type '{}' (currently '{}')",
client->GetName(), name, typeStr, topic->typeStr);
}
}
} else {
// new topic
unsigned int id = m_topics.emplace_back(
std::make_unique<ServerTopic>(m_logger, name, typeStr, properties));
topic = m_topics[id].get();
topic->id = id;
topic->special = special;
m_sendAnnounce(topic, client);
// create meta topics; don't create if topic is itself a meta topic
if (!special) {
topic->metaPub = CreateMetaTopic(fmt::format("$pub${}", name));
topic->metaSub = CreateMetaTopic(fmt::format("$sub${}", name));
UpdateMetaTopicPub(topic);
UpdateMetaTopicSub(topic);
}
}
return topic;
}
ServerTopic* ServerStorage::CreateMetaTopic(std::string_view name) {
return CreateTopic(nullptr, name, "msgpack", {{"retained", true}}, true);
}
void ServerStorage::DeleteTopic(ServerTopic* topic) {
if (!topic) {
return;
}
// delete meta topics
if (topic->metaPub) {
DeleteTopic(topic->metaPub);
}
if (topic->metaSub) {
DeleteTopic(topic->metaSub);
}
// unannounce to all subscribers
for (auto&& tcd : topic->clients) {
if (!tcd.second.subscribers.empty()) {
tcd.first->UpdatePeriod(tcd.second, topic);
tcd.first->SendUnannounce(topic);
}
}
// erase the topic
m_nameTopics.erase(topic->name);
m_topics.erase(topic->id);
}
void ServerStorage::SetProperties(ServerClient* client, ServerTopic* topic,
const wpi::json& update) {
DEBUG4("SetProperties({}, {}, {})", client ? client->GetId() : -1,
topic->name, update.dump());
bool wasPersistent = topic->persistent;
if (topic->SetProperties(update)) {
// update persistentChanged flag
if (topic->persistent != wasPersistent) {
m_persistentChanged = true;
}
PropertiesChanged(client, topic, update);
}
}
void ServerStorage::SetFlags(ServerClient* client, ServerTopic* topic,
unsigned int flags) {
bool wasPersistent = topic->persistent;
if (topic->SetFlags(flags)) {
// update persistentChanged flag
if (topic->persistent != wasPersistent) {
m_persistentChanged = true;
wpi::json update;
if (topic->persistent) {
update = {{"persistent", true}};
} else {
update = {{"persistent", wpi::json::object()}};
}
PropertiesChanged(client, topic, update);
}
}
}
void ServerStorage::SetValue(ServerClient* client, ServerTopic* topic,
const Value& value) {
// update retained value if from same client or timestamp newer
if (topic->cached && (!topic->lastValue || topic->lastValueClient == client ||
topic->lastValue.time() == 0 ||
value.time() >= topic->lastValue.time())) {
DEBUG4("updating '{}' last value (time was {} is {})", topic->name,
topic->lastValue.time(), value.time());
topic->lastValue = value;
topic->lastValueClient = client;
// if persistent, update flag
if (topic->persistent) {
m_persistentChanged = true;
}
}
for (auto&& tcd : topic->clients) {
if (tcd.first != client &&
tcd.second.sendMode != net::ValueSendMode::kDisabled) {
tcd.first->SendValue(topic, value, tcd.second.sendMode);
}
}
}
void ServerStorage::RemoveClient(ServerClient* client) {
// remove all publishers and subscribers for this client
wpi::SmallVector<ServerTopic*, 16> toDelete;
for (auto&& topic : m_topics) {
bool pubChanged = false;
bool subChanged = false;
auto tcdIt = topic->clients.find(client);
if (tcdIt != topic->clients.end()) {
pubChanged = !tcdIt->second.publishers.empty();
subChanged = !tcdIt->second.subscribers.empty();
topic->publisherCount -= tcdIt->second.publishers.size();
topic->clients.erase(tcdIt);
}
if (!topic->IsPublished()) {
toDelete.push_back(topic.get());
} else {
if (pubChanged) {
UpdateMetaTopicPub(topic.get());
}
if (subChanged) {
UpdateMetaTopicSub(topic.get());
}
}
}
// delete unpublished topics
for (auto topic : toDelete) {
DeleteTopic(topic);
}
DeleteTopic(client->m_metaPub);
DeleteTopic(client->m_metaSub);
}
void ServerStorage::UpdateMetaTopicPub(ServerTopic* topic) {
if (!topic->metaPub) {
return;
}
Writer w;
uint32_t count = 0;
for (auto&& tcd : topic->clients) {
count += tcd.second.publishers.size();
}
mpack_start_array(&w, count);
for (auto&& tcd : topic->clients) {
for (auto&& pub : tcd.second.publishers) {
mpack_write_object_bytes(&w, pub->GetMetaTopicData());
}
}
mpack_finish_array(&w);
if (mpack_writer_destroy(&w) == mpack_ok) {
SetValue(nullptr, topic->metaPub, Value::MakeRaw(std::move(w.bytes)));
}
}
void ServerStorage::UpdateMetaTopicSub(ServerTopic* topic) {
if (!topic->metaSub) {
return;
}
Writer w;
uint32_t count = 0;
for (auto&& tcd : topic->clients) {
count += tcd.second.subscribers.size();
}
mpack_start_array(&w, count);
for (auto&& tcd : topic->clients) {
for (auto&& sub : tcd.second.subscribers) {
mpack_write_object_bytes(&w, sub->GetMetaTopicData());
}
}
mpack_finish_array(&w);
if (mpack_writer_destroy(&w) == mpack_ok) {
SetValue(nullptr, topic->metaSub, Value::MakeRaw(std::move(w.bytes)));
}
}
void ServerStorage::PropertiesChanged(ServerClient* client, ServerTopic* topic,
const wpi::json& update) {
// removing some properties can result in the topic being unpublished
if (!topic->IsPublished()) {
DeleteTopic(topic);
} else {
// send updated announcement to all subscribers
for (auto&& tcd : topic->clients) {
tcd.first->SendPropertiesUpdate(topic, update, tcd.first == client);
}
}
}
static void DumpValue(wpi::raw_ostream& os, const Value& value,
wpi::json::serializer& s) {
switch (value.type()) {
case NT_BOOLEAN:
if (value.GetBoolean()) {
os << "true";
} else {
os << "false";
}
break;
case NT_DOUBLE:
s.dump_float(value.GetDouble());
break;
case NT_FLOAT:
s.dump_float(value.GetFloat());
break;
case NT_INTEGER:
s.dump_integer(value.GetInteger());
break;
case NT_STRING:
os << '"';
s.dump_escaped(value.GetString(), false);
os << '"';
break;
case NT_RAW:
case NT_RPC:
os << '"';
wpi::Base64Encode(os, value.GetRaw());
os << '"';
break;
case NT_BOOLEAN_ARRAY: {
os << '[';
bool first = true;
for (auto v : value.GetBooleanArray()) {
if (first) {
first = false;
} else {
os << ", ";
}
if (v) {
os << "true";
} else {
os << "false";
}
}
os << ']';
break;
}
case NT_DOUBLE_ARRAY: {
os << '[';
bool first = true;
for (auto v : value.GetDoubleArray()) {
if (first) {
first = false;
} else {
os << ", ";
}
s.dump_float(v);
}
os << ']';
break;
}
case NT_FLOAT_ARRAY: {
os << '[';
bool first = true;
for (auto v : value.GetFloatArray()) {
if (first) {
first = false;
} else {
os << ", ";
}
s.dump_float(v);
}
os << ']';
break;
}
case NT_INTEGER_ARRAY: {
os << '[';
bool first = true;
for (auto v : value.GetIntegerArray()) {
if (first) {
first = false;
} else {
os << ", ";
}
s.dump_integer(v);
}
os << ']';
break;
}
case NT_STRING_ARRAY: {
os << '[';
bool first = true;
for (auto&& v : value.GetStringArray()) {
if (first) {
first = false;
} else {
os << ", ";
}
os << '"';
s.dump_escaped(v, false);
os << '"';
}
os << ']';
break;
}
default:
os << "null";
break;
}
}
void ServerStorage::DumpPersistent(wpi::raw_ostream& os) {
wpi::json::serializer s{os, ' ', 16};
os << "[\n";
bool first = true;
for (const auto& topic : m_topics) {
if (!topic->persistent || !topic->lastValue) {
continue;
}
if (first) {
first = false;
} else {
os << ",\n";
}
os << " {\n \"name\": \"";
s.dump_escaped(topic->name, false);
os << "\",\n \"type\": \"";
s.dump_escaped(topic->typeStr, false);
os << "\",\n \"value\": ";
DumpValue(os, topic->lastValue, s);
os << ",\n \"properties\": ";
s.dump(topic->properties, true, false, 2, 4);
os << "\n }";
}
os << "\n]\n";
}
static std::string* ObjGetString(wpi::json::object_t& obj, std::string_view key,
std::string* error) {
auto it = obj.find(key);
if (it == obj.end()) {
*error = fmt::format("no {} key", key);
return nullptr;
}
auto val = it->second.get_ptr<std::string*>();
if (!val) {
*error = fmt::format("{} must be a string", key);
}
return val;
}
std::string ServerStorage::LoadPersistent(std::string_view in) {
if (in.empty()) {
return {};
}
wpi::json j;
try {
j = wpi::json::parse(in);
} catch (wpi::json::parse_error& err) {
return fmt::format("could not decode JSON: {}", err.what());
}
if (!j.is_array()) {
return "expected JSON array at top level";
}
bool persistentChanged = m_persistentChanged;
std::string allerrors;
int i = -1;
auto time = nt::Now();
for (auto&& jitem : j) {
++i;
std::string error;
{
auto obj = jitem.get_ptr<wpi::json::object_t*>();
if (!obj) {
error = "expected item to be an object";
goto err;
}
// name
auto name = ObjGetString(*obj, "name", &error);
if (!name) {
goto err;
}
// type
auto typeStr = ObjGetString(*obj, "type", &error);
if (!typeStr) {
goto err;
}
// properties
auto propsIt = obj->find("properties");
if (propsIt == obj->end()) {
error = "no properties key";
goto err;
}
auto& props = propsIt->second;
if (!props.is_object()) {
error = "properties must be an object";
goto err;
}
// check to make sure persistent property is set
auto persistentIt = props.find("persistent");
if (persistentIt == props.end()) {
error = "no persistent property";
goto err;
}
if (auto v = persistentIt->get_ptr<bool*>()) {
if (!*v) {
error = "persistent property is false";
goto err;
}
} else {
error = "persistent property is not boolean";
goto err;
}
// value
auto valueIt = obj->find("value");
if (valueIt == obj->end()) {
error = "no value key";
goto err;
}
Value value;
if (*typeStr == "boolean") {
if (auto v = valueIt->second.get_ptr<bool*>()) {
value = Value::MakeBoolean(*v, time);
} else {
error = "value type mismatch, expected boolean";
goto err;
}
} else if (*typeStr == "int") {
if (auto v = valueIt->second.get_ptr<int64_t*>()) {
value = Value::MakeInteger(*v, time);
} else if (auto v = valueIt->second.get_ptr<uint64_t*>()) {
value = Value::MakeInteger(*v, time);
} else {
error = "value type mismatch, expected int";
goto err;
}
} else if (*typeStr == "float") {
if (auto v = valueIt->second.get_ptr<double*>()) {
value = Value::MakeFloat(*v, time);
} else {
error = "value type mismatch, expected float";
goto err;
}
} else if (*typeStr == "double") {
if (auto v = valueIt->second.get_ptr<double*>()) {
value = Value::MakeDouble(*v, time);
} else {
error = "value type mismatch, expected double";
goto err;
}
} else if (*typeStr == "string" || *typeStr == "json") {
if (auto v = valueIt->second.get_ptr<std::string*>()) {
value = Value::MakeString(*v, time);
} else {
error = "value type mismatch, expected string";
goto err;
}
} else if (*typeStr == "boolean[]") {
auto arr = valueIt->second.get_ptr<wpi::json::array_t*>();
if (!arr) {
error = "value type mismatch, expected array";
goto err;
}
std::vector<int> elems;
for (auto&& jelem : valueIt->second) {
if (auto v = jelem.get_ptr<bool*>()) {
elems.push_back(*v);
} else {
error = "value type mismatch, expected boolean";
}
}
value = Value::MakeBooleanArray(elems, time);
} else if (*typeStr == "int[]") {
auto arr = valueIt->second.get_ptr<wpi::json::array_t*>();
if (!arr) {
error = "value type mismatch, expected array";
goto err;
}
std::vector<int64_t> elems;
for (auto&& jelem : valueIt->second) {
if (auto v = jelem.get_ptr<int64_t*>()) {
elems.push_back(*v);
} else if (auto v = jelem.get_ptr<uint64_t*>()) {
elems.push_back(*v);
} else {
error = "value type mismatch, expected int";
}
}
value = Value::MakeIntegerArray(elems, time);
} else if (*typeStr == "double[]") {
auto arr = valueIt->second.get_ptr<wpi::json::array_t*>();
if (!arr) {
error = "value type mismatch, expected array";
goto err;
}
std::vector<double> elems;
for (auto&& jelem : valueIt->second) {
if (auto v = jelem.get_ptr<double*>()) {
elems.push_back(*v);
} else {
error = "value type mismatch, expected double";
}
}
value = Value::MakeDoubleArray(elems, time);
} else if (*typeStr == "float[]") {
auto arr = valueIt->second.get_ptr<wpi::json::array_t*>();
if (!arr) {
error = "value type mismatch, expected array";
goto err;
}
std::vector<float> elems;
for (auto&& jelem : valueIt->second) {
if (auto v = jelem.get_ptr<double*>()) {
elems.push_back(*v);
} else {
error = "value type mismatch, expected float";
}
}
value = Value::MakeFloatArray(elems, time);
} else if (*typeStr == "string[]") {
auto arr = valueIt->second.get_ptr<wpi::json::array_t*>();
if (!arr) {
error = "value type mismatch, expected array";
goto err;
}
std::vector<std::string> elems;
for (auto&& jelem : valueIt->second) {
if (auto v = jelem.get_ptr<std::string*>()) {
elems.emplace_back(*v);
} else {
error = "value type mismatch, expected string";
}
}
value = Value::MakeStringArray(std::move(elems), time);
} else {
// raw
if (auto v = valueIt->second.get_ptr<std::string*>()) {
std::vector<uint8_t> data;
wpi::Base64Decode(*v, &data);
value = Value::MakeRaw(std::move(data), time);
} else {
error = "value type mismatch, expected string";
goto err;
}
}
// create persistent topic
auto topic = CreateTopic(nullptr, *name, *typeStr, props);
// set value
SetValue(nullptr, topic, value);
continue;
}
err:
allerrors += fmt::format("{}: {}\n", i, error);
}
m_persistentChanged = persistentChanged; // restore flag
return allerrors;
}

View File

@@ -0,0 +1,95 @@
// Copyright (c) FIRST and other WPILib contributors.
// Open Source Software; you can modify and/or share it under the terms of
// the WPILib BSD license file in the root directory of this project.
#pragma once
#include <concepts>
#include <string>
#include <string_view>
#include <utility>
#include <wpi/StringMap.h>
#include <wpi/UidVector.h>
#include <wpi/json_fwd.h>
#include "server/ServerTopic.h"
namespace wpi {
class Logger;
class raw_ostream;
} // namespace wpi
namespace nt::server {
class ServerClient;
class ServerStorage final {
public:
ServerStorage(wpi::Logger& logger,
std::function<void(ServerTopic* topic, ServerClient* client)>
sendAnnounce)
: m_logger{logger}, m_sendAnnounce{std::move(sendAnnounce)} {}
ServerStorage(const ServerStorage&) = delete;
ServerStorage& operator=(const ServerStorage&) = delete;
ServerTopic* CreateTopic(ServerClient* client, std::string_view name,
std::string_view typeStr,
const wpi::json& properties, bool special = false);
ServerTopic* CreateMetaTopic(std::string_view name);
void DeleteTopic(ServerTopic* topic);
void SetProperties(ServerClient* client, ServerTopic* topic,
const wpi::json& update);
void SetFlags(ServerClient* client, ServerTopic* topic, unsigned int flags);
void SetValue(ServerClient* client, ServerTopic* topic, const Value& value);
void RemoveClient(ServerClient* client);
void PropertiesChanged(ServerClient* client, ServerTopic* topic,
const wpi::json& update);
ServerTopic* GetTopic(unsigned int id) const {
return id < m_topics.size() ? m_topics[id].get() : nullptr;
}
ServerTopic* GetTopic(std::string_view name) const {
auto it = m_nameTopics.find(name);
if (it == m_nameTopics.end()) {
return nullptr;
}
return it->second;
}
// Approximate upper bound, not exact quantity
size_t GetNumTopics() const { return m_topics.size(); }
void ForEachTopic(std::invocable<ServerTopic*> auto&& func) const {
for (auto&& topic : m_topics) {
func(topic.get());
}
}
// update meta topic values from data structures
void UpdateMetaTopicPub(ServerTopic* topic);
void UpdateMetaTopicSub(ServerTopic* topic);
// if any persistent values changed since the last call to this function
bool PersistentChanged() {
bool rv = m_persistentChanged;
m_persistentChanged = false;
return rv;
}
void DumpPersistent(wpi::raw_ostream& os);
// returns newline-separated errors
std::string LoadPersistent(std::string_view in);
private:
wpi::Logger& m_logger;
std::function<void(ServerTopic* topic, ServerClient* client)> m_sendAnnounce;
wpi::UidVector<std::unique_ptr<ServerTopic>, 16> m_topics;
wpi::StringMap<ServerTopic*> m_nameTopics;
bool m_persistentChanged{false};
};
} // namespace nt::server

View File

@@ -0,0 +1,88 @@
// Copyright (c) FIRST and other WPILib contributors.
// Open Source Software; you can modify and/or share it under the terms of
// the WPILib BSD license file in the root directory of this project.
#include "ServerSubscriber.h"
#include <utility>
#include <wpi/MessagePack.h>
#include <wpi/StringExtras.h>
#include "PubSubOptions.h"
#include "server/MessagePackWriter.h"
using namespace nt;
using namespace nt::server;
using namespace mpack;
static void WriteOptions(mpack_writer_t& w, const PubSubOptionsImpl& options) {
int size =
(options.sendAll ? 1 : 0) + (options.topicsOnly ? 1 : 0) +
(options.periodicMs != PubSubOptionsImpl::kDefaultPeriodicMs ? 1 : 0) +
(options.prefixMatch ? 1 : 0);
mpack_start_map(&w, size);
if (options.sendAll) {
mpack_write_str(&w, "all");
mpack_write_bool(&w, true);
}
if (options.topicsOnly) {
mpack_write_str(&w, "topicsonly");
mpack_write_bool(&w, true);
}
if (options.periodicMs != PubSubOptionsImpl::kDefaultPeriodicMs) {
mpack_write_str(&w, "periodic");
mpack_write_float(&w, options.periodicMs / 1000.0);
}
if (options.prefixMatch) {
mpack_write_str(&w, "prefix");
mpack_write_bool(&w, true);
}
mpack_finish_map(&w);
}
bool ServerSubscriber::Matches(std::string_view name, bool special) {
for (auto&& topicName : m_topicNames) {
if ((!m_options.prefixMatch && name == topicName) ||
(m_options.prefixMatch && (!special || !topicName.empty()) &&
wpi::starts_with(name, topicName))) {
return true;
}
}
return false;
}
void ServerSubscriber::UpdateMeta() {
{
Writer w;
mpack_start_map(&w, 3);
mpack_write_str(&w, "uid");
mpack_write_int(&w, m_subuid);
mpack_write_str(&w, "topics");
mpack_start_array(&w, m_topicNames.size());
for (auto&& name : m_topicNames) {
mpack_write_str(&w, name);
}
mpack_finish_array(&w);
mpack_write_str(&w, "options");
WriteOptions(w, m_options);
mpack_finish_map(&w);
if (mpack_writer_destroy(&w) == mpack_ok) {
m_metaClient = std::move(w.bytes);
}
}
{
Writer w;
mpack_start_map(&w, 3);
mpack_write_str(&w, "client");
mpack_write_str(&w, m_clientName);
mpack_write_str(&w, "subuid");
mpack_write_int(&w, m_subuid);
mpack_write_str(&w, "options");
WriteOptions(w, m_options);
mpack_finish_map(&w);
if (mpack_writer_destroy(&w) == mpack_ok) {
m_metaTopic = std::move(w.bytes);
}
}
}

View File

@@ -0,0 +1,74 @@
// Copyright (c) FIRST and other WPILib contributors.
// Open Source Software; you can modify and/or share it under the terms of
// the WPILib BSD license file in the root directory of this project.
#pragma once
#include <stdint.h>
#include <cmath>
#include <span>
#include <string>
#include <string_view>
#include <vector>
#include "PubSubOptions.h"
#include "server/Constants.h"
namespace nt::server {
class ServerClient;
struct ServerTopic;
class ServerSubscriber {
public:
ServerSubscriber(std::string_view clientName,
std::span<const std::string> topicNames, int64_t subuid,
const PubSubOptionsImpl& options)
: m_clientName{clientName},
m_topicNames{topicNames.begin(), topicNames.end()},
m_subuid{subuid},
m_options{options},
m_periodMs(std::lround(options.periodicMs / 10.0) * 10) {
UpdateMeta();
if (m_periodMs < kMinPeriodMs) {
m_periodMs = kMinPeriodMs;
}
}
ServerSubscriber(const ServerSubscriber&) = delete;
ServerSubscriber& operator=(const ServerSubscriber&) = delete;
void Update(std::span<const std::string> topicNames_,
const PubSubOptionsImpl& options_) {
m_topicNames = {topicNames_.begin(), topicNames_.end()};
m_options = options_;
UpdateMeta();
m_periodMs = std::lround(options_.periodicMs / 10.0) * 10;
if (m_periodMs < kMinPeriodMs) {
m_periodMs = kMinPeriodMs;
}
}
bool Matches(std::string_view name, bool special);
const PubSubOptions& GetOptions() const { return m_options; }
uint32_t GetPeriodMs() const { return m_periodMs; }
std::span<const uint8_t> GetMetaClientData() const { return m_metaClient; }
std::span<const uint8_t> GetMetaTopicData() const { return m_metaTopic; }
private:
void UpdateMeta();
std::string m_clientName;
std::vector<std::string> m_topicNames;
int64_t m_subuid;
PubSubOptionsImpl m_options;
std::vector<uint8_t> m_metaClient;
std::vector<uint8_t> m_metaTopic;
// in options as double, but copy here as integer; rounded to the nearest
// 10 ms
uint32_t m_periodMs;
};
} // namespace nt::server

View File

@@ -0,0 +1,103 @@
// Copyright (c) FIRST and other WPILib contributors.
// Open Source Software; you can modify and/or share it under the terms of
// the WPILib BSD license file in the root directory of this project.
#include "ServerTopic.h"
#include "Log.h"
using namespace nt::server;
bool ServerTopic::SetProperties(const wpi::json& update) {
if (!update.is_object()) {
return false;
}
bool updated = false;
for (auto&& elem : update.items()) {
if (elem.value().is_null()) {
properties.erase(elem.key());
} else {
properties[elem.key()] = elem.value();
}
updated = true;
}
if (updated) {
RefreshProperties();
}
return updated;
}
void ServerTopic::RefreshProperties() {
persistent = false;
retained = false;
cached = true;
auto persistentIt = properties.find("persistent");
if (persistentIt != properties.end()) {
if (auto val = persistentIt->get_ptr<bool*>()) {
persistent = *val;
}
}
auto retainedIt = properties.find("retained");
if (retainedIt != properties.end()) {
if (auto val = retainedIt->get_ptr<bool*>()) {
retained = *val;
}
}
auto cachedIt = properties.find("cached");
if (cachedIt != properties.end()) {
if (auto val = cachedIt->get_ptr<bool*>()) {
cached = *val;
}
}
if (!cached) {
lastValue = {};
lastValueClient = nullptr;
}
if (!cached && persistent) {
WARN("topic {}: disabling cached property disables persistent storage",
name);
}
}
bool ServerTopic::SetFlags(unsigned int flags_) {
bool updated;
if ((flags_ & NT_PERSISTENT) != 0) {
updated = !persistent;
persistent = true;
properties["persistent"] = true;
} else {
updated = persistent;
persistent = false;
properties.erase("persistent");
}
if ((flags_ & NT_RETAINED) != 0) {
updated |= !retained;
retained = true;
properties["retained"] = true;
} else {
updated |= retained;
retained = false;
properties.erase("retained");
}
if ((flags_ & NT_UNCACHED) != 0) {
updated |= cached;
cached = false;
properties["cached"] = false;
lastValue = {};
lastValueClient = nullptr;
} else {
updated |= !cached;
cached = true;
properties.erase("cached");
}
if (!cached && persistent) {
WARN("topic {}: disabling cached property disables persistent storage",
name);
}
return updated;
}

View File

@@ -0,0 +1,104 @@
// Copyright (c) FIRST and other WPILib contributors.
// Open Source Software; you can modify and/or share it under the terms of
// the WPILib BSD license file in the root directory of this project.
#pragma once
#include <string>
#include <string_view>
#include <utility>
#include <wpi/DenseMap.h>
#include <wpi/SmallPtrSet.h>
#include <wpi/json.h>
#include "net/NetworkOutgoingQueue.h"
#include "networktables/NetworkTableValue.h"
#include "server/ServerPublisher.h"
#include "server/ServerSubscriber.h"
namespace wpi {
class Logger;
} // namespace wpi
namespace nt::server {
class ServerClient;
struct TopicClientData {
wpi::SmallPtrSet<ServerPublisher*, 2> publishers;
wpi::SmallPtrSet<ServerSubscriber*, 2> subscribers;
net::ValueSendMode sendMode = net::ValueSendMode::kDisabled;
bool AddSubscriber(ServerSubscriber* sub) {
bool added = subscribers.insert(sub).second;
auto& options = sub->GetOptions();
if (!options.topicsOnly) {
if (options.sendAll) {
sendMode = net::ValueSendMode::kAll;
} else if (sendMode == net::ValueSendMode::kDisabled) {
sendMode = net::ValueSendMode::kNormal;
}
}
return added;
}
};
struct ServerTopic {
ServerTopic(wpi::Logger& logger, std::string_view name,
std::string_view typeStr)
: m_logger{logger}, name{name}, typeStr{typeStr} {}
ServerTopic(wpi::Logger& logger, std::string_view name,
std::string_view typeStr, wpi::json properties)
: m_logger{logger},
name{name},
typeStr{typeStr},
properties(std::move(properties)) {
RefreshProperties();
}
ServerTopic(const ServerTopic&) = delete;
ServerTopic& operator=(const ServerTopic&) = delete;
bool IsPublished() const {
return persistent || retained || publisherCount != 0;
}
// returns true if properties changed
bool SetProperties(const wpi::json& update);
void RefreshProperties();
bool SetFlags(unsigned int flags_);
wpi::Logger& m_logger; // Must be m_logger for WARN macro to work
std::string name;
unsigned int id;
Value lastValue;
ServerClient* lastValueClient = nullptr;
std::string typeStr;
wpi::json properties = wpi::json::object();
unsigned int publisherCount{0};
bool persistent{false};
bool retained{false};
bool cached{true};
bool special{false};
int localTopic{0};
void AddPublisher(ServerClient* client, ServerPublisher* pub) {
if (clients[client].publishers.insert(pub).second) {
++publisherCount;
}
}
void RemovePublisher(ServerClient* client, ServerPublisher* pub) {
if (clients[client].publishers.erase(pub)) {
--publisherCount;
}
}
wpi::SmallDenseMap<ServerClient*, TopicClientData, 4> clients;
// meta topics
ServerTopic* metaPub = nullptr;
ServerTopic* metaSub = nullptr;
};
} // namespace nt::server

View File

@@ -24,6 +24,12 @@ inline void mpack_write_bytes(mpack_writer_t* writer,
data.size());
}
inline void mpack_write_object_bytes(mpack_writer_t* writer,
std::span<const uint8_t> data) {
mpack_write_object_bytes(writer, reinterpret_cast<const char*>(data.data()),
data.size());
}
inline void mpack_reader_init_data(mpack_reader_t* reader,
std::span<const uint8_t> data) {
mpack_reader_init_data(reader, reinterpret_cast<const char*>(data.data()),