diff --git a/ntcore/src/main/native/cpp/server/Constants.h b/ntcore/src/main/native/cpp/server/Constants.h new file mode 100644 index 0000000000..c04a7e0899 --- /dev/null +++ b/ntcore/src/main/native/cpp/server/Constants.h @@ -0,0 +1,13 @@ +// Copyright (c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#pragma once + +#include + +namespace nt::server { + +inline constexpr uint32_t kMinPeriodMs = 5; + +} // namespace nt::server diff --git a/ntcore/src/main/native/cpp/server/Functions.h b/ntcore/src/main/native/cpp/server/Functions.h new file mode 100644 index 0000000000..a1b4ac65db --- /dev/null +++ b/ntcore/src/main/native/cpp/server/Functions.h @@ -0,0 +1,16 @@ +// Copyright (c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#pragma once + +#include +#include + +namespace nt::server { + +using SetPeriodicFunc = std::function; +using Connected3Func = + std::function; + +} // namespace nt::server diff --git a/ntcore/src/main/native/cpp/server/MessagePackWriter.h b/ntcore/src/main/native/cpp/server/MessagePackWriter.h new file mode 100644 index 0000000000..5ff99bc9c6 --- /dev/null +++ b/ntcore/src/main/native/cpp/server/MessagePackWriter.h @@ -0,0 +1,31 @@ +// Copyright (c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#pragma once + +#include + +#include + +#include +#include + +namespace nt::server { + +struct Writer : public mpack::mpack_writer_t { + Writer() { + mpack::mpack_writer_init(this, buf, sizeof(buf)); + mpack::mpack_writer_set_context(this, &os); + mpack::mpack_writer_set_flush( + this, [](mpack::mpack_writer_t* w, const char* buffer, size_t count) { + static_cast(w->context)->write(buffer, count); + }); + } + + std::vector bytes; + wpi::raw_uvector_ostream os{bytes}; + char buf[128]; +}; + +} // namespace nt::server diff --git a/ntcore/src/main/native/cpp/server/ServerClient.cpp b/ntcore/src/main/native/cpp/server/ServerClient.cpp new file mode 100644 index 0000000000..d192668fe9 --- /dev/null +++ b/ntcore/src/main/native/cpp/server/ServerClient.cpp @@ -0,0 +1,59 @@ +// Copyright (c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#include "ServerClient.h" + +#include + +#include + +#include "server/MessagePackWriter.h" +#include "server/ServerPublisher.h" +#include "server/ServerStorage.h" + +using namespace nt::server; +using namespace mpack; + +void ServerClient::UpdateMetaClientPub() { + if (!m_metaPub) { + return; + } + Writer w; + mpack_start_array(&w, m_publishers.size()); + for (auto&& pub : m_publishers) { + mpack_write_object_bytes(&w, pub.second->GetMetaClientData()); + } + mpack_finish_array(&w); + if (mpack_writer_destroy(&w) == mpack_ok) { + m_storage.SetValue(nullptr, m_metaPub, Value::MakeRaw(std::move(w.bytes))); + } +} + +void ServerClient::UpdateMetaClientSub() { + if (!m_metaSub) { + return; + } + Writer w; + mpack_start_array(&w, m_subscribers.size()); + for (auto&& sub : m_subscribers) { + mpack_write_object_bytes(&w, sub.second->GetMetaClientData()); + } + mpack_finish_array(&w); + if (mpack_writer_destroy(&w) == mpack_ok) { + m_storage.SetValue(nullptr, m_metaSub, Value::MakeRaw(std::move(w.bytes))); + } +} + +std::span ServerClient::GetSubscribers( + std::string_view name, bool special, + wpi::SmallVectorImpl& buf) { + buf.resize(0); + for (auto&& subPair : m_subscribers) { + ServerSubscriber* subscriber = subPair.getSecond().get(); + if (subscriber->Matches(name, special)) { + buf.emplace_back(subscriber); + } + } + return {buf.data(), buf.size()}; +} diff --git a/ntcore/src/main/native/cpp/server/ServerClient.h b/ntcore/src/main/native/cpp/server/ServerClient.h new file mode 100644 index 0000000000..d56753b75b --- /dev/null +++ b/ntcore/src/main/native/cpp/server/ServerClient.h @@ -0,0 +1,100 @@ +// Copyright (c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#pragma once + +#include + +#include +#include +#include +#include +#include +#include + +#include + +#include "net/NetworkOutgoingQueue.h" +#include "server/Functions.h" +#include "server/ServerPublisher.h" +#include "server/ServerSubscriber.h" + +namespace wpi { +class Logger; +template +class SmallVectorImpl; +} // namespace wpi + +namespace nt::server { + +struct ServerTopic; +class ServerStorage; +struct TopicClientData; + +class ServerClient { + public: + ServerClient(std::string_view name, std::string_view connInfo, bool local, + SetPeriodicFunc setPeriodic, ServerStorage& storage, int id, + wpi::Logger& logger) + : m_name{name}, + m_connInfo{connInfo}, + m_local{local}, + m_setPeriodic{std::move(setPeriodic)}, + m_storage{storage}, + m_id{id}, + m_logger{logger} {} + ServerClient(const ServerClient&) = delete; + ServerClient& operator=(const ServerClient&) = delete; + virtual ~ServerClient() = default; + + // these return true if any messages have been queued for later processing + virtual bool ProcessIncomingText(std::string_view data) = 0; + virtual bool ProcessIncomingBinary(std::span data) = 0; + + virtual void SendValue(ServerTopic* topic, const Value& value, + net::ValueSendMode mode) = 0; + virtual void SendAnnounce(ServerTopic* topic, std::optional pubuid) = 0; + virtual void SendUnannounce(ServerTopic* topic) = 0; + virtual void SendPropertiesUpdate(ServerTopic* topic, const wpi::json& update, + bool ack) = 0; + virtual void SendOutgoing(uint64_t curTimeMs, bool flush) = 0; + virtual void Flush() = 0; + + // later processing -- returns true if more to process + virtual bool ProcessIncomingMessages(size_t max) = 0; + + void UpdateMetaClientPub(); + void UpdateMetaClientSub(); + + std::span GetSubscribers( + std::string_view name, bool special, + wpi::SmallVectorImpl& buf); + + std::string_view GetName() const { return m_name; } + int GetId() const { return m_id; } + + virtual void UpdatePeriod(TopicClientData& tcd, ServerTopic* topic) {} + + protected: + std::string m_name; + std::string m_connInfo; + bool m_local; // local to machine + SetPeriodicFunc m_setPeriodic; + // TODO: make this per-topic? + uint32_t m_periodMs{UINT32_MAX}; + ServerStorage& m_storage; + int m_id; + + wpi::Logger& m_logger; + + wpi::DenseMap> m_publishers; + wpi::DenseMap> m_subscribers; + + public: + // meta topics + ServerTopic* m_metaPub = nullptr; + ServerTopic* m_metaSub = nullptr; +}; + +} // namespace nt::server diff --git a/ntcore/src/main/native/cpp/server/ServerClient3.cpp b/ntcore/src/main/native/cpp/server/ServerClient3.cpp new file mode 100644 index 0000000000..0841b46618 --- /dev/null +++ b/ntcore/src/main/native/cpp/server/ServerClient3.cpp @@ -0,0 +1,482 @@ +// Copyright (c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#include "ServerClient3.h" + +#include +#include + +#include + +#include "Log.h" +#include "Types_internal.h" +#include "net3/WireEncoder3.h" +#include "server/ServerImpl.h" +#include "server/ServerPublisher.h" +#include "server/ServerTopic.h" + +using namespace nt::server; + +// maximum amount of time the wire can be not ready to send another +// transmission before we close the connection +static constexpr uint32_t kWireMaxNotReadyUs = 1000000; + +bool ServerClient3::TopicData3::UpdateFlags(ServerTopic* topic) { + unsigned int newFlags = topic->persistent ? NT_PERSISTENT : 0; + bool updated = flags != newFlags; + flags = newFlags; + return updated; +} + +bool ServerClient3::ProcessIncomingBinary(std::span data) { + if (!m_decoder.Execute(&data)) { + m_wire.Disconnect(m_decoder.GetError()); + } + return false; +} + +void ServerClient3::SendValue(ServerTopic* topic, const Value& value, + net::ValueSendMode mode) { + if (m_state != kStateRunning) { + if (mode == net::ValueSendMode::kImm) { + mode = net::ValueSendMode::kAll; + } + } else if (m_local) { + mode = net::ValueSendMode::kImm; // always send local immediately + } + TopicData3* topic3 = GetTopic3(topic); + bool added = false; + + switch (mode) { + case net::ValueSendMode::kDisabled: // do nothing + break; + case net::ValueSendMode::kImm: // send immediately + ++topic3->seqNum; + if (topic3->sentAssign) { + net3::WireEncodeEntryUpdate(m_wire.Send().stream(), topic->id, + topic3->seqNum.value(), value); + } else { + net3::WireEncodeEntryAssign(m_wire.Send().stream(), topic->name, + topic->id, topic3->seqNum.value(), value, + topic3->flags); + topic3->sentAssign = true; + } + if (m_local) { + Flush(); + } + break; + case net::ValueSendMode::kNormal: { + // replace, or append if not present + wpi::DenseMap::iterator it; + std::tie(it, added) = + m_outgoingValueMap.try_emplace(topic->id, m_outgoing.size()); + if (!added && it->second < m_outgoing.size()) { + auto& msg = m_outgoing[it->second]; + if (msg.Is(net3::Message3::kEntryUpdate) || + msg.Is(net3::Message3::kEntryAssign)) { + if (msg.id() == topic->id) { // should always be true + msg.SetValue(value); + break; + } + } + } + } + // fallthrough + case net::ValueSendMode::kAll: // append to outgoing + if (!added) { + m_outgoingValueMap[topic->id] = m_outgoing.size(); + } + ++topic3->seqNum; + if (topic3->sentAssign) { + m_outgoing.emplace_back(net3::Message3::EntryUpdate( + topic->id, topic3->seqNum.value(), value)); + } else { + m_outgoing.emplace_back(net3::Message3::EntryAssign( + topic->name, topic->id, topic3->seqNum.value(), value, + topic3->flags)); + topic3->sentAssign = true; + } + break; + } +} + +void ServerClient3::SendAnnounce(ServerTopic* topic, + std::optional pubuid) { + // ignore if we've not yet built the subscriber + if (m_subscribers.empty()) { + return; + } + + // subscribe to all non-special topics + if (!topic->special) { + topic->clients[this].AddSubscriber(m_subscribers[0].get()); + m_storage.UpdateMetaTopicSub(topic); + } + + // NT3 requires a value to send the assign message, so the assign message + // will get sent when the first value is sent (by SendValue). +} + +void ServerClient3::SendUnannounce(ServerTopic* topic) { + auto it = m_topics3.find(topic); + if (it == m_topics3.end()) { + return; // never sent to client + } + bool sentAssign = it->second.sentAssign; + m_topics3.erase(it); + if (!sentAssign) { + return; // never sent to client + } + + // map to NT3 delete message + if (m_local && m_state == kStateRunning) { + net3::WireEncodeEntryDelete(m_wire.Send().stream(), topic->id); + Flush(); + } else { + m_outgoing.emplace_back(net3::Message3::EntryDelete(topic->id)); + } +} + +void ServerClient3::SendPropertiesUpdate(ServerTopic* topic, + const wpi::json& update, bool ack) { + if (ack) { + return; // we don't ack in NT3 + } + auto it = m_topics3.find(topic); + if (it == m_topics3.end()) { + return; // never sent to client + } + TopicData3* topic3 = &it->second; + // Don't send flags update unless we've already sent an assign message. + // The assign message will contain the updated flags when we eventually + // send it. + if (topic3->UpdateFlags(topic) && topic3->sentAssign) { + if (m_local && m_state == kStateRunning) { + net3::WireEncodeFlagsUpdate(m_wire.Send().stream(), topic->id, + topic3->flags); + Flush(); + } else { + m_outgoing.emplace_back( + net3::Message3::FlagsUpdate(topic->id, topic3->flags)); + } + } +} + +void ServerClient3::SendOutgoing(uint64_t curTimeMs, bool flush) { + if (m_outgoing.empty() || m_state != kStateRunning) { + return; // nothing to do + } + + // rate limit frequency of transmissions + if (curTimeMs < (m_lastSendMs + kMinPeriodMs)) { + return; + } + + if (!m_wire.Ready()) { + uint64_t lastFlushTime = m_wire.GetLastFlushTime(); + uint64_t now = wpi::Now(); + if (lastFlushTime != 0 && now > (lastFlushTime + kWireMaxNotReadyUs)) { + m_wire.Disconnect("transmit stalled"); + } + return; + } + + auto out = m_wire.Send(); + for (auto&& msg : m_outgoing) { + net3::WireEncode(out.stream(), msg); + } + m_wire.Flush(); + m_outgoing.resize(0); + m_outgoingValueMap.clear(); + m_lastSendMs = curTimeMs; +} + +void ServerClient3::KeepAlive() { + DEBUG4("KeepAlive({})", m_id); + if (m_state != kStateRunning) { + m_decoder.SetError("received unexpected KeepAlive message"); + return; + } + // ignore +} + +void ServerClient3::ServerHelloDone() { + DEBUG4("ServerHelloDone({})", m_id); + m_decoder.SetError("received unexpected ServerHelloDone message"); +} + +void ServerClient3::ClientHelloDone() { + DEBUG4("ClientHelloDone({})", m_id); + if (m_state != kStateServerHelloComplete) { + m_decoder.SetError("received unexpected ClientHelloDone message"); + return; + } + m_state = kStateRunning; +} + +void ServerClient3::ClearEntries() { + DEBUG4("ClearEntries({})", m_id); + if (m_state != kStateRunning) { + m_decoder.SetError("received unexpected ClearEntries message"); + return; + } + + for (auto topic3it : m_topics3) { + ServerTopic* topic = topic3it.first; + + // make sure we send assign the next time + topic3it.second.sentAssign = false; + + // unpublish from this client (if it was previously published) + if (topic3it.second.published) { + topic3it.second.published = false; + auto publisherIt = m_publishers.find(topic3it.second.pubuid); + if (publisherIt != m_publishers.end()) { + // remove publisher from topic + topic->RemovePublisher(this, publisherIt->second.get()); + + // remove publisher from client + m_publishers.erase(publisherIt); + + // update meta data + m_storage.UpdateMetaTopicPub(topic); + UpdateMetaClientPub(); + } + } + + // set retained=false + m_storage.SetProperties(this, topic, {{"retained", false}}); + } +} + +void ServerClient3::ProtoUnsup(unsigned int proto_rev) { + DEBUG4("ProtoUnsup({})", m_id); + m_decoder.SetError("received unexpected ProtoUnsup message"); +} + +void ServerClient3::ClientHello(std::string_view self_id, + unsigned int proto_rev) { + DEBUG4("ClientHello({}, '{}', {:04x})", m_id, self_id, proto_rev); + if (m_state != kStateInitial) { + m_decoder.SetError("received unexpected ClientHello message"); + return; + } + if (proto_rev != 0x0300) { + net3::WireEncodeProtoUnsup(m_wire.Send().stream(), 0x0300); + Flush(); + m_decoder.SetError( + fmt::format("unsupported protocol version {:04x}", proto_rev)); + return; + } + // create a unique name including client id + m_name = fmt::format("{}-NT3@{}", self_id, m_connInfo); + m_connected(m_name, 0x0300); + m_connected = nullptr; // no longer required + + // create client meta topics + m_metaPub = m_storage.CreateMetaTopic(fmt::format("$clientpub${}", m_name)); + m_metaSub = m_storage.CreateMetaTopic(fmt::format("$clientsub${}", m_name)); + + // subscribe and send initial assignments + auto& sub = m_subscribers[0]; + std::string prefix; + PubSubOptions options; + options.prefixMatch = true; + sub = std::make_unique( + GetName(), std::span{{prefix}}, 0, options); + m_periodMs = net::UpdatePeriodCalc(m_periodMs, sub->GetPeriodMs()); + m_setPeriodic(m_periodMs); + + { + auto out = m_wire.Send(); + net3::WireEncodeServerHello(out.stream(), 0, "server"); + m_storage.ForEachTopic([&](ServerTopic* topic) { + if (topic && !topic->special && topic->IsPublished() && + topic->lastValue) { + DEBUG4("client {}: initial announce of '{}' (id {})", m_id, topic->name, + topic->id); + topic->clients[this].AddSubscriber(sub.get()); + m_storage.UpdateMetaTopicSub(topic); + + TopicData3* topic3 = GetTopic3(topic); + ++topic3->seqNum; + net3::WireEncodeEntryAssign(out.stream(), topic->name, topic->id, + topic3->seqNum.value(), topic->lastValue, + topic3->flags); + topic3->sentAssign = true; + } + }); + net3::WireEncodeServerHelloDone(out.stream()); + } + Flush(); + m_state = kStateServerHelloComplete; + + // update meta topics + UpdateMetaClientPub(); + UpdateMetaClientSub(); +} + +void ServerClient3::ServerHello(unsigned int flags, std::string_view self_id) { + DEBUG4("ServerHello({}, {}, {})", m_id, flags, self_id); + m_decoder.SetError("received unexpected ServerHello message"); +} + +void ServerClient3::EntryAssign(std::string_view name, unsigned int id, + unsigned int seq_num, const Value& value, + unsigned int flags) { + DEBUG4("EntryAssign({}, {}, {}, {}, {})", m_id, id, seq_num, + static_cast(value.type()), flags); + if (id != 0xffff) { + DEBUG3("ignored EntryAssign from {} with non-0xffff id {}", m_id, id); + return; + } + + // convert from NT3 info + auto typeStr = TypeToString(value.type()); + wpi::json properties = wpi::json::object(); + properties["retained"] = true; // treat all NT3 published topics as retained + properties["cached"] = true; // treat all NT3 published topics as cached + if ((flags & NT_PERSISTENT) != 0) { + properties["persistent"] = true; + } + + // create topic + auto topic = m_storage.CreateTopic(this, name, typeStr, properties); + TopicData3* topic3 = GetTopic3(topic); + if (topic3->published || topic3->sentAssign) { + WARN("ignoring client {} duplicate publish of '{}'", m_id, name); + return; + } + ++topic3->seqNum; + topic3->published = true; + topic3->pubuid = m_nextPubUid++; + topic3->sentAssign = true; + + // create publisher + auto [publisherIt, isNew] = m_publishers.try_emplace( + topic3->pubuid, + std::make_unique(GetName(), topic, topic3->pubuid)); + if (!isNew) { + return; // shouldn't happen, but just in case... + } + + // add publisher to topic + topic->AddPublisher(this, publisherIt->getSecond().get()); + + // update meta data + m_storage.UpdateMetaTopicPub(topic); + UpdateMetaClientPub(); + + // acts as an announce + data update + SendAnnounce(topic, topic3->pubuid); + m_storage.SetValue(this, topic, value); + + // respond with assign message with assigned topic ID + if (m_local && m_state == kStateRunning) { + net3::WireEncodeEntryAssign(m_wire.Send().stream(), topic->name, topic->id, + topic3->seqNum.value(), value, topic3->flags); + } else { + m_outgoing.emplace_back(net3::Message3::EntryAssign( + topic->name, topic->id, topic3->seqNum.value(), value, topic3->flags)); + } +} + +void ServerClient3::EntryUpdate(unsigned int id, unsigned int seq_num, + const Value& value) { + DEBUG4("EntryUpdate({}, {}, {}, {})", m_id, id, seq_num, + static_cast(value.type())); + if (m_state != kStateRunning) { + m_decoder.SetError("received unexpected EntryUpdate message"); + return; + } + + ServerTopic* topic = m_storage.GetTopic(id); + if (!topic || !topic->IsPublished()) { + DEBUG3("ignored EntryUpdate from {} on non-existent topic {}", m_id, id); + return; + } + + TopicData3* topic3 = GetTopic3(topic); + if (!topic3->published) { + topic3->published = true; + topic3->pubuid = m_nextPubUid++; + + // create publisher + auto [publisherIt, isNew] = m_publishers.try_emplace( + topic3->pubuid, + std::make_unique(GetName(), topic, topic3->pubuid)); + if (isNew) { + // add publisher to topic + topic->AddPublisher(this, publisherIt->getSecond().get()); + + // update meta data + m_storage.UpdateMetaTopicPub(topic); + UpdateMetaClientPub(); + } + } + topic3->seqNum = net3::SequenceNumber{seq_num}; + + m_storage.SetValue(this, topic, value); +} + +void ServerClient3::FlagsUpdate(unsigned int id, unsigned int flags) { + DEBUG4("FlagsUpdate({}, {}, {})", m_id, id, flags); + if (m_state != kStateRunning) { + m_decoder.SetError("received unexpected FlagsUpdate message"); + return; + } + ServerTopic* topic = m_storage.GetTopic(id); + if (!topic || !topic->IsPublished()) { + DEBUG3("ignored FlagsUpdate from {} on non-existent topic {}", m_id, id); + return; + } + if (topic->special) { + DEBUG3("ignored FlagsUpdate from {} on special topic {}", m_id, id); + return; + } + m_storage.SetFlags(this, topic, flags); +} + +void ServerClient3::EntryDelete(unsigned int id) { + DEBUG4("EntryDelete({}, {})", m_id, id); + if (m_state != kStateRunning) { + m_decoder.SetError("received unexpected EntryDelete message"); + return; + } + ServerTopic* topic = m_storage.GetTopic(id); + if (!topic || !topic->IsPublished()) { + DEBUG3("ignored EntryDelete from {} on non-existent topic {}", m_id, id); + return; + } + if (topic->special) { + DEBUG3("ignored EntryDelete from {} on special topic {}", m_id, id); + return; + } + + auto topic3it = m_topics3.find(topic); + if (topic3it != m_topics3.end()) { + // make sure we send assign the next time + topic3it->second.sentAssign = false; + + // unpublish from this client (if it was previously published) + if (topic3it->second.published) { + topic3it->second.published = false; + auto publisherIt = m_publishers.find(topic3it->second.pubuid); + if (publisherIt != m_publishers.end()) { + // remove publisher from topic + topic->RemovePublisher(this, publisherIt->second.get()); + + // remove publisher from client + m_publishers.erase(publisherIt); + + // update meta data + m_storage.UpdateMetaTopicPub(topic); + UpdateMetaClientPub(); + } + } + } + + // set retained=false + m_storage.SetProperties(this, topic, {{"retained", false}}); +} diff --git a/ntcore/src/main/native/cpp/server/ServerClient3.h b/ntcore/src/main/native/cpp/server/ServerClient3.h new file mode 100644 index 0000000000..190ba6e00f --- /dev/null +++ b/ntcore/src/main/native/cpp/server/ServerClient3.h @@ -0,0 +1,97 @@ +// Copyright (c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#pragma once + +#include +#include + +#include "ServerClient.h" +#include "net/ClientMessageQueue.h" +#include "net3/Message3.h" +#include "net3/SequenceNumber.h" +#include "net3/WireConnection3.h" +#include "net3/WireDecoder3.h" +#include "server/Functions.h" + +namespace nt::server { + +class ServerClient3 final : public ServerClient, private net3::MessageHandler3 { + public: + ServerClient3(std::string_view connInfo, bool local, + net3::WireConnection3& wire, Connected3Func connected, + SetPeriodicFunc setPeriodic, ServerStorage& storage, int id, + wpi::Logger& logger) + : ServerClient{"", connInfo, local, setPeriodic, storage, id, logger}, + m_connected{std::move(connected)}, + m_wire{wire}, + m_decoder{*this}, + m_incoming{logger} {} + + bool ProcessIncomingText(std::string_view data) final { return false; } + bool ProcessIncomingBinary(std::span data) final; + + bool ProcessIncomingMessages(size_t max) final { return false; } + + void SendValue(ServerTopic* topic, const Value& value, + net::ValueSendMode mode) final; + void SendAnnounce(ServerTopic* topic, std::optional pubuid) final; + void SendUnannounce(ServerTopic* topic) final; + void SendPropertiesUpdate(ServerTopic* topic, const wpi::json& update, + bool ack) final; + void SendOutgoing(uint64_t curTimeMs, bool flush) final; + + void Flush() final { m_wire.Flush(); } + + private: + // MessageHandler3 interface + void KeepAlive() final; + void ServerHelloDone() final; + void ClientHelloDone() final; + void ClearEntries() final; + void ProtoUnsup(unsigned int proto_rev) final; + void ClientHello(std::string_view self_id, unsigned int proto_rev) final; + void ServerHello(unsigned int flags, std::string_view self_id) final; + void EntryAssign(std::string_view name, unsigned int id, unsigned int seq_num, + const Value& value, unsigned int flags) final; + void EntryUpdate(unsigned int id, unsigned int seq_num, + const Value& value) final; + void FlagsUpdate(unsigned int id, unsigned int flags) final; + void EntryDelete(unsigned int id) final; + void ExecuteRpc(unsigned int id, unsigned int uid, + std::span params) final {} + void RpcResponse(unsigned int id, unsigned int uid, + std::span result) final {} + + Connected3Func m_connected; + net3::WireConnection3& m_wire; + + enum State { kStateInitial, kStateServerHelloComplete, kStateRunning }; + State m_state{kStateInitial}; + net3::WireDecoder3 m_decoder; + + net::NetworkIncomingClientQueue m_incoming; + std::vector m_outgoing; + wpi::DenseMap m_outgoingValueMap; + int64_t m_nextPubUid{1}; + uint64_t m_lastSendMs{0}; + + struct TopicData3 { + explicit TopicData3(ServerTopic* topic) { UpdateFlags(topic); } + + unsigned int flags{0}; + net3::SequenceNumber seqNum; + bool sentAssign{false}; + bool published{false}; + int64_t pubuid{0}; + + bool UpdateFlags(ServerTopic* topic); + }; + wpi::DenseMap m_topics3; + TopicData3* GetTopic3(ServerTopic* topic) { + return &m_topics3.try_emplace(topic, topic).first->second; + } +}; + +} // namespace nt::server diff --git a/ntcore/src/main/native/cpp/server/ServerClient4.cpp b/ntcore/src/main/native/cpp/server/ServerClient4.cpp new file mode 100644 index 0000000000..9b6299fded --- /dev/null +++ b/ntcore/src/main/native/cpp/server/ServerClient4.cpp @@ -0,0 +1,180 @@ +// Copyright (c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#include "ServerClient4.h" + +#include + +#include + +#include "Log.h" +#include "net/WireDecoder.h" +#include "server/ServerStorage.h" +#include "server/ServerTopic.h" + +using namespace nt::server; + +ServerClient4::ServerClient4(std::string_view name, std::string_view connInfo, + bool local, net::WireConnection& wire, + SetPeriodicFunc setPeriodic, + ServerStorage& storage, int id, + wpi::Logger& logger) + : ServerClient4Base{name, connInfo, local, setPeriodic, + storage, id, logger}, + m_wire{wire}, + m_ping{wire}, + m_incoming{logger}, + m_outgoing{wire, local} { + // create client meta topics + m_metaPub = storage.CreateMetaTopic(fmt::format("$clientpub${}", name)); + m_metaSub = storage.CreateMetaTopic(fmt::format("$clientsub${}", name)); + + // update meta topics + UpdateMetaClientPub(); + UpdateMetaClientSub(); +} + +bool ServerClient4::ProcessIncomingText(std::string_view data) { + constexpr int kMaxImmProcessing = 10; + bool queueWasEmpty = m_incoming.empty(); + // can't directly process, because we don't know how big it is + WireDecodeText(data, m_incoming, m_logger); + if (queueWasEmpty && + DoProcessIncomingMessages(m_incoming, kMaxImmProcessing)) { + m_wire.StopRead(); + return true; + } + return false; +} + +bool ServerClient4::ProcessIncomingBinary(std::span data) { + constexpr int kMaxImmProcessing = 10; + // if we've already queued, keep queuing + int count = m_incoming.empty() ? 0 : kMaxImmProcessing; + for (;;) { + if (data.empty()) { + break; + } + + // decode message + int pubuid; + Value value; + std::string error; + if (!net::WireDecodeBinary(&data, &pubuid, &value, &error, 0)) { + m_wire.Disconnect(fmt::format("binary decode error: {}", error)); + break; + } + + // respond to RTT ping + if (pubuid == -1) { + auto now = wpi::Now(); + DEBUG4("RTT ping from {}, responding with time={}", m_id, now); + m_wire.SendBinary( + [&](auto& os) { net::WireEncodeBinary(os, -1, now, value); }); + continue; + } + + // handle value set + if (++count < kMaxImmProcessing) { + ClientSetValue(pubuid, value); + } else { + m_incoming.ClientSetValue(pubuid, value); + } + } + if (count >= kMaxImmProcessing) { + m_wire.StopRead(); + return true; + } + return false; +} + +void ServerClient4::SendValue(ServerTopic* topic, const Value& value, + net::ValueSendMode mode) { + m_outgoing.SendValue(topic->id, value, mode); +} + +void ServerClient4::SendAnnounce(ServerTopic* topic, + std::optional pubuid) { + auto& sent = m_announceSent[topic]; + if (sent) { + return; + } + sent = true; + + if (m_local) { + int unsent = m_wire.WriteText([&](auto& os) { + net::WireEncodeAnnounce(os, topic->name, topic->id, topic->typeStr, + topic->properties, pubuid); + }); + if (unsent < 0) { + return; // error + } + if (unsent == 0 && m_wire.Flush() == 0) { + return; + } + } + m_outgoing.SendMessage( + topic->id, net::AnnounceMsg{topic->name, static_cast(topic->id), + topic->typeStr, pubuid, topic->properties}); +} + +void ServerClient4::SendUnannounce(ServerTopic* topic) { + auto& sent = m_announceSent[topic]; + if (!sent) { + return; + } + sent = false; + + if (m_local) { + int unsent = m_wire.WriteText([&](auto& os) { + net::WireEncodeUnannounce(os, topic->name, topic->id); + }); + if (unsent < 0) { + return; // error + } + if (unsent == 0 && m_wire.Flush() == 0) { + return; + } + } + m_outgoing.SendMessage( + topic->id, net::UnannounceMsg{topic->name, static_cast(topic->id)}); + m_outgoing.EraseId(topic->id); +} + +void ServerClient4::SendPropertiesUpdate(ServerTopic* topic, + const wpi::json& update, bool ack) { + if (!m_announceSent.lookup(topic)) { + return; + } + + if (m_local) { + int unsent = m_wire.WriteText([&](auto& os) { + net::WireEncodePropertiesUpdate(os, topic->name, update, ack); + }); + if (unsent < 0) { + return; // error + } + if (unsent == 0 && m_wire.Flush() == 0) { + return; + } + } + m_outgoing.SendMessage(topic->id, + net::PropertiesUpdateMsg{topic->name, update, ack}); +} + +void ServerClient4::SendOutgoing(uint64_t curTimeMs, bool flush) { + if (m_wire.GetVersion() >= 0x0401) { + if (!m_ping.Send(curTimeMs)) { + return; + } + } + m_outgoing.SendOutgoing(curTimeMs, flush); +} + +void ServerClient4::UpdatePeriod(TopicClientData& tcd, ServerTopic* topic) { + uint32_t period = net::CalculatePeriod( + tcd.subscribers, [](auto& x) { return x->GetPeriodMs(); }); + DEBUG4("updating {} period to {} ms", topic->name, period); + m_outgoing.SetPeriod(topic->id, period); +} diff --git a/ntcore/src/main/native/cpp/server/ServerClient4.h b/ntcore/src/main/native/cpp/server/ServerClient4.h new file mode 100644 index 0000000000..7c58addf01 --- /dev/null +++ b/ntcore/src/main/native/cpp/server/ServerClient4.h @@ -0,0 +1,54 @@ +// Copyright (c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#pragma once + +#include + +#include "net/NetworkPing.h" +#include "net/WireConnection.h" +#include "server/Functions.h" +#include "server/ServerClient4Base.h" + +namespace nt::server { + +class ServerClient4 final : public ServerClient4Base { + public: + ServerClient4(std::string_view name, std::string_view connInfo, bool local, + net::WireConnection& wire, SetPeriodicFunc setPeriodic, + ServerStorage& storage, int id, wpi::Logger& logger); + + bool ProcessIncomingText(std::string_view data) final; + bool ProcessIncomingBinary(std::span data) final; + + bool ProcessIncomingMessages(size_t max) final { + if (!DoProcessIncomingMessages(m_incoming, max)) { + m_wire.StartRead(); + return false; + } + return true; + } + + void SendValue(ServerTopic* topic, const Value& value, + net::ValueSendMode mode) final; + void SendAnnounce(ServerTopic* topic, std::optional pubuid) final; + void SendUnannounce(ServerTopic* topic) final; + void SendPropertiesUpdate(ServerTopic* topic, const wpi::json& update, + bool ack) final; + void SendOutgoing(uint64_t curTimeMs, bool flush) final; + + void Flush() final {} + + void UpdatePeriod(TopicClientData& tcd, ServerTopic* topic) final; + + public: + net::WireConnection& m_wire; + + private: + net::NetworkPing m_ping; + net::NetworkIncomingClientQueue m_incoming; + net::NetworkOutgoingQueue m_outgoing; +}; + +} // namespace nt::server diff --git a/ntcore/src/main/native/cpp/server/ServerClient4Base.cpp b/ntcore/src/main/native/cpp/server/ServerClient4Base.cpp new file mode 100644 index 0000000000..dd7ab56913 --- /dev/null +++ b/ntcore/src/main/native/cpp/server/ServerClient4Base.cpp @@ -0,0 +1,242 @@ +// Copyright (c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#include "ServerClient4Base.h" + +#include +#include +#include + +#include +#include + +#include "Log.h" +#include "server/ServerImpl.h" +#include "server/ServerPublisher.h" + +using namespace nt::server; + +void ServerClient4Base::ClientPublish(int pubuid, std::string_view name, + std::string_view typeStr, + const wpi::json& properties, + const PubSubOptionsImpl& options) { + DEBUG3("ClientPublish({}, {}, {}, {})", m_id, name, pubuid, typeStr); + auto topic = m_storage.CreateTopic(this, name, typeStr, properties); + + // create publisher + auto [publisherIt, isNew] = m_publishers.try_emplace( + pubuid, std::make_unique(GetName(), topic, pubuid)); + if (!isNew) { + WARN("client {} duplicate publish of pubuid {}", m_id, pubuid); + } else { + // add publisher to topic + topic->AddPublisher(this, publisherIt->getSecond().get()); + + // update meta data + m_storage.UpdateMetaTopicPub(topic); + } + + // respond with announce with pubuid to client + DEBUG4("client {}: announce {} pubuid {}", m_id, topic->name, pubuid); + SendAnnounce(topic, pubuid); +} + +void ServerClient4Base::ClientUnpublish(int pubuid) { + DEBUG3("ClientUnpublish({}, {})", m_id, pubuid); + auto publisherIt = m_publishers.find(pubuid); + if (publisherIt == m_publishers.end()) { + return; // nothing to do + } + auto publisher = publisherIt->getSecond().get(); + auto topic = publisher->GetTopic(); + + // remove publisher from topic + topic->RemovePublisher(this, publisher); + + // remove publisher from client + m_publishers.erase(publisherIt); + + // update meta data + m_storage.UpdateMetaTopicPub(topic); + + // delete topic if no longer published + if (!topic->IsPublished()) { + m_storage.DeleteTopic(topic); + } +} + +void ServerClient4Base::ClientSetProperties(std::string_view name, + const wpi::json& update) { + DEBUG4("ClientSetProperties({}, {}, {})", m_id, name, update.dump()); + ServerTopic* topic = m_storage.GetTopic(name); + if (!topic || !topic->IsPublished()) { + WARN( + "server ignoring SetProperties({}) from client {} on unpublished topic " + "'{}'; publish or set a value first", + update.dump(), m_id, name); + return; // nothing to do + } + if (topic->special) { + WARN("server ignoring SetProperties({}) from client {} on meta topic '{}'", + update.dump(), m_id, name); + return; // nothing to do + } + m_storage.SetProperties(nullptr, topic, update); +} + +void ServerClient4Base::ClientSubscribe(int subuid, + std::span topicNames, + const PubSubOptionsImpl& options) { + DEBUG4("ClientSubscribe({}, ({}), {})", m_id, fmt::join(topicNames, ","), + subuid); + auto& sub = m_subscribers[subuid]; + bool replace = false; + if (sub) { + // replace subscription + sub->Update(topicNames, options); + replace = true; + } else { + // create + sub = std::make_unique(GetName(), topicNames, subuid, + options); + } + + // update periodic sender (if not local) + if (!m_local) { + m_periodMs = net::UpdatePeriodCalc(m_periodMs, sub->GetPeriodMs()); + m_setPeriodic(m_periodMs); + } + + // see if this immediately subscribes to any topics + // for transmit efficiency, we want to batch announcements and values, so + // send announcements in first loop and remember what we want to send in + // second loop. + std::vector dataToSend; + dataToSend.reserve(m_storage.GetNumTopics()); + m_storage.ForEachTopic([&](ServerTopic* topic) { + auto tcdIt = topic->clients.find(this); + bool removed = tcdIt != topic->clients.end() && replace && + tcdIt->second.subscribers.erase(sub.get()); + + // is client already subscribed? + bool wasSubscribed = + tcdIt != topic->clients.end() && !tcdIt->second.subscribers.empty(); + bool wasSubscribedValue = + wasSubscribed ? tcdIt->second.sendMode != net::ValueSendMode::kDisabled + : false; + + bool added = false; + if (sub->Matches(topic->name, topic->special)) { + if (tcdIt == topic->clients.end()) { + tcdIt = topic->clients.try_emplace(this).first; + } + tcdIt->second.AddSubscriber(sub.get()); + added = true; + } + + if (added ^ removed) { + UpdatePeriod(tcdIt->second, topic); + m_storage.UpdateMetaTopicSub(topic); + } + + // announce topic to client if not previously announced + if (added && !removed && !wasSubscribed) { + DEBUG4("client {}: announce {}", m_id, topic->name); + SendAnnounce(topic, std::nullopt); + } + + // send last value + if (added && !sub->GetOptions().topicsOnly && !wasSubscribedValue && + topic->lastValue) { + dataToSend.emplace_back(topic); + } + }); + + for (auto topic : dataToSend) { + DEBUG4("send last value for {} to client {}", topic->name, m_id); + SendValue(topic, topic->lastValue, net::ValueSendMode::kAll); + } +} + +void ServerClient4Base::ClientUnsubscribe(int subuid) { + DEBUG3("ClientUnsubscribe({}, {})", m_id, subuid); + auto subIt = m_subscribers.find(subuid); + if (subIt == m_subscribers.end() || !subIt->getSecond()) { + return; // nothing to do + } + auto sub = subIt->getSecond().get(); + + // remove from topics + m_storage.ForEachTopic([&](ServerTopic* topic) { + auto tcdIt = topic->clients.find(this); + if (tcdIt != topic->clients.end()) { + if (tcdIt->second.subscribers.erase(sub)) { + UpdatePeriod(tcdIt->second, topic); + m_storage.UpdateMetaTopicSub(topic); + } + } + }); + + // delete it from client (future value sets will be ignored) + m_subscribers.erase(subIt); + + // loop over all subscribers to update period + if (!m_local) { + m_periodMs = net::CalculatePeriod( + m_subscribers, [](auto& x) { return x.getSecond()->GetPeriodMs(); }); + m_setPeriodic(m_periodMs); + } +} + +void ServerClient4Base::ClientSetValue(int pubuid, const Value& value) { + DEBUG4("ClientSetValue({}, {})", m_id, pubuid); + auto publisherIt = m_publishers.find(pubuid); + if (publisherIt == m_publishers.end()) { + WARN("unrecognized client {} pubuid {}, ignoring set", m_id, pubuid); + return; // ignore unrecognized pubuids + } + auto topic = publisherIt->getSecond().get()->GetTopic(); + m_storage.SetValue(this, topic, value); +} + +bool ServerClient4Base::DoProcessIncomingMessages( + net::ClientMessageQueue& queue, size_t max) { + DEBUG4("ProcessIncomingMessage()"); + max = (std::min)(m_msgsBuf.size(), max); + std::span msgs = + queue.ReadQueue(wpi::take_front(std::span{m_msgsBuf}, max)); + + // just map as a normal client into client=0 calls + bool updatepub = false; + bool updatesub = false; + for (const auto& elem : msgs) { // NOLINT + // common case is value, so check that first + if (auto msg = std::get_if(&elem.contents)) { + ClientSetValue(msg->pubuid, msg->value); + } else if (auto msg = std::get_if(&elem.contents)) { + ClientPublish(msg->pubuid, msg->name, msg->typeStr, msg->properties, + msg->options); + updatepub = true; + } else if (auto msg = std::get_if(&elem.contents)) { + ClientUnpublish(msg->pubuid); + updatepub = true; + } else if (auto msg = std::get_if(&elem.contents)) { + ClientSetProperties(msg->name, msg->update); + } else if (auto msg = std::get_if(&elem.contents)) { + ClientSubscribe(msg->subuid, msg->topicNames, msg->options); + updatesub = true; + } else if (auto msg = std::get_if(&elem.contents)) { + ClientUnsubscribe(msg->subuid); + updatesub = true; + } + } + if (updatepub) { + UpdateMetaClientPub(); + } + if (updatesub) { + UpdateMetaClientSub(); + } + + return msgs.size() == max; // don't know for sure, but there might be more +} diff --git a/ntcore/src/main/native/cpp/server/ServerClient4Base.h b/ntcore/src/main/native/cpp/server/ServerClient4Base.h new file mode 100644 index 0000000000..814723095b --- /dev/null +++ b/ntcore/src/main/native/cpp/server/ServerClient4Base.h @@ -0,0 +1,48 @@ +// Copyright (c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#pragma once + +#include +#include + +#include + +#include "net/ClientMessageQueue.h" +#include "server/Functions.h" +#include "server/ServerClient.h" + +namespace nt::server { + +class ServerClient4Base : public ServerClient, + protected net::ClientMessageHandler { + public: + ServerClient4Base(std::string_view name, std::string_view connInfo, + bool local, SetPeriodicFunc setPeriodic, + ServerStorage& storage, int id, wpi::Logger& logger) + : ServerClient{name, connInfo, local, setPeriodic, storage, id, logger} {} + + protected: + // ClientMessageHandler interface + void ClientPublish(int pubuid, std::string_view name, + std::string_view typeStr, const wpi::json& properties, + const PubSubOptionsImpl& options) final; + void ClientUnpublish(int pubuid) final; + void ClientSetProperties(std::string_view name, + const wpi::json& update) final; + void ClientSubscribe(int subuid, std::span topicNames, + const PubSubOptionsImpl& options) final; + void ClientUnsubscribe(int subuid) final; + + void ClientSetValue(int pubuid, const Value& value) final; + + bool DoProcessIncomingMessages(net::ClientMessageQueue& queue, size_t max); + + wpi::DenseMap m_announceSent; + + private: + std::array m_msgsBuf; +}; + +} // namespace nt::server diff --git a/ntcore/src/main/native/cpp/server/ServerClientLocal.cpp b/ntcore/src/main/native/cpp/server/ServerClientLocal.cpp new file mode 100644 index 0000000000..e99ca4f978 --- /dev/null +++ b/ntcore/src/main/native/cpp/server/ServerClientLocal.cpp @@ -0,0 +1,64 @@ +// Copyright (c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#include "ServerClientLocal.h" + +#include "server/ServerImpl.h" + +using namespace nt::server; + +ServerClientLocal::ServerClientLocal(ServerStorage& storage, int id, + wpi::Logger& logger) + : ServerClient4Base{"", "", true, [](uint32_t) {}, storage, id, logger} { + // create local client meta topics + m_metaPub = storage.CreateMetaTopic("$serverpub"); + m_metaSub = storage.CreateMetaTopic("$serversub"); + + // update meta topics + UpdateMetaClientPub(); + UpdateMetaClientSub(); +} + +void ServerClientLocal::SendValue(ServerTopic* topic, const Value& value, + net::ValueSendMode mode) { + if (m_local) { + m_local->ServerSetValue(topic->localTopic, value); + } +} + +void ServerClientLocal::SendAnnounce(ServerTopic* topic, + std::optional pubuid) { + if (m_local) { + auto& sent = m_announceSent[topic]; + if (sent) { + return; + } + sent = true; + + topic->localTopic = m_local->ServerAnnounce(topic->name, 0, topic->typeStr, + topic->properties, pubuid); + } +} + +void ServerClientLocal::SendUnannounce(ServerTopic* topic) { + if (m_local) { + auto& sent = m_announceSent[topic]; + if (!sent) { + return; + } + sent = false; + m_local->ServerUnannounce(topic->name, topic->localTopic); + } +} + +void ServerClientLocal::SendPropertiesUpdate(ServerTopic* topic, + const wpi::json& update, + bool ack) { + if (m_local) { + if (!m_announceSent.lookup(topic)) { + return; + } + m_local->ServerPropertiesUpdate(topic->name, update, ack); + } +} diff --git a/ntcore/src/main/native/cpp/server/ServerClientLocal.h b/ntcore/src/main/native/cpp/server/ServerClientLocal.h new file mode 100644 index 0000000000..16d0efafa2 --- /dev/null +++ b/ntcore/src/main/native/cpp/server/ServerClientLocal.h @@ -0,0 +1,50 @@ +// Copyright (c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#pragma once + +#include +#include + +#include "server/ServerClient4Base.h" + +namespace nt::server { + +class ServerClientLocal final : public ServerClient4Base { + public: + ServerClientLocal(ServerStorage& storage, int id, wpi::Logger& logger); + + bool ProcessIncomingText(std::string_view data) final { return false; } + bool ProcessIncomingBinary(std::span data) final { + return false; + } + + bool ProcessIncomingMessages(size_t max) final { + if (!m_queue) { + return false; + } + return DoProcessIncomingMessages(*m_queue, max); + } + + void SendValue(ServerTopic* topic, const Value& value, + net::ValueSendMode mode) final; + void SendAnnounce(ServerTopic* topic, std::optional pubuid) final; + void SendUnannounce(ServerTopic* topic) final; + void SendPropertiesUpdate(ServerTopic* topic, const wpi::json& update, + bool ack) final; + void SendOutgoing(uint64_t curTimeMs, bool flush) final {} + void Flush() final {} + + void SetLocal(net::ServerMessageHandler* local, + net::ClientMessageQueue* queue) { + m_local = local; + m_queue = queue; + } + + private: + net::ServerMessageHandler* m_local = nullptr; + net::ClientMessageQueue* m_queue = nullptr; +}; + +} // namespace nt::server diff --git a/ntcore/src/main/native/cpp/server/ServerImpl.cpp b/ntcore/src/main/native/cpp/server/ServerImpl.cpp index 38f9fbeffb..3c92488c97 100644 --- a/ntcore/src/main/native/cpp/server/ServerImpl.cpp +++ b/ntcore/src/main/native/cpp/server/ServerImpl.cpp @@ -8,1243 +8,53 @@ #include #include -#include #include #include #include -#include -#include #include -#include -#include -#include -#include -#include -#include #include "Log.h" -#include "Types_internal.h" -#include "net/Message.h" -#include "net/WireEncoder.h" -#include "net3/WireConnection3.h" -#include "net3/WireEncoder3.h" -#include "networktables/NetworkTableValue.h" -#include "ntcore_c.h" +#include "server/MessagePackWriter.h" +#include "server/ServerClient3.h" +#include "server/ServerClient4.h" +#include "server/ServerClientLocal.h" using namespace nt; using namespace nt::server; using namespace mpack; -// maximum amount of time the wire can be not ready to send another -// transmission before we close the connection -static constexpr uint32_t kWireMaxNotReadyUs = 1000000; - -namespace { -struct Writer : public mpack_writer_t { - Writer() { - mpack_writer_init(this, buf, sizeof(buf)); - mpack_writer_set_context(this, &os); - mpack_writer_set_flush( - this, [](mpack_writer_t* w, const char* buffer, size_t count) { - static_cast(w->context)->write(buffer, count); - }); - } - - std::vector bytes; - wpi::raw_uvector_ostream os{bytes}; - char buf[128]; -}; -} // namespace - -static void WriteOptions(mpack_writer_t& w, const PubSubOptionsImpl& options) { - int size = - (options.sendAll ? 1 : 0) + (options.topicsOnly ? 1 : 0) + - (options.periodicMs != PubSubOptionsImpl::kDefaultPeriodicMs ? 1 : 0) + - (options.prefixMatch ? 1 : 0); - mpack_start_map(&w, size); - if (options.sendAll) { - mpack_write_str(&w, "all"); - mpack_write_bool(&w, true); - } - if (options.topicsOnly) { - mpack_write_str(&w, "topicsonly"); - mpack_write_bool(&w, true); - } - if (options.periodicMs != PubSubOptionsImpl::kDefaultPeriodicMs) { - mpack_write_str(&w, "periodic"); - mpack_write_float(&w, options.periodicMs / 1000.0); - } - if (options.prefixMatch) { - mpack_write_str(&w, "prefix"); - mpack_write_bool(&w, true); - } - mpack_finish_map(&w); -} - -void ServerImpl::PublisherData::UpdateMeta() { - { - Writer w; - mpack_start_map(&w, 2); - mpack_write_str(&w, "uid"); - mpack_write_int(&w, pubuid); - mpack_write_str(&w, "topic"); - mpack_write_str(&w, topic->name); - mpack_finish_map(&w); - if (mpack_writer_destroy(&w) == mpack_ok) { - metaClient = std::move(w.bytes); - } - } - { - Writer w; - mpack_start_map(&w, 2); - mpack_write_str(&w, "client"); - if (client) { - mpack_write_str(&w, client->GetName()); - } else { - mpack_write_str(&w, ""); - } - mpack_write_str(&w, "pubuid"); - mpack_write_int(&w, pubuid); - mpack_finish_map(&w); - if (mpack_writer_destroy(&w) == mpack_ok) { - metaTopic = std::move(w.bytes); - } - } -} - -void ServerImpl::SubscriberData::UpdateMeta() { - { - Writer w; - mpack_start_map(&w, 3); - mpack_write_str(&w, "uid"); - mpack_write_int(&w, subuid); - mpack_write_str(&w, "topics"); - mpack_start_array(&w, topicNames.size()); - for (auto&& name : topicNames) { - mpack_write_str(&w, name); - } - mpack_finish_array(&w); - mpack_write_str(&w, "options"); - WriteOptions(w, options); - mpack_finish_map(&w); - if (mpack_writer_destroy(&w) == mpack_ok) { - metaClient = std::move(w.bytes); - } - } - { - Writer w; - mpack_start_map(&w, 3); - mpack_write_str(&w, "client"); - if (client) { - mpack_write_str(&w, client->GetName()); - } else { - mpack_write_str(&w, ""); - } - mpack_write_str(&w, "subuid"); - mpack_write_int(&w, subuid); - mpack_write_str(&w, "options"); - WriteOptions(w, options); - mpack_finish_map(&w); - if (mpack_writer_destroy(&w) == mpack_ok) { - metaTopic = std::move(w.bytes); - } - } -} - -void ServerImpl::ClientData::UpdateMetaClientPub() { - if (!m_metaPub) { - return; - } - Writer w; - mpack_start_array(&w, m_publishers.size()); - for (auto&& pub : m_publishers) { - mpack_write_object_bytes( - &w, reinterpret_cast(pub.second->metaClient.data()), - pub.second->metaClient.size()); - } - mpack_finish_array(&w); - if (mpack_writer_destroy(&w) == mpack_ok) { - m_server.SetValue(nullptr, m_metaPub, Value::MakeRaw(std::move(w.bytes))); - } -} - -void ServerImpl::ClientData::UpdateMetaClientSub() { - if (!m_metaSub) { - return; - } - Writer w; - mpack_start_array(&w, m_subscribers.size()); - for (auto&& sub : m_subscribers) { - mpack_write_object_bytes( - &w, reinterpret_cast(sub.second->metaClient.data()), - sub.second->metaClient.size()); - } - mpack_finish_array(&w); - if (mpack_writer_destroy(&w) == mpack_ok) { - m_server.SetValue(nullptr, m_metaSub, Value::MakeRaw(std::move(w.bytes))); - } -} - -std::span ServerImpl::ClientData::GetSubscribers( - std::string_view name, bool special, - wpi::SmallVectorImpl& buf) { - buf.resize(0); - for (auto&& subPair : m_subscribers) { - SubscriberData* subscriber = subPair.getSecond().get(); - if (subscriber->Matches(name, special)) { - buf.emplace_back(subscriber); - } - } - return {buf.data(), buf.size()}; -} - -void ServerImpl::ClientData4Base::ClientPublish( - int pubuid, std::string_view name, std::string_view typeStr, - const wpi::json& properties, const PubSubOptionsImpl& options) { - DEBUG3("ClientPublish({}, {}, {}, {})", m_id, name, pubuid, typeStr); - auto topic = m_server.CreateTopic(this, name, typeStr, properties); - - // create publisher - auto [publisherIt, isNew] = m_publishers.try_emplace( - pubuid, std::make_unique(this, topic, pubuid)); - if (!isNew) { - WARN("client {} duplicate publish of pubuid {}", m_id, pubuid); - } else { - // add publisher to topic - topic->AddPublisher(this, publisherIt->getSecond().get()); - - // update meta data - m_server.UpdateMetaTopicPub(topic); - } - - // respond with announce with pubuid to client - DEBUG4("client {}: announce {} pubuid {}", m_id, topic->name, pubuid); - SendAnnounce(topic, pubuid); -} - -void ServerImpl::ClientData4Base::ClientUnpublish(int pubuid) { - DEBUG3("ClientUnpublish({}, {})", m_id, pubuid); - auto publisherIt = m_publishers.find(pubuid); - if (publisherIt == m_publishers.end()) { - return; // nothing to do - } - auto publisher = publisherIt->getSecond().get(); - auto topic = publisher->topic; - - // remove publisher from topic - topic->RemovePublisher(this, publisher); - - // remove publisher from client - m_publishers.erase(publisherIt); - - // update meta data - m_server.UpdateMetaTopicPub(topic); - - // delete topic if no longer published - if (!topic->IsPublished()) { - m_server.DeleteTopic(topic); - } -} - -void ServerImpl::ClientData4Base::ClientSetProperties(std::string_view name, - const wpi::json& update) { - DEBUG4("ClientSetProperties({}, {}, {})", m_id, name, update.dump()); - auto topicIt = m_server.m_nameTopics.find(name); - if (topicIt == m_server.m_nameTopics.end() || - !topicIt->second->IsPublished()) { - WARN( - "server ignoring SetProperties({}) from client {} on unpublished topic " - "'{}'; publish or set a value first", - update.dump(), m_id, name); - return; // nothing to do - } - auto topic = topicIt->second; - if (topic->special) { - WARN("server ignoring SetProperties({}) from client {} on meta topic '{}'", - update.dump(), m_id, name); - return; // nothing to do - } - m_server.SetProperties(nullptr, topic, update); -} - -void ServerImpl::ClientData4Base::ClientSubscribe( - int subuid, std::span topicNames, - const PubSubOptionsImpl& options) { - DEBUG4("ClientSubscribe({}, ({}), {})", m_id, fmt::join(topicNames, ","), - subuid); - auto& sub = m_subscribers[subuid]; - bool replace = false; - if (sub) { - // replace subscription - sub->Update(topicNames, options); - replace = true; - } else { - // create - sub = std::make_unique(this, topicNames, subuid, options); - } - - // limit subscriber min period - if (sub->periodMs < kMinPeriodMs) { - sub->periodMs = kMinPeriodMs; - } - - // update periodic sender (if not local) - if (!m_local) { - m_periodMs = net::UpdatePeriodCalc(m_periodMs, sub->periodMs); - m_setPeriodic(m_periodMs); - } - - // see if this immediately subscribes to any topics - // for transmit efficiency, we want to batch announcements and values, so - // send announcements in first loop and remember what we want to send in - // second loop. - std::vector dataToSend; - dataToSend.reserve(m_server.m_topics.size()); - for (auto&& topic : m_server.m_topics) { - auto tcdIt = topic->clients.find(this); - bool removed = tcdIt != topic->clients.end() && replace && - tcdIt->second.subscribers.erase(sub.get()); - - // is client already subscribed? - bool wasSubscribed = - tcdIt != topic->clients.end() && !tcdIt->second.subscribers.empty(); - bool wasSubscribedValue = - wasSubscribed ? tcdIt->second.sendMode != net::ValueSendMode::kDisabled - : false; - - bool added = false; - if (sub->Matches(topic->name, topic->special)) { - if (tcdIt == topic->clients.end()) { - tcdIt = topic->clients.try_emplace(this).first; - } - tcdIt->second.AddSubscriber(sub.get()); - added = true; - } - - if (added ^ removed) { - UpdatePeriod(tcdIt->second, topic.get()); - m_server.UpdateMetaTopicSub(topic.get()); - } - - // announce topic to client if not previously announced - if (added && !removed && !wasSubscribed) { - DEBUG4("client {}: announce {}", m_id, topic->name); - SendAnnounce(topic.get(), std::nullopt); - } - - // send last value - if (added && !sub->options.topicsOnly && !wasSubscribedValue && - topic->lastValue) { - dataToSend.emplace_back(topic.get()); - } - } - - for (auto topic : dataToSend) { - DEBUG4("send last value for {} to client {}", topic->name, m_id); - SendValue(topic, topic->lastValue, net::ValueSendMode::kAll); - } -} - -void ServerImpl::ClientData4Base::ClientUnsubscribe(int subuid) { - DEBUG3("ClientUnsubscribe({}, {})", m_id, subuid); - auto subIt = m_subscribers.find(subuid); - if (subIt == m_subscribers.end() || !subIt->getSecond()) { - return; // nothing to do - } - auto sub = subIt->getSecond().get(); - - // remove from topics - for (auto&& topic : m_server.m_topics) { - auto tcdIt = topic->clients.find(this); - if (tcdIt != topic->clients.end()) { - if (tcdIt->second.subscribers.erase(sub)) { - UpdatePeriod(tcdIt->second, topic.get()); - m_server.UpdateMetaTopicSub(topic.get()); - } - } - } - - // delete it from client (future value sets will be ignored) - m_subscribers.erase(subIt); - - // loop over all subscribers to update period - if (!m_local) { - m_periodMs = net::CalculatePeriod( - m_subscribers, [](auto& x) { return x.getSecond()->periodMs; }); - m_setPeriodic(m_periodMs); - } -} - -void ServerImpl::ClientData4Base::ClientSetValue(int pubuid, - const Value& value) { - DEBUG4("ClientSetValue({}, {})", m_id, pubuid); - auto publisherIt = m_publishers.find(pubuid); - if (publisherIt == m_publishers.end()) { - WARN("unrecognized client {} pubuid {}, ignoring set", m_id, pubuid); - return; // ignore unrecognized pubuids - } - auto topic = publisherIt->getSecond().get()->topic; - m_server.SetValue(this, topic, value); -} - -void ServerImpl::ClientDataLocal::SendValue(TopicData* topic, - const Value& value, - net::ValueSendMode mode) { - if (m_server.m_local) { - m_server.m_local->ServerSetValue(topic->localTopic, value); - } -} - -void ServerImpl::ClientDataLocal::SendAnnounce(TopicData* topic, - std::optional pubuid) { - if (m_server.m_local) { - auto& sent = m_announceSent[topic]; - if (sent) { - return; - } - sent = true; - - topic->localTopic = m_server.m_local->ServerAnnounce( - topic->name, 0, topic->typeStr, topic->properties, pubuid); - } -} - -void ServerImpl::ClientDataLocal::SendUnannounce(TopicData* topic) { - if (m_server.m_local) { - auto& sent = m_announceSent[topic]; - if (!sent) { - return; - } - sent = false; - m_server.m_local->ServerUnannounce(topic->name, topic->localTopic); - } -} - -void ServerImpl::ClientDataLocal::SendPropertiesUpdate(TopicData* topic, - const wpi::json& update, - bool ack) { - if (m_server.m_local) { - if (!m_announceSent.lookup(topic)) { - return; - } - m_server.m_local->ServerPropertiesUpdate(topic->name, update, ack); - } -} - -bool ServerImpl::ClientData4Base::DoProcessIncomingMessages( - net::ClientMessageQueue& queue, size_t max) { - DEBUG4("ProcessIncomingMessage()"); - max = (std::min)(m_msgsBuf.size(), max); - std::span msgs = - queue.ReadQueue(wpi::take_front(std::span{m_msgsBuf}, max)); - - // just map as a normal client into client=0 calls - bool updatepub = false; - bool updatesub = false; - for (const auto& elem : msgs) { // NOLINT - // common case is value, so check that first - if (auto msg = std::get_if(&elem.contents)) { - ClientSetValue(msg->pubuid, msg->value); - } else if (auto msg = std::get_if(&elem.contents)) { - ClientPublish(msg->pubuid, msg->name, msg->typeStr, msg->properties, - msg->options); - updatepub = true; - } else if (auto msg = std::get_if(&elem.contents)) { - ClientUnpublish(msg->pubuid); - updatepub = true; - } else if (auto msg = std::get_if(&elem.contents)) { - ClientSetProperties(msg->name, msg->update); - } else if (auto msg = std::get_if(&elem.contents)) { - ClientSubscribe(msg->subuid, msg->topicNames, msg->options); - updatesub = true; - } else if (auto msg = std::get_if(&elem.contents)) { - ClientUnsubscribe(msg->subuid); - updatesub = true; - } - } - if (updatepub) { - UpdateMetaClientPub(); - } - if (updatesub) { - UpdateMetaClientSub(); - } - - return msgs.size() == max; // don't know for sure, but there might be more -} - -bool ServerImpl::ClientData4::ProcessIncomingText(std::string_view data) { - constexpr int kMaxImmProcessing = 10; - bool queueWasEmpty = m_incoming.empty(); - // can't directly process, because we don't know how big it is - WireDecodeText(data, m_incoming, m_logger); - if (queueWasEmpty && - DoProcessIncomingMessages(m_incoming, kMaxImmProcessing)) { - m_wire.StopRead(); - return true; - } - return false; -} - -bool ServerImpl::ClientData4::ProcessIncomingBinary( - std::span data) { - constexpr int kMaxImmProcessing = 10; - // if we've already queued, keep queuing - int count = m_incoming.empty() ? 0 : kMaxImmProcessing; - for (;;) { - if (data.empty()) { - break; - } - - // decode message - int pubuid; - Value value; - std::string error; - if (!net::WireDecodeBinary(&data, &pubuid, &value, &error, 0)) { - m_wire.Disconnect(fmt::format("binary decode error: {}", error)); - break; - } - - // respond to RTT ping - if (pubuid == -1) { - auto now = wpi::Now(); - DEBUG4("RTT ping from {}, responding with time={}", m_id, now); - m_wire.SendBinary( - [&](auto& os) { net::WireEncodeBinary(os, -1, now, value); }); - continue; - } - - // handle value set - if (++count < kMaxImmProcessing) { - ClientSetValue(pubuid, value); - } else { - m_incoming.ClientSetValue(pubuid, value); - } - } - if (count >= kMaxImmProcessing) { - m_wire.StopRead(); - return true; - } - return false; -} - -void ServerImpl::ClientData4::SendValue(TopicData* topic, const Value& value, - net::ValueSendMode mode) { - m_outgoing.SendValue(topic->id, value, mode); -} - -void ServerImpl::ClientData4::SendAnnounce(TopicData* topic, - std::optional pubuid) { - auto& sent = m_announceSent[topic]; - if (sent) { - return; - } - sent = true; - - if (m_local) { - int unsent = m_wire.WriteText([&](auto& os) { - net::WireEncodeAnnounce(os, topic->name, topic->id, topic->typeStr, - topic->properties, pubuid); - }); - if (unsent < 0) { - return; // error - } - if (unsent == 0 && m_wire.Flush() == 0) { - return; - } - } - m_outgoing.SendMessage( - topic->id, net::AnnounceMsg{topic->name, static_cast(topic->id), - topic->typeStr, pubuid, topic->properties}); - m_server.m_controlReady = true; -} - -void ServerImpl::ClientData4::SendUnannounce(TopicData* topic) { - auto& sent = m_announceSent[topic]; - if (!sent) { - return; - } - sent = false; - - if (m_local) { - int unsent = m_wire.WriteText([&](auto& os) { - net::WireEncodeUnannounce(os, topic->name, topic->id); - }); - if (unsent < 0) { - return; // error - } - if (unsent == 0 && m_wire.Flush() == 0) { - return; - } - } - m_outgoing.SendMessage( - topic->id, net::UnannounceMsg{topic->name, static_cast(topic->id)}); - m_outgoing.EraseId(topic->id); - m_server.m_controlReady = true; -} - -void ServerImpl::ClientData4::SendPropertiesUpdate(TopicData* topic, - const wpi::json& update, - bool ack) { - if (!m_announceSent.lookup(topic)) { - return; - } - - if (m_local) { - int unsent = m_wire.WriteText([&](auto& os) { - net::WireEncodePropertiesUpdate(os, topic->name, update, ack); - }); - if (unsent < 0) { - return; // error - } - if (unsent == 0 && m_wire.Flush() == 0) { - return; - } - } - m_outgoing.SendMessage(topic->id, - net::PropertiesUpdateMsg{topic->name, update, ack}); - m_server.m_controlReady = true; -} - -void ServerImpl::ClientData4::SendOutgoing(uint64_t curTimeMs, bool flush) { - if (m_wire.GetVersion() >= 0x0401) { - if (!m_ping.Send(curTimeMs)) { - return; - } - } - m_outgoing.SendOutgoing(curTimeMs, flush); -} - -void ServerImpl::ClientData4::UpdatePeriod(TopicData::TopicClientData& tcd, - TopicData* topic) { - uint32_t period = net::CalculatePeriod(tcd.subscribers, - [](auto& x) { return x->periodMs; }); - DEBUG4("updating {} period to {} ms", topic->name, period); - m_outgoing.SetPeriod(topic->id, period); -} - -bool ServerImpl::ClientData3::TopicData3::UpdateFlags(TopicData* topic) { - unsigned int newFlags = topic->persistent ? NT_PERSISTENT : 0; - bool updated = flags != newFlags; - flags = newFlags; - return updated; -} - -bool ServerImpl::ClientData3::ProcessIncomingBinary( - std::span data) { - if (!m_decoder.Execute(&data)) { - m_wire.Disconnect(m_decoder.GetError()); - } - return false; -} - -void ServerImpl::ClientData3::SendValue(TopicData* topic, const Value& value, - net::ValueSendMode mode) { - if (m_state != kStateRunning) { - if (mode == net::ValueSendMode::kImm) { - mode = net::ValueSendMode::kAll; - } - } else if (m_local) { - mode = net::ValueSendMode::kImm; // always send local immediately - } - TopicData3* topic3 = GetTopic3(topic); - bool added = false; - - switch (mode) { - case net::ValueSendMode::kDisabled: // do nothing - break; - case net::ValueSendMode::kImm: // send immediately - ++topic3->seqNum; - if (topic3->sentAssign) { - net3::WireEncodeEntryUpdate(m_wire.Send().stream(), topic->id, - topic3->seqNum.value(), value); - } else { - net3::WireEncodeEntryAssign(m_wire.Send().stream(), topic->name, - topic->id, topic3->seqNum.value(), value, - topic3->flags); - topic3->sentAssign = true; - } - if (m_local) { - Flush(); - } - break; - case net::ValueSendMode::kNormal: { - // replace, or append if not present - wpi::DenseMap::iterator it; - std::tie(it, added) = - m_outgoingValueMap.try_emplace(topic->id, m_outgoing.size()); - if (!added && it->second < m_outgoing.size()) { - auto& msg = m_outgoing[it->second]; - if (msg.Is(net3::Message3::kEntryUpdate) || - msg.Is(net3::Message3::kEntryAssign)) { - if (msg.id() == topic->id) { // should always be true - msg.SetValue(value); - break; - } - } - } - } - // fallthrough - case net::ValueSendMode::kAll: // append to outgoing - if (!added) { - m_outgoingValueMap[topic->id] = m_outgoing.size(); - } - ++topic3->seqNum; - if (topic3->sentAssign) { - m_outgoing.emplace_back(net3::Message3::EntryUpdate( - topic->id, topic3->seqNum.value(), value)); - } else { - m_outgoing.emplace_back(net3::Message3::EntryAssign( - topic->name, topic->id, topic3->seqNum.value(), value, - topic3->flags)); - topic3->sentAssign = true; - } - break; - } -} - -void ServerImpl::ClientData3::SendAnnounce(TopicData* topic, - std::optional pubuid) { - // ignore if we've not yet built the subscriber - if (m_subscribers.empty()) { - return; - } - - // subscribe to all non-special topics - if (!topic->special) { - topic->clients[this].AddSubscriber(m_subscribers[0].get()); - m_server.UpdateMetaTopicSub(topic); - } - - // NT3 requires a value to send the assign message, so the assign message - // will get sent when the first value is sent (by SendValue). -} - -void ServerImpl::ClientData3::SendUnannounce(TopicData* topic) { - auto it = m_topics3.find(topic); - if (it == m_topics3.end()) { - return; // never sent to client - } - bool sentAssign = it->second.sentAssign; - m_topics3.erase(it); - if (!sentAssign) { - return; // never sent to client - } - - // map to NT3 delete message - if (m_local && m_state == kStateRunning) { - net3::WireEncodeEntryDelete(m_wire.Send().stream(), topic->id); - Flush(); - } else { - m_outgoing.emplace_back(net3::Message3::EntryDelete(topic->id)); - } -} - -void ServerImpl::ClientData3::SendPropertiesUpdate(TopicData* topic, - const wpi::json& update, - bool ack) { - if (ack) { - return; // we don't ack in NT3 - } - auto it = m_topics3.find(topic); - if (it == m_topics3.end()) { - return; // never sent to client - } - TopicData3* topic3 = &it->second; - // Don't send flags update unless we've already sent an assign message. - // The assign message will contain the updated flags when we eventually - // send it. - if (topic3->UpdateFlags(topic) && topic3->sentAssign) { - if (m_local && m_state == kStateRunning) { - net3::WireEncodeFlagsUpdate(m_wire.Send().stream(), topic->id, - topic3->flags); - Flush(); - } else { - m_outgoing.emplace_back( - net3::Message3::FlagsUpdate(topic->id, topic3->flags)); - } - } -} - -void ServerImpl::ClientData3::SendOutgoing(uint64_t curTimeMs, bool flush) { - if (m_outgoing.empty() || m_state != kStateRunning) { - return; // nothing to do - } - - // rate limit frequency of transmissions - if (curTimeMs < (m_lastSendMs + kMinPeriodMs)) { - return; - } - - if (!m_wire.Ready()) { - uint64_t lastFlushTime = m_wire.GetLastFlushTime(); - uint64_t now = wpi::Now(); - if (lastFlushTime != 0 && now > (lastFlushTime + kWireMaxNotReadyUs)) { - m_wire.Disconnect("transmit stalled"); - } - return; - } - - auto out = m_wire.Send(); - for (auto&& msg : m_outgoing) { - net3::WireEncode(out.stream(), msg); - } - m_wire.Flush(); - m_outgoing.resize(0); - m_outgoingValueMap.clear(); - m_lastSendMs = curTimeMs; -} - -void ServerImpl::ClientData3::KeepAlive() { - DEBUG4("KeepAlive({})", m_id); - if (m_state != kStateRunning) { - m_decoder.SetError("received unexpected KeepAlive message"); - return; - } - // ignore -} - -void ServerImpl::ClientData3::ServerHelloDone() { - DEBUG4("ServerHelloDone({})", m_id); - m_decoder.SetError("received unexpected ServerHelloDone message"); -} - -void ServerImpl::ClientData3::ClientHelloDone() { - DEBUG4("ClientHelloDone({})", m_id); - if (m_state != kStateServerHelloComplete) { - m_decoder.SetError("received unexpected ClientHelloDone message"); - return; - } - m_state = kStateRunning; -} - -void ServerImpl::ClientData3::ClearEntries() { - DEBUG4("ClearEntries({})", m_id); - if (m_state != kStateRunning) { - m_decoder.SetError("received unexpected ClearEntries message"); - return; - } - - for (auto topic3it : m_topics3) { - TopicData* topic = topic3it.first; - - // make sure we send assign the next time - topic3it.second.sentAssign = false; - - // unpublish from this client (if it was previously published) - if (topic3it.second.published) { - topic3it.second.published = false; - auto publisherIt = m_publishers.find(topic3it.second.pubuid); - if (publisherIt != m_publishers.end()) { - // remove publisher from topic - topic->RemovePublisher(this, publisherIt->second.get()); - - // remove publisher from client - m_publishers.erase(publisherIt); - - // update meta data - m_server.UpdateMetaTopicPub(topic); - UpdateMetaClientPub(); - } - } - - // set retained=false - m_server.SetProperties(this, topic, {{"retained", false}}); - } -} - -void ServerImpl::ClientData3::ProtoUnsup(unsigned int proto_rev) { - DEBUG4("ProtoUnsup({})", m_id); - m_decoder.SetError("received unexpected ProtoUnsup message"); -} - -void ServerImpl::ClientData3::ClientHello(std::string_view self_id, - unsigned int proto_rev) { - DEBUG4("ClientHello({}, '{}', {:04x})", m_id, self_id, proto_rev); - if (m_state != kStateInitial) { - m_decoder.SetError("received unexpected ClientHello message"); - return; - } - if (proto_rev != 0x0300) { - net3::WireEncodeProtoUnsup(m_wire.Send().stream(), 0x0300); - Flush(); - m_decoder.SetError( - fmt::format("unsupported protocol version {:04x}", proto_rev)); - return; - } - // create a unique name including client id - m_name = fmt::format("{}-NT3@{}", self_id, m_connInfo); - m_connected(m_name, 0x0300); - m_connected = nullptr; // no longer required - - // create client meta topics - m_metaPub = m_server.CreateMetaTopic(fmt::format("$clientpub${}", m_name)); - m_metaSub = m_server.CreateMetaTopic(fmt::format("$clientsub${}", m_name)); - - // subscribe and send initial assignments - auto& sub = m_subscribers[0]; - std::string prefix; - PubSubOptions options; - options.prefixMatch = true; - sub = std::make_unique( - this, std::span{{prefix}}, 0, options); - m_periodMs = net::UpdatePeriodCalc(m_periodMs, sub->periodMs); - m_setPeriodic(m_periodMs); - - { - auto out = m_wire.Send(); - net3::WireEncodeServerHello(out.stream(), 0, "server"); - for (auto&& topic : m_server.m_topics) { - if (topic && !topic->special && topic->IsPublished() && - topic->lastValue) { - DEBUG4("client {}: initial announce of '{}' (id {})", m_id, topic->name, - topic->id); - topic->clients[this].AddSubscriber(sub.get()); - m_server.UpdateMetaTopicSub(topic.get()); - - TopicData3* topic3 = GetTopic3(topic.get()); - ++topic3->seqNum; - net3::WireEncodeEntryAssign(out.stream(), topic->name, topic->id, - topic3->seqNum.value(), topic->lastValue, - topic3->flags); - topic3->sentAssign = true; - } - } - net3::WireEncodeServerHelloDone(out.stream()); - } - Flush(); - m_state = kStateServerHelloComplete; - - // update meta topics - UpdateMetaClientPub(); - UpdateMetaClientSub(); -} - -void ServerImpl::ClientData3::ServerHello(unsigned int flags, - std::string_view self_id) { - DEBUG4("ServerHello({}, {}, {})", m_id, flags, self_id); - m_decoder.SetError("received unexpected ServerHello message"); -} - -void ServerImpl::ClientData3::EntryAssign(std::string_view name, - unsigned int id, unsigned int seq_num, - const Value& value, - unsigned int flags) { - DEBUG4("EntryAssign({}, {}, {}, {}, {})", m_id, id, seq_num, - static_cast(value.type()), flags); - if (id != 0xffff) { - DEBUG3("ignored EntryAssign from {} with non-0xffff id {}", m_id, id); - return; - } - - // convert from NT3 info - auto typeStr = TypeToString(value.type()); - wpi::json properties = wpi::json::object(); - properties["retained"] = true; // treat all NT3 published topics as retained - properties["cached"] = true; // treat all NT3 published topics as cached - if ((flags & NT_PERSISTENT) != 0) { - properties["persistent"] = true; - } - - // create topic - auto topic = m_server.CreateTopic(this, name, typeStr, properties); - TopicData3* topic3 = GetTopic3(topic); - if (topic3->published || topic3->sentAssign) { - WARN("ignoring client {} duplicate publish of '{}'", m_id, name); - return; - } - ++topic3->seqNum; - topic3->published = true; - topic3->pubuid = m_nextPubUid++; - topic3->sentAssign = true; - - // create publisher - auto [publisherIt, isNew] = m_publishers.try_emplace( - topic3->pubuid, - std::make_unique(this, topic, topic3->pubuid)); - if (!isNew) { - return; // shouldn't happen, but just in case... - } - - // add publisher to topic - topic->AddPublisher(this, publisherIt->getSecond().get()); - - // update meta data - m_server.UpdateMetaTopicPub(topic); - UpdateMetaClientPub(); - - // acts as an announce + data update - SendAnnounce(topic, topic3->pubuid); - m_server.SetValue(this, topic, value); - - // respond with assign message with assigned topic ID - if (m_local && m_state == kStateRunning) { - net3::WireEncodeEntryAssign(m_wire.Send().stream(), topic->name, topic->id, - topic3->seqNum.value(), value, topic3->flags); - } else { - m_outgoing.emplace_back(net3::Message3::EntryAssign( - topic->name, topic->id, topic3->seqNum.value(), value, topic3->flags)); - } -} - -void ServerImpl::ClientData3::EntryUpdate(unsigned int id, unsigned int seq_num, - const Value& value) { - DEBUG4("EntryUpdate({}, {}, {}, {})", m_id, id, seq_num, - static_cast(value.type())); - if (m_state != kStateRunning) { - m_decoder.SetError("received unexpected EntryUpdate message"); - return; - } - - if (id >= m_server.m_topics.size()) { - DEBUG3("ignored EntryUpdate from {} on non-existent topic {}", m_id, id); - return; - } - TopicData* topic = m_server.m_topics[id].get(); - if (!topic || !topic->IsPublished()) { - DEBUG3("ignored EntryUpdate from {} on non-existent topic {}", m_id, id); - return; - } - - TopicData3* topic3 = GetTopic3(topic); - if (!topic3->published) { - topic3->published = true; - topic3->pubuid = m_nextPubUid++; - - // create publisher - auto [publisherIt, isNew] = m_publishers.try_emplace( - topic3->pubuid, - std::make_unique(this, topic, topic3->pubuid)); - if (isNew) { - // add publisher to topic - topic->AddPublisher(this, publisherIt->getSecond().get()); - - // update meta data - m_server.UpdateMetaTopicPub(topic); - UpdateMetaClientPub(); - } - } - topic3->seqNum = net3::SequenceNumber{seq_num}; - - m_server.SetValue(this, topic, value); -} - -void ServerImpl::ClientData3::FlagsUpdate(unsigned int id, unsigned int flags) { - DEBUG4("FlagsUpdate({}, {}, {})", m_id, id, flags); - if (m_state != kStateRunning) { - m_decoder.SetError("received unexpected FlagsUpdate message"); - return; - } - if (id >= m_server.m_topics.size()) { - DEBUG3("ignored FlagsUpdate from {} on non-existent topic {}", m_id, id); - return; - } - TopicData* topic = m_server.m_topics[id].get(); - if (!topic || !topic->IsPublished()) { - DEBUG3("ignored FlagsUpdate from {} on non-existent topic {}", m_id, id); - return; - } - if (topic->special) { - DEBUG3("ignored FlagsUpdate from {} on special topic {}", m_id, id); - return; - } - m_server.SetFlags(this, topic, flags); -} - -void ServerImpl::ClientData3::EntryDelete(unsigned int id) { - DEBUG4("EntryDelete({}, {})", m_id, id); - if (m_state != kStateRunning) { - m_decoder.SetError("received unexpected EntryDelete message"); - return; - } - if (id >= m_server.m_topics.size()) { - DEBUG3("ignored EntryDelete from {} on non-existent topic {}", m_id, id); - return; - } - TopicData* topic = m_server.m_topics[id].get(); - if (!topic || !topic->IsPublished()) { - DEBUG3("ignored EntryDelete from {} on non-existent topic {}", m_id, id); - return; - } - if (topic->special) { - DEBUG3("ignored EntryDelete from {} on special topic {}", m_id, id); - return; - } - - auto topic3it = m_topics3.find(topic); - if (topic3it != m_topics3.end()) { - // make sure we send assign the next time - topic3it->second.sentAssign = false; - - // unpublish from this client (if it was previously published) - if (topic3it->second.published) { - topic3it->second.published = false; - auto publisherIt = m_publishers.find(topic3it->second.pubuid); - if (publisherIt != m_publishers.end()) { - // remove publisher from topic - topic->RemovePublisher(this, publisherIt->second.get()); - - // remove publisher from client - m_publishers.erase(publisherIt); - - // update meta data - m_server.UpdateMetaTopicPub(topic); - UpdateMetaClientPub(); - } - } - } - - // set retained=false - m_server.SetProperties(this, topic, {{"retained", false}}); -} - -bool ServerImpl::TopicData::SetProperties(const wpi::json& update) { - if (!update.is_object()) { - return false; - } - bool updated = false; - for (auto&& elem : update.items()) { - if (elem.value().is_null()) { - properties.erase(elem.key()); - } else { - properties[elem.key()] = elem.value(); - } - updated = true; - } - if (updated) { - RefreshProperties(); - } - return updated; -} - -void ServerImpl::TopicData::RefreshProperties() { - persistent = false; - retained = false; - cached = true; - - auto persistentIt = properties.find("persistent"); - if (persistentIt != properties.end()) { - if (auto val = persistentIt->get_ptr()) { - persistent = *val; - } - } - - auto retainedIt = properties.find("retained"); - if (retainedIt != properties.end()) { - if (auto val = retainedIt->get_ptr()) { - retained = *val; - } - } - - auto cachedIt = properties.find("cached"); - if (cachedIt != properties.end()) { - if (auto val = cachedIt->get_ptr()) { - cached = *val; - } - } - - if (!cached) { - lastValue = {}; - lastValueClient = nullptr; - } - - if (!cached && persistent) { - WARN("topic {}: disabling cached property disables persistent storage", - name); - } -} - -bool ServerImpl::TopicData::SetFlags(unsigned int flags_) { - bool updated; - if ((flags_ & NT_PERSISTENT) != 0) { - updated = !persistent; - persistent = true; - properties["persistent"] = true; - } else { - updated = persistent; - persistent = false; - properties.erase("persistent"); - } - if ((flags_ & NT_RETAINED) != 0) { - updated |= !retained; - retained = true; - properties["retained"] = true; - } else { - updated |= retained; - retained = false; - properties.erase("retained"); - } - if ((flags_ & NT_UNCACHED) != 0) { - updated |= cached; - cached = false; - properties["cached"] = false; - lastValue = {}; - lastValueClient = nullptr; - } else { - updated |= !cached; - cached = true; - properties.erase("cached"); - } - if (!cached && persistent) { - WARN("topic {}: disabling cached property disables persistent storage", - name); - } - return updated; -} - -bool ServerImpl::SubscriberData::Matches(std::string_view name, bool special) { - for (auto&& topicName : topicNames) { - if ((!options.prefixMatch && name == topicName) || - (options.prefixMatch && (!special || !topicName.empty()) && - wpi::starts_with(name, topicName))) { - return true; - } - } - return false; -} - -ServerImpl::ServerImpl(wpi::Logger& logger) : m_logger{logger} { +ServerImpl::ServerImpl(wpi::Logger& logger) + : m_logger{logger}, + m_storage{logger, [this](ServerTopic* topic, ServerClient* client) { + SendAnnounce(topic, client); + }} { // local is client 0 - m_clients.emplace_back(std::make_unique(*this, 0, logger)); - m_localClient = static_cast(m_clients.back().get()); + m_clients.emplace_back( + std::make_unique(m_storage, 0, logger)); + m_localClient = static_cast(m_clients.back().get()); + + // create server meta topics + m_metaClients = m_storage.CreateMetaTopic("$clients"); } -std::pair ServerImpl::AddClient( - std::string_view name, std::string_view connInfo, bool local, - net::WireConnection& wire, ServerImpl::SetPeriodicFunc setPeriodic) { +std::pair ServerImpl::AddClient(std::string_view name, + std::string_view connInfo, + bool local, + net::WireConnection& wire, + SetPeriodicFunc setPeriodic) { if (name.empty()) { name = "NT4"; } - size_t index = m_clients.size(); - // find an empty slot - // just do a linear search as number of clients is typically small (<10) - for (size_t i = 0, end = index; i < end; ++i) { - if (!m_clients[i]) { - index = i; - break; - } - } - if (index == m_clients.size()) { - m_clients.emplace_back(); - } + size_t index = GetEmptyClientSlot(); // ensure name is unique by suffixing index std::string dedupName = fmt::format("{}@{}", name, index); auto& clientData = m_clients[index]; - clientData = std::make_unique(dedupName, connInfo, local, wire, - std::move(setPeriodic), *this, - index, m_logger); - - // create client meta topics - clientData->m_metaPub = - CreateMetaTopic(fmt::format("$clientpub${}", dedupName)); - clientData->m_metaSub = - CreateMetaTopic(fmt::format("$clientsub${}", dedupName)); - - // update meta topics - clientData->UpdateMetaClientPub(); - clientData->UpdateMetaClientSub(); + clientData = std::make_unique(dedupName, connInfo, local, wire, + std::move(setPeriodic), + m_storage, index, m_logger); DEBUG3("AddClient('{}', '{}') -> {}", name, connInfo, index); return {std::move(dedupName), index}; @@ -1252,24 +62,13 @@ std::pair ServerImpl::AddClient( int ServerImpl::AddClient3(std::string_view connInfo, bool local, net3::WireConnection3& wire, - ServerImpl::Connected3Func connected, - ServerImpl::SetPeriodicFunc setPeriodic) { - size_t index = m_clients.size(); - // find an empty slot; we can't check for duplicates until we get a hello. - // just do a linear search as number of clients is typically small (<10) - for (size_t i = 0, end = index; i < end; ++i) { - if (!m_clients[i]) { - index = i; - break; - } - } - if (index == m_clients.size()) { - m_clients.emplace_back(); - } + Connected3Func connected, + SetPeriodicFunc setPeriodic) { + size_t index = GetEmptyClientSlot(); - m_clients[index] = std::make_unique( + m_clients[index] = std::make_unique( connInfo, local, wire, std::move(connected), std::move(setPeriodic), - *this, index, m_logger); + m_storage, index, m_logger); DEBUG3("AddClient3('{}') -> {}", connInfo, index); return index; @@ -1278,572 +77,58 @@ int ServerImpl::AddClient3(std::string_view connInfo, bool local, std::shared_ptr ServerImpl::RemoveClient(int clientId) { DEBUG3("RemoveClient({})", clientId); auto& client = m_clients[clientId]; - - // remove all publishers and subscribers for this client - wpi::SmallVector toDelete; - for (auto&& topic : m_topics) { - bool pubChanged = false; - bool subChanged = false; - auto tcdIt = topic->clients.find(client.get()); - if (tcdIt != topic->clients.end()) { - pubChanged = !tcdIt->second.publishers.empty(); - subChanged = !tcdIt->second.subscribers.empty(); - topic->publisherCount -= tcdIt->second.publishers.size(); - topic->clients.erase(tcdIt); - } - - if (!topic->IsPublished()) { - toDelete.push_back(topic.get()); - } else { - if (pubChanged) { - UpdateMetaTopicPub(topic.get()); - } - if (subChanged) { - UpdateMetaTopicSub(topic.get()); - } - } + if (client) { + m_storage.RemoveClient(client.get()); } - - // delete unpublished topics - for (auto topic : toDelete) { - DeleteTopic(topic); - } - DeleteTopic(client->m_metaPub); - DeleteTopic(client->m_metaSub); - return std::move(client); } -bool ServerImpl::PersistentChanged() { - bool rv = m_persistentChanged; - m_persistentChanged = false; - return rv; -} - -static void DumpValue(wpi::raw_ostream& os, const Value& value, - wpi::json::serializer& s) { - switch (value.type()) { - case NT_BOOLEAN: - if (value.GetBoolean()) { - os << "true"; - } else { - os << "false"; - } - break; - case NT_DOUBLE: - s.dump_float(value.GetDouble()); - break; - case NT_FLOAT: - s.dump_float(value.GetFloat()); - break; - case NT_INTEGER: - s.dump_integer(value.GetInteger()); - break; - case NT_STRING: - os << '"'; - s.dump_escaped(value.GetString(), false); - os << '"'; - break; - case NT_RAW: - case NT_RPC: - os << '"'; - wpi::Base64Encode(os, value.GetRaw()); - os << '"'; - break; - case NT_BOOLEAN_ARRAY: { - os << '['; - bool first = true; - for (auto v : value.GetBooleanArray()) { - if (first) { - first = false; - } else { - os << ", "; - } - if (v) { - os << "true"; - } else { - os << "false"; - } - } - os << ']'; - break; +size_t ServerImpl::GetEmptyClientSlot() { + size_t size = m_clients.size(); + // find an empty slot + // just do a linear search as number of clients is typically small (<10) + for (size_t i = 0, end = size; i < end; ++i) { + if (!m_clients[i]) { + return i; } - case NT_DOUBLE_ARRAY: { - os << '['; - bool first = true; - for (auto v : value.GetDoubleArray()) { - if (first) { - first = false; - } else { - os << ", "; - } - s.dump_float(v); - } - os << ']'; - break; - } - case NT_FLOAT_ARRAY: { - os << '['; - bool first = true; - for (auto v : value.GetFloatArray()) { - if (first) { - first = false; - } else { - os << ", "; - } - s.dump_float(v); - } - os << ']'; - break; - } - case NT_INTEGER_ARRAY: { - os << '['; - bool first = true; - for (auto v : value.GetIntegerArray()) { - if (first) { - first = false; - } else { - os << ", "; - } - s.dump_integer(v); - } - os << ']'; - break; - } - case NT_STRING_ARRAY: { - os << '['; - bool first = true; - for (auto&& v : value.GetStringArray()) { - if (first) { - first = false; - } else { - os << ", "; - } - os << '"'; - s.dump_escaped(v, false); - os << '"'; - } - os << ']'; - break; - } - default: - os << "null"; - break; } + m_clients.emplace_back(); + return size; } -void ServerImpl::DumpPersistent(wpi::raw_ostream& os) { - wpi::json::serializer s{os, ' ', 16}; - os << "[\n"; - bool first = true; - for (const auto& topic : m_topics) { - if (!topic->persistent || !topic->lastValue) { +void ServerImpl::SendAnnounce(ServerTopic* topic, ServerClient* client) { + for (auto&& aClient : m_clients) { + if (!aClient) { continue; } - if (first) { - first = false; - } else { - os << ",\n"; - } - os << " {\n \"name\": \""; - s.dump_escaped(topic->name, false); - os << "\",\n \"type\": \""; - s.dump_escaped(topic->typeStr, false); - os << "\",\n \"value\": "; - DumpValue(os, topic->lastValue, s); - os << ",\n \"properties\": "; - s.dump(topic->properties, true, false, 2, 4); - os << "\n }"; - } - os << "\n]\n"; -} -static std::string* ObjGetString(wpi::json::object_t& obj, std::string_view key, - std::string* error) { - auto it = obj.find(key); - if (it == obj.end()) { - *error = fmt::format("no {} key", key); - return nullptr; - } - auto val = it->second.get_ptr(); - if (!val) { - *error = fmt::format("{} must be a string", key); - } - return val; -} - -std::string ServerImpl::LoadPersistent(std::string_view in) { - if (in.empty()) { - return {}; - } - - wpi::json j; - try { - j = wpi::json::parse(in); - } catch (wpi::json::parse_error& err) { - return fmt::format("could not decode JSON: {}", err.what()); - } - - if (!j.is_array()) { - return "expected JSON array at top level"; - } - - bool persistentChanged = m_persistentChanged; - - std::string allerrors; - int i = -1; - auto time = nt::Now(); - for (auto&& jitem : j) { - ++i; - std::string error; - { - auto obj = jitem.get_ptr(); - if (!obj) { - error = "expected item to be an object"; - goto err; - } - - // name - auto name = ObjGetString(*obj, "name", &error); - if (!name) { - goto err; - } - - // type - auto typeStr = ObjGetString(*obj, "type", &error); - if (!typeStr) { - goto err; - } - - // properties - auto propsIt = obj->find("properties"); - if (propsIt == obj->end()) { - error = "no properties key"; - goto err; - } - auto& props = propsIt->second; - if (!props.is_object()) { - error = "properties must be an object"; - goto err; - } - - // check to make sure persistent property is set - auto persistentIt = props.find("persistent"); - if (persistentIt == props.end()) { - error = "no persistent property"; - goto err; - } - if (auto v = persistentIt->get_ptr()) { - if (!*v) { - error = "persistent property is false"; - goto err; - } - } else { - error = "persistent property is not boolean"; - goto err; - } - - // value - auto valueIt = obj->find("value"); - if (valueIt == obj->end()) { - error = "no value key"; - goto err; - } - Value value; - if (*typeStr == "boolean") { - if (auto v = valueIt->second.get_ptr()) { - value = Value::MakeBoolean(*v, time); - } else { - error = "value type mismatch, expected boolean"; - goto err; - } - } else if (*typeStr == "int") { - if (auto v = valueIt->second.get_ptr()) { - value = Value::MakeInteger(*v, time); - } else if (auto v = valueIt->second.get_ptr()) { - value = Value::MakeInteger(*v, time); - } else { - error = "value type mismatch, expected int"; - goto err; - } - } else if (*typeStr == "float") { - if (auto v = valueIt->second.get_ptr()) { - value = Value::MakeFloat(*v, time); - } else { - error = "value type mismatch, expected float"; - goto err; - } - } else if (*typeStr == "double") { - if (auto v = valueIt->second.get_ptr()) { - value = Value::MakeDouble(*v, time); - } else { - error = "value type mismatch, expected double"; - goto err; - } - } else if (*typeStr == "string" || *typeStr == "json") { - if (auto v = valueIt->second.get_ptr()) { - value = Value::MakeString(*v, time); - } else { - error = "value type mismatch, expected string"; - goto err; - } - } else if (*typeStr == "boolean[]") { - auto arr = valueIt->second.get_ptr(); - if (!arr) { - error = "value type mismatch, expected array"; - goto err; - } - std::vector elems; - for (auto&& jelem : valueIt->second) { - if (auto v = jelem.get_ptr()) { - elems.push_back(*v); - } else { - error = "value type mismatch, expected boolean"; - } - } - value = Value::MakeBooleanArray(elems, time); - } else if (*typeStr == "int[]") { - auto arr = valueIt->second.get_ptr(); - if (!arr) { - error = "value type mismatch, expected array"; - goto err; - } - std::vector elems; - for (auto&& jelem : valueIt->second) { - if (auto v = jelem.get_ptr()) { - elems.push_back(*v); - } else if (auto v = jelem.get_ptr()) { - elems.push_back(*v); - } else { - error = "value type mismatch, expected int"; - } - } - value = Value::MakeIntegerArray(elems, time); - } else if (*typeStr == "double[]") { - auto arr = valueIt->second.get_ptr(); - if (!arr) { - error = "value type mismatch, expected array"; - goto err; - } - std::vector elems; - for (auto&& jelem : valueIt->second) { - if (auto v = jelem.get_ptr()) { - elems.push_back(*v); - } else { - error = "value type mismatch, expected double"; - } - } - value = Value::MakeDoubleArray(elems, time); - } else if (*typeStr == "float[]") { - auto arr = valueIt->second.get_ptr(); - if (!arr) { - error = "value type mismatch, expected array"; - goto err; - } - std::vector elems; - for (auto&& jelem : valueIt->second) { - if (auto v = jelem.get_ptr()) { - elems.push_back(*v); - } else { - error = "value type mismatch, expected float"; - } - } - value = Value::MakeFloatArray(elems, time); - } else if (*typeStr == "string[]") { - auto arr = valueIt->second.get_ptr(); - if (!arr) { - error = "value type mismatch, expected array"; - goto err; - } - std::vector elems; - for (auto&& jelem : valueIt->second) { - if (auto v = jelem.get_ptr()) { - elems.emplace_back(*v); - } else { - error = "value type mismatch, expected string"; - } - } - value = Value::MakeStringArray(std::move(elems), time); - } else { - // raw - if (auto v = valueIt->second.get_ptr()) { - std::vector data; - wpi::Base64Decode(*v, &data); - value = Value::MakeRaw(std::move(data), time); - } else { - error = "value type mismatch, expected string"; - goto err; - } - } - - // create persistent topic - auto topic = CreateTopic(nullptr, *name, *typeStr, props); - - // set value - SetValue(nullptr, topic, value); + // look for subscriber matching prefixes + wpi::SmallVector subscribersBuf; + auto subscribers = + aClient->GetSubscribers(topic->name, topic->special, subscribersBuf); + // don't announce to this client if no subscribers + if (subscribers.empty()) { continue; } - err: - allerrors += fmt::format("{}: {}\n", i, error); - } - m_persistentChanged = persistentChanged; // restore flag - - return allerrors; -} - -ServerImpl::TopicData* ServerImpl::CreateTopic(ClientData* client, - std::string_view name, - std::string_view typeStr, - const wpi::json& properties, - bool special) { - auto& topic = m_nameTopics[name]; - if (topic) { - if (typeStr != topic->typeStr) { - if (client) { - WARN("client {} publish '{}' conflicting type '{}' (currently '{}')", - client->GetName(), name, typeStr, topic->typeStr); + auto& tcd = topic->clients[aClient.get()]; + bool added = false; + for (auto subscriber : subscribers) { + if (tcd.AddSubscriber(subscriber)) { + added = true; } } - } else { - // new topic - unsigned int id = m_topics.emplace_back( - std::make_unique(m_logger, name, typeStr, properties)); - topic = m_topics[id].get(); - topic->id = id; - topic->special = special; - - for (auto&& aClient : m_clients) { - if (!aClient) { - continue; - } - - // look for subscriber matching prefixes - wpi::SmallVector subscribersBuf; - auto subscribers = - aClient->GetSubscribers(name, topic->special, subscribersBuf); - - // don't announce to this client if no subscribers - if (subscribers.empty()) { - continue; - } - - auto& tcd = topic->clients[aClient.get()]; - bool added = false; - for (auto subscriber : subscribers) { - if (tcd.AddSubscriber(subscriber)) { - added = true; - } - } - if (added) { - aClient->UpdatePeriod(tcd, topic); - } - - if (aClient.get() == client) { - continue; // don't announce to requesting client again - } - - DEBUG4("client {}: announce {}", aClient->GetId(), topic->name); - aClient->SendAnnounce(topic, std::nullopt); + if (added) { + aClient->UpdatePeriod(tcd, topic); } - // create meta topics; don't create if topic is itself a meta topic - if (!special) { - topic->metaPub = CreateMetaTopic(fmt::format("$pub${}", name)); - topic->metaSub = CreateMetaTopic(fmt::format("$sub${}", name)); - UpdateMetaTopicPub(topic); - UpdateMetaTopicSub(topic); + if (aClient.get() == client) { + continue; // don't announce to requesting client again } - } - return topic; -} - -ServerImpl::TopicData* ServerImpl::CreateMetaTopic(std::string_view name) { - return CreateTopic(nullptr, name, "msgpack", {{"retained", true}}, true); -} - -void ServerImpl::DeleteTopic(TopicData* topic) { - if (!topic) { - return; - } - - // delete meta topics - if (topic->metaPub) { - DeleteTopic(topic->metaPub); - } - if (topic->metaSub) { - DeleteTopic(topic->metaSub); - } - - // unannounce to all subscribers - for (auto&& tcd : topic->clients) { - if (!tcd.second.subscribers.empty()) { - tcd.first->UpdatePeriod(tcd.second, topic); - tcd.first->SendUnannounce(topic); - } - } - - // erase the topic - m_nameTopics.erase(topic->name); - m_topics.erase(topic->id); -} - -void ServerImpl::SetProperties(ClientData* client, TopicData* topic, - const wpi::json& update) { - DEBUG4("SetProperties({}, {}, {})", client ? client->GetId() : -1, - topic->name, update.dump()); - bool wasPersistent = topic->persistent; - if (topic->SetProperties(update)) { - // update persistentChanged flag - if (topic->persistent != wasPersistent) { - m_persistentChanged = true; - } - PropertiesChanged(client, topic, update); - } -} - -void ServerImpl::SetFlags(ClientData* client, TopicData* topic, - unsigned int flags) { - bool wasPersistent = topic->persistent; - if (topic->SetFlags(flags)) { - // update persistentChanged flag - if (topic->persistent != wasPersistent) { - m_persistentChanged = true; - wpi::json update; - if (topic->persistent) { - update = {{"persistent", true}}; - } else { - update = {{"persistent", wpi::json::object()}}; - } - PropertiesChanged(client, topic, update); - } - } -} - -void ServerImpl::SetValue(ClientData* client, TopicData* topic, - const Value& value) { - // update retained value if from same client or timestamp newer - if (topic->cached && (!topic->lastValue || topic->lastValueClient == client || - topic->lastValue.time() == 0 || - value.time() >= topic->lastValue.time())) { - DEBUG4("updating '{}' last value (time was {} is {})", topic->name, - topic->lastValue.time(), value.time()); - topic->lastValue = value; - topic->lastValueClient = client; - - // if persistent, update flag - if (topic->persistent) { - m_persistentChanged = true; - } - } - - for (auto&& tcd : topic->clients) { - if (tcd.first != client && - tcd.second.sendMode != net::ValueSendMode::kDisabled) { - tcd.first->SendValue(topic, value, tcd.second.sendMode); - } + DEBUG4("client {}: announce {}", aClient->GetId(), topic->name); + aClient->SendAnnounce(topic, std::nullopt); } } @@ -1862,71 +147,13 @@ void ServerImpl::UpdateMetaClients(const std::vector& conns) { } mpack_finish_array(&w); if (mpack_writer_destroy(&w) == mpack_ok) { - SetValue(nullptr, m_metaClients, Value::MakeRaw(std::move(w.bytes))); + m_storage.SetValue(nullptr, m_metaClients, + Value::MakeRaw(std::move(w.bytes))); } else { DEBUG4("failed to encode $clients"); } } -void ServerImpl::UpdateMetaTopicPub(TopicData* topic) { - if (!topic->metaPub) { - return; - } - Writer w; - uint32_t count = 0; - for (auto&& tcd : topic->clients) { - count += tcd.second.publishers.size(); - } - mpack_start_array(&w, count); - for (auto&& tcd : topic->clients) { - for (auto&& pub : tcd.second.publishers) { - mpack_write_object_bytes( - &w, reinterpret_cast(pub->metaTopic.data()), - pub->metaTopic.size()); - } - } - mpack_finish_array(&w); - if (mpack_writer_destroy(&w) == mpack_ok) { - SetValue(nullptr, topic->metaPub, Value::MakeRaw(std::move(w.bytes))); - } -} - -void ServerImpl::UpdateMetaTopicSub(TopicData* topic) { - if (!topic->metaSub) { - return; - } - Writer w; - uint32_t count = 0; - for (auto&& tcd : topic->clients) { - count += tcd.second.subscribers.size(); - } - mpack_start_array(&w, count); - for (auto&& tcd : topic->clients) { - for (auto&& sub : tcd.second.subscribers) { - mpack_write_object_bytes( - &w, reinterpret_cast(sub->metaTopic.data()), - sub->metaTopic.size()); - } - } - mpack_finish_array(&w); - if (mpack_writer_destroy(&w) == mpack_ok) { - SetValue(nullptr, topic->metaSub, Value::MakeRaw(std::move(w.bytes))); - } -} - -void ServerImpl::PropertiesChanged(ClientData* client, TopicData* topic, - const wpi::json& update) { - // removing some properties can result in the topic being unpublished - if (!topic->IsPublished()) { - DeleteTopic(topic); - } else { - // send updated announcement to all subscribers - for (auto&& tcd : topic->clients) { - tcd.first->SendPropertiesUpdate(topic, update, tcd.first == client); - } - } -} - void ServerImpl::SendAllOutgoing(uint64_t curTimeMs, bool flush) { for (auto&& client : m_clients) { if (client) { @@ -1944,19 +171,7 @@ void ServerImpl::SendOutgoing(int clientId, uint64_t curTimeMs) { void ServerImpl::SetLocal(net::ServerMessageHandler* local, net::ClientMessageQueue* queue) { DEBUG4("SetLocal()"); - m_local = local; - m_localClient->SetQueue(queue); - - // create server meta topics - m_metaClients = CreateMetaTopic("$clients"); - - // create local client meta topics - m_localClient->m_metaPub = CreateMetaTopic("$serverpub"); - m_localClient->m_metaSub = CreateMetaTopic("$serversub"); - - // update meta topics - m_localClient->UpdateMetaClientPub(); - m_localClient->UpdateMetaClientSub(); + m_localClient->SetLocal(local, queue); } bool ServerImpl::ProcessIncomingText(int clientId, std::string_view data) { @@ -1992,14 +207,10 @@ bool ServerImpl::ProcessLocalMessages(size_t max) { return m_localClient->ProcessIncomingMessages(max); } -void ServerImpl::ConnectionsChanged(const std::vector& conns) { - UpdateMetaClients(conns); -} - std::string ServerImpl::DumpPersistent() { std::string rv; wpi::raw_string_ostream os{rv}; - DumpPersistent(os); + m_storage.DumpPersistent(os); os.flush(); return rv; } diff --git a/ntcore/src/main/native/cpp/server/ServerImpl.h b/ntcore/src/main/native/cpp/server/ServerImpl.h index 55c3982617..a7ee99f1d5 100644 --- a/ntcore/src/main/native/cpp/server/ServerImpl.h +++ b/ntcore/src/main/native/cpp/server/ServerImpl.h @@ -6,8 +6,6 @@ #include -#include -#include #include #include #include @@ -15,35 +13,18 @@ #include #include -#include -#include -#include -#include -#include - -#include "PubSubOptions.h" -#include "net/ClientMessageQueue.h" -#include "net/Message.h" -#include "net/NetworkOutgoingQueue.h" -#include "net/NetworkPing.h" -#include "net/WireConnection.h" -#include "net/WireDecoder.h" -#include "net/WireEncoder.h" -#include "net3/Message3.h" -#include "net3/SequenceNumber.h" -#include "net3/WireConnection3.h" -#include "net3/WireDecoder3.h" +#include "server/Functions.h" +#include "server/ServerClient.h" +#include "server/ServerStorage.h" namespace wpi { class Logger; -template -class SmallVectorImpl; -class raw_ostream; } // namespace wpi namespace nt::net { -struct ClientMessage; +class ClientMessageQueue; class LocalInterface; +class ServerMessageHandler; class WireConnection; } // namespace nt::net @@ -53,12 +34,11 @@ class WireConnection3; namespace nt::server { +class ServerClientLocal; +struct ServerTopic; + class ServerImpl final { public: - using SetPeriodicFunc = std::function; - using Connected3Func = - std::function; - explicit ServerImpl(wpi::Logger& logger); void SendAllOutgoing(uint64_t curTimeMs, bool flush); @@ -86,433 +66,34 @@ class ServerImpl final { SetPeriodicFunc setPeriodic); std::shared_ptr RemoveClient(int clientId); - void ConnectionsChanged(const std::vector& conns); + void ConnectionsChanged(const std::vector& conns) { + UpdateMetaClients(conns); + } // if any persistent values changed since the last call to this function - bool PersistentChanged(); + bool PersistentChanged() { return m_storage.PersistentChanged(); } + std::string DumpPersistent(); // returns newline-separated errors - std::string LoadPersistent(std::string_view in); + std::string LoadPersistent(std::string_view in) { + return m_storage.LoadPersistent(in); + } private: - static constexpr uint32_t kMinPeriodMs = 5; - - class ClientData; - struct PublisherData; - struct SubscriberData; - - struct TopicData { - TopicData(wpi::Logger& logger, std::string_view name, - std::string_view typeStr) - : m_logger{logger}, name{name}, typeStr{typeStr} {} - TopicData(wpi::Logger& logger, std::string_view name, - std::string_view typeStr, wpi::json properties) - : m_logger{logger}, - name{name}, - typeStr{typeStr}, - properties(std::move(properties)) { - RefreshProperties(); - } - - bool IsPublished() const { - return persistent || retained || publisherCount != 0; - } - - // returns true if properties changed - bool SetProperties(const wpi::json& update); - void RefreshProperties(); - bool SetFlags(unsigned int flags_); - - wpi::Logger& m_logger; // Must be m_logger for WARN macro to work - std::string name; - unsigned int id; - Value lastValue; - ClientData* lastValueClient = nullptr; - std::string typeStr; - wpi::json properties = wpi::json::object(); - unsigned int publisherCount{0}; - bool persistent{false}; - bool retained{false}; - bool cached{true}; - bool special{false}; - int localTopic{0}; - - void AddPublisher(ClientData* client, PublisherData* pub) { - if (clients[client].publishers.insert(pub).second) { - ++publisherCount; - } - } - - void RemovePublisher(ClientData* client, PublisherData* pub) { - if (clients[client].publishers.erase(pub)) { - --publisherCount; - } - } - - struct TopicClientData { - wpi::SmallPtrSet publishers; - wpi::SmallPtrSet subscribers; - net::ValueSendMode sendMode = net::ValueSendMode::kDisabled; - - bool AddSubscriber(SubscriberData* sub) { - bool added = subscribers.insert(sub).second; - if (!sub->options.topicsOnly) { - if (sub->options.sendAll) { - sendMode = net::ValueSendMode::kAll; - } else if (sendMode == net::ValueSendMode::kDisabled) { - sendMode = net::ValueSendMode::kNormal; - } - } - return added; - } - }; - wpi::SmallDenseMap clients; - - // meta topics - TopicData* metaPub = nullptr; - TopicData* metaSub = nullptr; - }; - - class ClientData { - public: - ClientData(std::string_view name, std::string_view connInfo, bool local, - ServerImpl::SetPeriodicFunc setPeriodic, ServerImpl& server, - int id, wpi::Logger& logger) - : m_name{name}, - m_connInfo{connInfo}, - m_local{local}, - m_setPeriodic{std::move(setPeriodic)}, - m_server{server}, - m_id{id}, - m_logger{logger} {} - virtual ~ClientData() = default; - - // these return true if any messages have been queued for later processing - virtual bool ProcessIncomingText(std::string_view data) = 0; - virtual bool ProcessIncomingBinary(std::span data) = 0; - - virtual void SendValue(TopicData* topic, const Value& value, - net::ValueSendMode mode) = 0; - virtual void SendAnnounce(TopicData* topic, std::optional pubuid) = 0; - virtual void SendUnannounce(TopicData* topic) = 0; - virtual void SendPropertiesUpdate(TopicData* topic, const wpi::json& update, - bool ack) = 0; - virtual void SendOutgoing(uint64_t curTimeMs, bool flush) = 0; - virtual void Flush() = 0; - - // later processing -- returns true if more to process - virtual bool ProcessIncomingMessages(size_t max) = 0; - - void UpdateMetaClientPub(); - void UpdateMetaClientSub(); - - std::span GetSubscribers( - std::string_view name, bool special, - wpi::SmallVectorImpl& buf); - - std::string_view GetName() const { return m_name; } - int GetId() const { return m_id; } - - virtual void UpdatePeriod(TopicData::TopicClientData& tcd, - TopicData* topic) {} - - protected: - std::string m_name; - std::string m_connInfo; - bool m_local; // local to machine - ServerImpl::SetPeriodicFunc m_setPeriodic; - // TODO: make this per-topic? - uint32_t m_periodMs{UINT32_MAX}; - ServerImpl& m_server; - int m_id; - - wpi::Logger& m_logger; - - wpi::DenseMap> m_publishers; - wpi::DenseMap> m_subscribers; - - public: - // meta topics - TopicData* m_metaPub = nullptr; - TopicData* m_metaSub = nullptr; - }; - - class ClientData4Base : public ClientData, - protected net::ClientMessageHandler { - public: - ClientData4Base(std::string_view name, std::string_view connInfo, - bool local, ServerImpl::SetPeriodicFunc setPeriodic, - ServerImpl& server, int id, wpi::Logger& logger) - : ClientData{name, connInfo, local, setPeriodic, server, id, logger} {} - - protected: - // ClientMessageHandler interface - void ClientPublish(int pubuid, std::string_view name, - std::string_view typeStr, const wpi::json& properties, - const PubSubOptionsImpl& options) final; - void ClientUnpublish(int pubuid) final; - void ClientSetProperties(std::string_view name, - const wpi::json& update) final; - void ClientSubscribe(int subuid, std::span topicNames, - const PubSubOptionsImpl& options) final; - void ClientUnsubscribe(int subuid) final; - - void ClientSetValue(int pubuid, const Value& value) final; - - bool DoProcessIncomingMessages(net::ClientMessageQueue& queue, size_t max); - - wpi::DenseMap m_announceSent; - - private: - std::array m_msgsBuf; - }; - - class ClientDataLocal final : public ClientData4Base { - public: - ClientDataLocal(ServerImpl& server, int id, wpi::Logger& logger) - : ClientData4Base{"", "", true, [](uint32_t) {}, server, id, logger} {} - - bool ProcessIncomingText(std::string_view data) final { return false; } - bool ProcessIncomingBinary(std::span data) final { - return false; - } - - bool ProcessIncomingMessages(size_t max) final { - if (!m_queue) { - return false; - } - return DoProcessIncomingMessages(*m_queue, max); - } - - void SendValue(TopicData* topic, const Value& value, - net::ValueSendMode mode) final; - void SendAnnounce(TopicData* topic, std::optional pubuid) final; - void SendUnannounce(TopicData* topic) final; - void SendPropertiesUpdate(TopicData* topic, const wpi::json& update, - bool ack) final; - void SendOutgoing(uint64_t curTimeMs, bool flush) final {} - void Flush() final {} - - void SetQueue(net::ClientMessageQueue* queue) { m_queue = queue; } - - private: - net::ClientMessageQueue* m_queue = nullptr; - }; - - class ClientData4 final : public ClientData4Base { - public: - ClientData4(std::string_view name, std::string_view connInfo, bool local, - net::WireConnection& wire, - ServerImpl::SetPeriodicFunc setPeriodic, ServerImpl& server, - int id, wpi::Logger& logger) - : ClientData4Base{name, connInfo, local, setPeriodic, - server, id, logger}, - m_wire{wire}, - m_ping{wire}, - m_incoming{logger}, - m_outgoing{wire, local} {} - - bool ProcessIncomingText(std::string_view data) final; - bool ProcessIncomingBinary(std::span data) final; - - bool ProcessIncomingMessages(size_t max) final { - if (!DoProcessIncomingMessages(m_incoming, max)) { - m_wire.StartRead(); - return false; - } - return true; - } - - void SendValue(TopicData* topic, const Value& value, - net::ValueSendMode mode) final; - void SendAnnounce(TopicData* topic, std::optional pubuid) final; - void SendUnannounce(TopicData* topic) final; - void SendPropertiesUpdate(TopicData* topic, const wpi::json& update, - bool ack) final; - void SendOutgoing(uint64_t curTimeMs, bool flush) final; - - void Flush() final {} - - void UpdatePeriod(TopicData::TopicClientData& tcd, TopicData* topic) final; - - public: - net::WireConnection& m_wire; - - private: - net::NetworkPing m_ping; - net::NetworkIncomingClientQueue m_incoming; - net::NetworkOutgoingQueue m_outgoing; - }; - - class ClientData3 final : public ClientData, private net3::MessageHandler3 { - public: - ClientData3(std::string_view connInfo, bool local, - net3::WireConnection3& wire, - ServerImpl::Connected3Func connected, - ServerImpl::SetPeriodicFunc setPeriodic, ServerImpl& server, - int id, wpi::Logger& logger) - : ClientData{"", connInfo, local, setPeriodic, server, id, logger}, - m_connected{std::move(connected)}, - m_wire{wire}, - m_decoder{*this}, - m_incoming{logger} {} - - bool ProcessIncomingText(std::string_view data) final { return false; } - bool ProcessIncomingBinary(std::span data) final; - - bool ProcessIncomingMessages(size_t max) final { return false; } - - void SendValue(TopicData* topic, const Value& value, - net::ValueSendMode mode) final; - void SendAnnounce(TopicData* topic, std::optional pubuid) final; - void SendUnannounce(TopicData* topic) final; - void SendPropertiesUpdate(TopicData* topic, const wpi::json& update, - bool ack) final; - void SendOutgoing(uint64_t curTimeMs, bool flush) final; - - void Flush() final { m_wire.Flush(); } - - private: - // MessageHandler3 interface - void KeepAlive() final; - void ServerHelloDone() final; - void ClientHelloDone() final; - void ClearEntries() final; - void ProtoUnsup(unsigned int proto_rev) final; - void ClientHello(std::string_view self_id, unsigned int proto_rev) final; - void ServerHello(unsigned int flags, std::string_view self_id) final; - void EntryAssign(std::string_view name, unsigned int id, - unsigned int seq_num, const Value& value, - unsigned int flags) final; - void EntryUpdate(unsigned int id, unsigned int seq_num, - const Value& value) final; - void FlagsUpdate(unsigned int id, unsigned int flags) final; - void EntryDelete(unsigned int id) final; - void ExecuteRpc(unsigned int id, unsigned int uid, - std::span params) final {} - void RpcResponse(unsigned int id, unsigned int uid, - std::span result) final {} - - ServerImpl::Connected3Func m_connected; - net3::WireConnection3& m_wire; - - enum State { kStateInitial, kStateServerHelloComplete, kStateRunning }; - State m_state{kStateInitial}; - net3::WireDecoder3 m_decoder; - - net::NetworkIncomingClientQueue m_incoming; - std::vector m_outgoing; - wpi::DenseMap m_outgoingValueMap; - int64_t m_nextPubUid{1}; - uint64_t m_lastSendMs{0}; - - struct TopicData3 { - explicit TopicData3(TopicData* topic) { UpdateFlags(topic); } - - unsigned int flags{0}; - net3::SequenceNumber seqNum; - bool sentAssign{false}; - bool published{false}; - int64_t pubuid{0}; - - bool UpdateFlags(TopicData* topic); - }; - wpi::DenseMap m_topics3; - TopicData3* GetTopic3(TopicData* topic) { - return &m_topics3.try_emplace(topic, topic).first->second; - } - }; - - struct PublisherData { - PublisherData(ClientData* client, TopicData* topic, int64_t pubuid) - : client{client}, topic{topic}, pubuid{pubuid} { - UpdateMeta(); - } - - void UpdateMeta(); - - ClientData* client; - TopicData* topic; - int64_t pubuid; - std::vector metaClient; - std::vector metaTopic; - }; - - struct SubscriberData { - SubscriberData(ClientData* client, std::span topicNames, - int64_t subuid, const PubSubOptionsImpl& options) - : client{client}, - topicNames{topicNames.begin(), topicNames.end()}, - subuid{subuid}, - options{options}, - periodMs(std::lround(options.periodicMs / 10.0) * 10) { - UpdateMeta(); - if (periodMs < kMinPeriodMs) { - periodMs = kMinPeriodMs; - } - } - - void Update(std::span topicNames_, - const PubSubOptionsImpl& options_) { - topicNames = {topicNames_.begin(), topicNames_.end()}; - options = options_; - UpdateMeta(); - periodMs = std::lround(options_.periodicMs / 10.0) * 10; - if (periodMs < kMinPeriodMs) { - periodMs = kMinPeriodMs; - } - } - - bool Matches(std::string_view name, bool special); - - void UpdateMeta(); - - ClientData* client; - std::vector topicNames; - int64_t subuid; - PubSubOptionsImpl options; - std::vector metaClient; - std::vector metaTopic; - wpi::DenseMap topics; - // in options as double, but copy here as integer; rounded to the nearest - // 10 ms - uint32_t periodMs; - }; - wpi::Logger& m_logger; - net::ServerMessageHandler* m_local{nullptr}; - bool m_controlReady{false}; - ClientDataLocal* m_localClient; - std::vector> m_clients; - wpi::UidVector, 16> m_topics; - wpi::StringMap m_nameTopics; - bool m_persistentChanged{false}; + ServerClientLocal* m_localClient; + std::vector> m_clients; + + ServerStorage m_storage; // global meta topics (other meta topics are linked to from the specific // client or topic) - TopicData* m_metaClients; + ServerTopic* m_metaClients; - void DumpPersistent(wpi::raw_ostream& os); - - // helper functions - TopicData* CreateTopic(ClientData* client, std::string_view name, - std::string_view typeStr, const wpi::json& properties, - bool special = false); - TopicData* CreateMetaTopic(std::string_view name); - void DeleteTopic(TopicData* topic); - void SetProperties(ClientData* client, TopicData* topic, - const wpi::json& update); - void SetFlags(ClientData* client, TopicData* topic, unsigned int flags); - void SetValue(ClientData* client, TopicData* topic, const Value& value); - - // update meta topic values from data structures + size_t GetEmptyClientSlot(); + void SendAnnounce(ServerTopic* topic, ServerClient* client); void UpdateMetaClients(const std::vector& conns); - void UpdateMetaTopicPub(TopicData* topic); - void UpdateMetaTopicSub(TopicData* topic); - - void PropertiesChanged(ClientData* client, TopicData* topic, - const wpi::json& update); }; } // namespace nt::server diff --git a/ntcore/src/main/native/cpp/server/ServerPublisher.cpp b/ntcore/src/main/native/cpp/server/ServerPublisher.cpp new file mode 100644 index 0000000000..90d7b2ea50 --- /dev/null +++ b/ntcore/src/main/native/cpp/server/ServerPublisher.cpp @@ -0,0 +1,42 @@ +// Copyright (c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#include "ServerPublisher.h" + +#include + +#include + +#include "server/MessagePackWriter.h" +#include "server/ServerTopic.h" + +using namespace nt::server; +using namespace mpack; + +void ServerPublisher::UpdateMeta() { + { + Writer w; + mpack_start_map(&w, 2); + mpack_write_str(&w, "uid"); + mpack_write_int(&w, m_pubuid); + mpack_write_str(&w, "topic"); + mpack_write_str(&w, m_topic->name); + mpack_finish_map(&w); + if (mpack_writer_destroy(&w) == mpack_ok) { + m_metaClient = std::move(w.bytes); + } + } + { + Writer w; + mpack_start_map(&w, 2); + mpack_write_str(&w, "client"); + mpack_write_str(&w, m_clientName); + mpack_write_str(&w, "pubuid"); + mpack_write_int(&w, m_pubuid); + mpack_finish_map(&w); + if (mpack_writer_destroy(&w) == mpack_ok) { + m_metaTopic = std::move(w.bytes); + } + } +} diff --git a/ntcore/src/main/native/cpp/server/ServerPublisher.h b/ntcore/src/main/native/cpp/server/ServerPublisher.h new file mode 100644 index 0000000000..e8c0677c8c --- /dev/null +++ b/ntcore/src/main/native/cpp/server/ServerPublisher.h @@ -0,0 +1,43 @@ +// Copyright (c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#pragma once + +#include + +#include +#include +#include +#include + +namespace nt::server { + +class ServerClient; +struct ServerTopic; + +class ServerPublisher { + public: + ServerPublisher(std::string_view clientName, ServerTopic* topic, + int64_t pubuid) + : m_clientName{clientName}, m_topic{topic}, m_pubuid{pubuid} { + UpdateMeta(); + } + ServerPublisher(const ServerPublisher&) = delete; + ServerPublisher& operator=(const ServerPublisher&) = delete; + + ServerTopic* GetTopic() const { return m_topic; } + std::span GetMetaClientData() const { return m_metaClient; } + std::span GetMetaTopicData() const { return m_metaTopic; } + + private: + void UpdateMeta(); + + std::string m_clientName; + ServerTopic* m_topic; + int64_t m_pubuid; + std::vector m_metaClient; + std::vector m_metaTopic; +}; + +} // namespace nt::server diff --git a/ntcore/src/main/native/cpp/server/ServerStorage.cpp b/ntcore/src/main/native/cpp/server/ServerStorage.cpp new file mode 100644 index 0000000000..5ea352f688 --- /dev/null +++ b/ntcore/src/main/native/cpp/server/ServerStorage.cpp @@ -0,0 +1,608 @@ +// Copyright (c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#include "ServerStorage.h" + +#include +#include +#include +#include + +#include +#include +#include +#include + +#include "Log.h" +#include "server/MessagePackWriter.h" +#include "server/ServerClient.h" + +using namespace nt; +using namespace nt::server; +using namespace mpack; + +ServerTopic* ServerStorage::CreateTopic(ServerClient* client, + std::string_view name, + std::string_view typeStr, + const wpi::json& properties, + bool special) { + auto& topic = m_nameTopics[name]; + if (topic) { + if (typeStr != topic->typeStr) { + if (client) { + WARN("client {} publish '{}' conflicting type '{}' (currently '{}')", + client->GetName(), name, typeStr, topic->typeStr); + } + } + } else { + // new topic + unsigned int id = m_topics.emplace_back( + std::make_unique(m_logger, name, typeStr, properties)); + topic = m_topics[id].get(); + topic->id = id; + topic->special = special; + + m_sendAnnounce(topic, client); + + // create meta topics; don't create if topic is itself a meta topic + if (!special) { + topic->metaPub = CreateMetaTopic(fmt::format("$pub${}", name)); + topic->metaSub = CreateMetaTopic(fmt::format("$sub${}", name)); + UpdateMetaTopicPub(topic); + UpdateMetaTopicSub(topic); + } + } + + return topic; +} + +ServerTopic* ServerStorage::CreateMetaTopic(std::string_view name) { + return CreateTopic(nullptr, name, "msgpack", {{"retained", true}}, true); +} + +void ServerStorage::DeleteTopic(ServerTopic* topic) { + if (!topic) { + return; + } + + // delete meta topics + if (topic->metaPub) { + DeleteTopic(topic->metaPub); + } + if (topic->metaSub) { + DeleteTopic(topic->metaSub); + } + + // unannounce to all subscribers + for (auto&& tcd : topic->clients) { + if (!tcd.second.subscribers.empty()) { + tcd.first->UpdatePeriod(tcd.second, topic); + tcd.first->SendUnannounce(topic); + } + } + + // erase the topic + m_nameTopics.erase(topic->name); + m_topics.erase(topic->id); +} + +void ServerStorage::SetProperties(ServerClient* client, ServerTopic* topic, + const wpi::json& update) { + DEBUG4("SetProperties({}, {}, {})", client ? client->GetId() : -1, + topic->name, update.dump()); + bool wasPersistent = topic->persistent; + if (topic->SetProperties(update)) { + // update persistentChanged flag + if (topic->persistent != wasPersistent) { + m_persistentChanged = true; + } + PropertiesChanged(client, topic, update); + } +} + +void ServerStorage::SetFlags(ServerClient* client, ServerTopic* topic, + unsigned int flags) { + bool wasPersistent = topic->persistent; + if (topic->SetFlags(flags)) { + // update persistentChanged flag + if (topic->persistent != wasPersistent) { + m_persistentChanged = true; + wpi::json update; + if (topic->persistent) { + update = {{"persistent", true}}; + } else { + update = {{"persistent", wpi::json::object()}}; + } + PropertiesChanged(client, topic, update); + } + } +} + +void ServerStorage::SetValue(ServerClient* client, ServerTopic* topic, + const Value& value) { + // update retained value if from same client or timestamp newer + if (topic->cached && (!topic->lastValue || topic->lastValueClient == client || + topic->lastValue.time() == 0 || + value.time() >= topic->lastValue.time())) { + DEBUG4("updating '{}' last value (time was {} is {})", topic->name, + topic->lastValue.time(), value.time()); + topic->lastValue = value; + topic->lastValueClient = client; + + // if persistent, update flag + if (topic->persistent) { + m_persistentChanged = true; + } + } + + for (auto&& tcd : topic->clients) { + if (tcd.first != client && + tcd.second.sendMode != net::ValueSendMode::kDisabled) { + tcd.first->SendValue(topic, value, tcd.second.sendMode); + } + } +} + +void ServerStorage::RemoveClient(ServerClient* client) { + // remove all publishers and subscribers for this client + wpi::SmallVector toDelete; + for (auto&& topic : m_topics) { + bool pubChanged = false; + bool subChanged = false; + auto tcdIt = topic->clients.find(client); + if (tcdIt != topic->clients.end()) { + pubChanged = !tcdIt->second.publishers.empty(); + subChanged = !tcdIt->second.subscribers.empty(); + topic->publisherCount -= tcdIt->second.publishers.size(); + topic->clients.erase(tcdIt); + } + + if (!topic->IsPublished()) { + toDelete.push_back(topic.get()); + } else { + if (pubChanged) { + UpdateMetaTopicPub(topic.get()); + } + if (subChanged) { + UpdateMetaTopicSub(topic.get()); + } + } + } + + // delete unpublished topics + for (auto topic : toDelete) { + DeleteTopic(topic); + } + + DeleteTopic(client->m_metaPub); + DeleteTopic(client->m_metaSub); +} + +void ServerStorage::UpdateMetaTopicPub(ServerTopic* topic) { + if (!topic->metaPub) { + return; + } + Writer w; + uint32_t count = 0; + for (auto&& tcd : topic->clients) { + count += tcd.second.publishers.size(); + } + mpack_start_array(&w, count); + for (auto&& tcd : topic->clients) { + for (auto&& pub : tcd.second.publishers) { + mpack_write_object_bytes(&w, pub->GetMetaTopicData()); + } + } + mpack_finish_array(&w); + if (mpack_writer_destroy(&w) == mpack_ok) { + SetValue(nullptr, topic->metaPub, Value::MakeRaw(std::move(w.bytes))); + } +} + +void ServerStorage::UpdateMetaTopicSub(ServerTopic* topic) { + if (!topic->metaSub) { + return; + } + Writer w; + uint32_t count = 0; + for (auto&& tcd : topic->clients) { + count += tcd.second.subscribers.size(); + } + mpack_start_array(&w, count); + for (auto&& tcd : topic->clients) { + for (auto&& sub : tcd.second.subscribers) { + mpack_write_object_bytes(&w, sub->GetMetaTopicData()); + } + } + mpack_finish_array(&w); + if (mpack_writer_destroy(&w) == mpack_ok) { + SetValue(nullptr, topic->metaSub, Value::MakeRaw(std::move(w.bytes))); + } +} + +void ServerStorage::PropertiesChanged(ServerClient* client, ServerTopic* topic, + const wpi::json& update) { + // removing some properties can result in the topic being unpublished + if (!topic->IsPublished()) { + DeleteTopic(topic); + } else { + // send updated announcement to all subscribers + for (auto&& tcd : topic->clients) { + tcd.first->SendPropertiesUpdate(topic, update, tcd.first == client); + } + } +} + +static void DumpValue(wpi::raw_ostream& os, const Value& value, + wpi::json::serializer& s) { + switch (value.type()) { + case NT_BOOLEAN: + if (value.GetBoolean()) { + os << "true"; + } else { + os << "false"; + } + break; + case NT_DOUBLE: + s.dump_float(value.GetDouble()); + break; + case NT_FLOAT: + s.dump_float(value.GetFloat()); + break; + case NT_INTEGER: + s.dump_integer(value.GetInteger()); + break; + case NT_STRING: + os << '"'; + s.dump_escaped(value.GetString(), false); + os << '"'; + break; + case NT_RAW: + case NT_RPC: + os << '"'; + wpi::Base64Encode(os, value.GetRaw()); + os << '"'; + break; + case NT_BOOLEAN_ARRAY: { + os << '['; + bool first = true; + for (auto v : value.GetBooleanArray()) { + if (first) { + first = false; + } else { + os << ", "; + } + if (v) { + os << "true"; + } else { + os << "false"; + } + } + os << ']'; + break; + } + case NT_DOUBLE_ARRAY: { + os << '['; + bool first = true; + for (auto v : value.GetDoubleArray()) { + if (first) { + first = false; + } else { + os << ", "; + } + s.dump_float(v); + } + os << ']'; + break; + } + case NT_FLOAT_ARRAY: { + os << '['; + bool first = true; + for (auto v : value.GetFloatArray()) { + if (first) { + first = false; + } else { + os << ", "; + } + s.dump_float(v); + } + os << ']'; + break; + } + case NT_INTEGER_ARRAY: { + os << '['; + bool first = true; + for (auto v : value.GetIntegerArray()) { + if (first) { + first = false; + } else { + os << ", "; + } + s.dump_integer(v); + } + os << ']'; + break; + } + case NT_STRING_ARRAY: { + os << '['; + bool first = true; + for (auto&& v : value.GetStringArray()) { + if (first) { + first = false; + } else { + os << ", "; + } + os << '"'; + s.dump_escaped(v, false); + os << '"'; + } + os << ']'; + break; + } + default: + os << "null"; + break; + } +} + +void ServerStorage::DumpPersistent(wpi::raw_ostream& os) { + wpi::json::serializer s{os, ' ', 16}; + os << "[\n"; + bool first = true; + for (const auto& topic : m_topics) { + if (!topic->persistent || !topic->lastValue) { + continue; + } + if (first) { + first = false; + } else { + os << ",\n"; + } + os << " {\n \"name\": \""; + s.dump_escaped(topic->name, false); + os << "\",\n \"type\": \""; + s.dump_escaped(topic->typeStr, false); + os << "\",\n \"value\": "; + DumpValue(os, topic->lastValue, s); + os << ",\n \"properties\": "; + s.dump(topic->properties, true, false, 2, 4); + os << "\n }"; + } + os << "\n]\n"; +} + +static std::string* ObjGetString(wpi::json::object_t& obj, std::string_view key, + std::string* error) { + auto it = obj.find(key); + if (it == obj.end()) { + *error = fmt::format("no {} key", key); + return nullptr; + } + auto val = it->second.get_ptr(); + if (!val) { + *error = fmt::format("{} must be a string", key); + } + return val; +} + +std::string ServerStorage::LoadPersistent(std::string_view in) { + if (in.empty()) { + return {}; + } + + wpi::json j; + try { + j = wpi::json::parse(in); + } catch (wpi::json::parse_error& err) { + return fmt::format("could not decode JSON: {}", err.what()); + } + + if (!j.is_array()) { + return "expected JSON array at top level"; + } + + bool persistentChanged = m_persistentChanged; + + std::string allerrors; + int i = -1; + auto time = nt::Now(); + for (auto&& jitem : j) { + ++i; + std::string error; + { + auto obj = jitem.get_ptr(); + if (!obj) { + error = "expected item to be an object"; + goto err; + } + + // name + auto name = ObjGetString(*obj, "name", &error); + if (!name) { + goto err; + } + + // type + auto typeStr = ObjGetString(*obj, "type", &error); + if (!typeStr) { + goto err; + } + + // properties + auto propsIt = obj->find("properties"); + if (propsIt == obj->end()) { + error = "no properties key"; + goto err; + } + auto& props = propsIt->second; + if (!props.is_object()) { + error = "properties must be an object"; + goto err; + } + + // check to make sure persistent property is set + auto persistentIt = props.find("persistent"); + if (persistentIt == props.end()) { + error = "no persistent property"; + goto err; + } + if (auto v = persistentIt->get_ptr()) { + if (!*v) { + error = "persistent property is false"; + goto err; + } + } else { + error = "persistent property is not boolean"; + goto err; + } + + // value + auto valueIt = obj->find("value"); + if (valueIt == obj->end()) { + error = "no value key"; + goto err; + } + Value value; + if (*typeStr == "boolean") { + if (auto v = valueIt->second.get_ptr()) { + value = Value::MakeBoolean(*v, time); + } else { + error = "value type mismatch, expected boolean"; + goto err; + } + } else if (*typeStr == "int") { + if (auto v = valueIt->second.get_ptr()) { + value = Value::MakeInteger(*v, time); + } else if (auto v = valueIt->second.get_ptr()) { + value = Value::MakeInteger(*v, time); + } else { + error = "value type mismatch, expected int"; + goto err; + } + } else if (*typeStr == "float") { + if (auto v = valueIt->second.get_ptr()) { + value = Value::MakeFloat(*v, time); + } else { + error = "value type mismatch, expected float"; + goto err; + } + } else if (*typeStr == "double") { + if (auto v = valueIt->second.get_ptr()) { + value = Value::MakeDouble(*v, time); + } else { + error = "value type mismatch, expected double"; + goto err; + } + } else if (*typeStr == "string" || *typeStr == "json") { + if (auto v = valueIt->second.get_ptr()) { + value = Value::MakeString(*v, time); + } else { + error = "value type mismatch, expected string"; + goto err; + } + } else if (*typeStr == "boolean[]") { + auto arr = valueIt->second.get_ptr(); + if (!arr) { + error = "value type mismatch, expected array"; + goto err; + } + std::vector elems; + for (auto&& jelem : valueIt->second) { + if (auto v = jelem.get_ptr()) { + elems.push_back(*v); + } else { + error = "value type mismatch, expected boolean"; + } + } + value = Value::MakeBooleanArray(elems, time); + } else if (*typeStr == "int[]") { + auto arr = valueIt->second.get_ptr(); + if (!arr) { + error = "value type mismatch, expected array"; + goto err; + } + std::vector elems; + for (auto&& jelem : valueIt->second) { + if (auto v = jelem.get_ptr()) { + elems.push_back(*v); + } else if (auto v = jelem.get_ptr()) { + elems.push_back(*v); + } else { + error = "value type mismatch, expected int"; + } + } + value = Value::MakeIntegerArray(elems, time); + } else if (*typeStr == "double[]") { + auto arr = valueIt->second.get_ptr(); + if (!arr) { + error = "value type mismatch, expected array"; + goto err; + } + std::vector elems; + for (auto&& jelem : valueIt->second) { + if (auto v = jelem.get_ptr()) { + elems.push_back(*v); + } else { + error = "value type mismatch, expected double"; + } + } + value = Value::MakeDoubleArray(elems, time); + } else if (*typeStr == "float[]") { + auto arr = valueIt->second.get_ptr(); + if (!arr) { + error = "value type mismatch, expected array"; + goto err; + } + std::vector elems; + for (auto&& jelem : valueIt->second) { + if (auto v = jelem.get_ptr()) { + elems.push_back(*v); + } else { + error = "value type mismatch, expected float"; + } + } + value = Value::MakeFloatArray(elems, time); + } else if (*typeStr == "string[]") { + auto arr = valueIt->second.get_ptr(); + if (!arr) { + error = "value type mismatch, expected array"; + goto err; + } + std::vector elems; + for (auto&& jelem : valueIt->second) { + if (auto v = jelem.get_ptr()) { + elems.emplace_back(*v); + } else { + error = "value type mismatch, expected string"; + } + } + value = Value::MakeStringArray(std::move(elems), time); + } else { + // raw + if (auto v = valueIt->second.get_ptr()) { + std::vector data; + wpi::Base64Decode(*v, &data); + value = Value::MakeRaw(std::move(data), time); + } else { + error = "value type mismatch, expected string"; + goto err; + } + } + + // create persistent topic + auto topic = CreateTopic(nullptr, *name, *typeStr, props); + + // set value + SetValue(nullptr, topic, value); + + continue; + } + err: + allerrors += fmt::format("{}: {}\n", i, error); + } + + m_persistentChanged = persistentChanged; // restore flag + + return allerrors; +} diff --git a/ntcore/src/main/native/cpp/server/ServerStorage.h b/ntcore/src/main/native/cpp/server/ServerStorage.h new file mode 100644 index 0000000000..a3b011fb4d --- /dev/null +++ b/ntcore/src/main/native/cpp/server/ServerStorage.h @@ -0,0 +1,95 @@ +// Copyright (c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#pragma once + +#include +#include +#include +#include + +#include +#include +#include + +#include "server/ServerTopic.h" + +namespace wpi { +class Logger; +class raw_ostream; +} // namespace wpi + +namespace nt::server { + +class ServerClient; + +class ServerStorage final { + public: + ServerStorage(wpi::Logger& logger, + std::function + sendAnnounce) + : m_logger{logger}, m_sendAnnounce{std::move(sendAnnounce)} {} + ServerStorage(const ServerStorage&) = delete; + ServerStorage& operator=(const ServerStorage&) = delete; + + ServerTopic* CreateTopic(ServerClient* client, std::string_view name, + std::string_view typeStr, + const wpi::json& properties, bool special = false); + ServerTopic* CreateMetaTopic(std::string_view name); + void DeleteTopic(ServerTopic* topic); + void SetProperties(ServerClient* client, ServerTopic* topic, + const wpi::json& update); + void SetFlags(ServerClient* client, ServerTopic* topic, unsigned int flags); + void SetValue(ServerClient* client, ServerTopic* topic, const Value& value); + + void RemoveClient(ServerClient* client); + + void PropertiesChanged(ServerClient* client, ServerTopic* topic, + const wpi::json& update); + + ServerTopic* GetTopic(unsigned int id) const { + return id < m_topics.size() ? m_topics[id].get() : nullptr; + } + ServerTopic* GetTopic(std::string_view name) const { + auto it = m_nameTopics.find(name); + if (it == m_nameTopics.end()) { + return nullptr; + } + return it->second; + } + + // Approximate upper bound, not exact quantity + size_t GetNumTopics() const { return m_topics.size(); } + + void ForEachTopic(std::invocable auto&& func) const { + for (auto&& topic : m_topics) { + func(topic.get()); + } + } + + // update meta topic values from data structures + void UpdateMetaTopicPub(ServerTopic* topic); + void UpdateMetaTopicSub(ServerTopic* topic); + + // if any persistent values changed since the last call to this function + bool PersistentChanged() { + bool rv = m_persistentChanged; + m_persistentChanged = false; + return rv; + } + + void DumpPersistent(wpi::raw_ostream& os); + // returns newline-separated errors + std::string LoadPersistent(std::string_view in); + + private: + wpi::Logger& m_logger; + std::function m_sendAnnounce; + + wpi::UidVector, 16> m_topics; + wpi::StringMap m_nameTopics; + bool m_persistentChanged{false}; +}; + +} // namespace nt::server diff --git a/ntcore/src/main/native/cpp/server/ServerSubscriber.cpp b/ntcore/src/main/native/cpp/server/ServerSubscriber.cpp new file mode 100644 index 0000000000..a5acc4f065 --- /dev/null +++ b/ntcore/src/main/native/cpp/server/ServerSubscriber.cpp @@ -0,0 +1,88 @@ +// Copyright (c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#include "ServerSubscriber.h" + +#include + +#include +#include + +#include "PubSubOptions.h" +#include "server/MessagePackWriter.h" + +using namespace nt; +using namespace nt::server; +using namespace mpack; + +static void WriteOptions(mpack_writer_t& w, const PubSubOptionsImpl& options) { + int size = + (options.sendAll ? 1 : 0) + (options.topicsOnly ? 1 : 0) + + (options.periodicMs != PubSubOptionsImpl::kDefaultPeriodicMs ? 1 : 0) + + (options.prefixMatch ? 1 : 0); + mpack_start_map(&w, size); + if (options.sendAll) { + mpack_write_str(&w, "all"); + mpack_write_bool(&w, true); + } + if (options.topicsOnly) { + mpack_write_str(&w, "topicsonly"); + mpack_write_bool(&w, true); + } + if (options.periodicMs != PubSubOptionsImpl::kDefaultPeriodicMs) { + mpack_write_str(&w, "periodic"); + mpack_write_float(&w, options.periodicMs / 1000.0); + } + if (options.prefixMatch) { + mpack_write_str(&w, "prefix"); + mpack_write_bool(&w, true); + } + mpack_finish_map(&w); +} + +bool ServerSubscriber::Matches(std::string_view name, bool special) { + for (auto&& topicName : m_topicNames) { + if ((!m_options.prefixMatch && name == topicName) || + (m_options.prefixMatch && (!special || !topicName.empty()) && + wpi::starts_with(name, topicName))) { + return true; + } + } + return false; +} + +void ServerSubscriber::UpdateMeta() { + { + Writer w; + mpack_start_map(&w, 3); + mpack_write_str(&w, "uid"); + mpack_write_int(&w, m_subuid); + mpack_write_str(&w, "topics"); + mpack_start_array(&w, m_topicNames.size()); + for (auto&& name : m_topicNames) { + mpack_write_str(&w, name); + } + mpack_finish_array(&w); + mpack_write_str(&w, "options"); + WriteOptions(w, m_options); + mpack_finish_map(&w); + if (mpack_writer_destroy(&w) == mpack_ok) { + m_metaClient = std::move(w.bytes); + } + } + { + Writer w; + mpack_start_map(&w, 3); + mpack_write_str(&w, "client"); + mpack_write_str(&w, m_clientName); + mpack_write_str(&w, "subuid"); + mpack_write_int(&w, m_subuid); + mpack_write_str(&w, "options"); + WriteOptions(w, m_options); + mpack_finish_map(&w); + if (mpack_writer_destroy(&w) == mpack_ok) { + m_metaTopic = std::move(w.bytes); + } + } +} diff --git a/ntcore/src/main/native/cpp/server/ServerSubscriber.h b/ntcore/src/main/native/cpp/server/ServerSubscriber.h new file mode 100644 index 0000000000..47a79eb3bd --- /dev/null +++ b/ntcore/src/main/native/cpp/server/ServerSubscriber.h @@ -0,0 +1,74 @@ +// Copyright (c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#pragma once + +#include + +#include +#include +#include +#include +#include + +#include "PubSubOptions.h" +#include "server/Constants.h" + +namespace nt::server { + +class ServerClient; +struct ServerTopic; + +class ServerSubscriber { + public: + ServerSubscriber(std::string_view clientName, + std::span topicNames, int64_t subuid, + const PubSubOptionsImpl& options) + : m_clientName{clientName}, + m_topicNames{topicNames.begin(), topicNames.end()}, + m_subuid{subuid}, + m_options{options}, + m_periodMs(std::lround(options.periodicMs / 10.0) * 10) { + UpdateMeta(); + if (m_periodMs < kMinPeriodMs) { + m_periodMs = kMinPeriodMs; + } + } + ServerSubscriber(const ServerSubscriber&) = delete; + ServerSubscriber& operator=(const ServerSubscriber&) = delete; + + void Update(std::span topicNames_, + const PubSubOptionsImpl& options_) { + m_topicNames = {topicNames_.begin(), topicNames_.end()}; + m_options = options_; + UpdateMeta(); + m_periodMs = std::lround(options_.periodicMs / 10.0) * 10; + if (m_periodMs < kMinPeriodMs) { + m_periodMs = kMinPeriodMs; + } + } + + bool Matches(std::string_view name, bool special); + + const PubSubOptions& GetOptions() const { return m_options; } + uint32_t GetPeriodMs() const { return m_periodMs; } + + std::span GetMetaClientData() const { return m_metaClient; } + std::span GetMetaTopicData() const { return m_metaTopic; } + + private: + void UpdateMeta(); + + std::string m_clientName; + std::vector m_topicNames; + int64_t m_subuid; + PubSubOptionsImpl m_options; + std::vector m_metaClient; + std::vector m_metaTopic; + // in options as double, but copy here as integer; rounded to the nearest + // 10 ms + uint32_t m_periodMs; +}; + +} // namespace nt::server diff --git a/ntcore/src/main/native/cpp/server/ServerTopic.cpp b/ntcore/src/main/native/cpp/server/ServerTopic.cpp new file mode 100644 index 0000000000..826a49e9c2 --- /dev/null +++ b/ntcore/src/main/native/cpp/server/ServerTopic.cpp @@ -0,0 +1,103 @@ +// Copyright (c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#include "ServerTopic.h" + +#include "Log.h" + +using namespace nt::server; + +bool ServerTopic::SetProperties(const wpi::json& update) { + if (!update.is_object()) { + return false; + } + bool updated = false; + for (auto&& elem : update.items()) { + if (elem.value().is_null()) { + properties.erase(elem.key()); + } else { + properties[elem.key()] = elem.value(); + } + updated = true; + } + if (updated) { + RefreshProperties(); + } + return updated; +} + +void ServerTopic::RefreshProperties() { + persistent = false; + retained = false; + cached = true; + + auto persistentIt = properties.find("persistent"); + if (persistentIt != properties.end()) { + if (auto val = persistentIt->get_ptr()) { + persistent = *val; + } + } + + auto retainedIt = properties.find("retained"); + if (retainedIt != properties.end()) { + if (auto val = retainedIt->get_ptr()) { + retained = *val; + } + } + + auto cachedIt = properties.find("cached"); + if (cachedIt != properties.end()) { + if (auto val = cachedIt->get_ptr()) { + cached = *val; + } + } + + if (!cached) { + lastValue = {}; + lastValueClient = nullptr; + } + + if (!cached && persistent) { + WARN("topic {}: disabling cached property disables persistent storage", + name); + } +} + +bool ServerTopic::SetFlags(unsigned int flags_) { + bool updated; + if ((flags_ & NT_PERSISTENT) != 0) { + updated = !persistent; + persistent = true; + properties["persistent"] = true; + } else { + updated = persistent; + persistent = false; + properties.erase("persistent"); + } + if ((flags_ & NT_RETAINED) != 0) { + updated |= !retained; + retained = true; + properties["retained"] = true; + } else { + updated |= retained; + retained = false; + properties.erase("retained"); + } + if ((flags_ & NT_UNCACHED) != 0) { + updated |= cached; + cached = false; + properties["cached"] = false; + lastValue = {}; + lastValueClient = nullptr; + } else { + updated |= !cached; + cached = true; + properties.erase("cached"); + } + if (!cached && persistent) { + WARN("topic {}: disabling cached property disables persistent storage", + name); + } + return updated; +} diff --git a/ntcore/src/main/native/cpp/server/ServerTopic.h b/ntcore/src/main/native/cpp/server/ServerTopic.h new file mode 100644 index 0000000000..20cafccf62 --- /dev/null +++ b/ntcore/src/main/native/cpp/server/ServerTopic.h @@ -0,0 +1,104 @@ +// Copyright (c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#pragma once + +#include +#include +#include + +#include +#include +#include + +#include "net/NetworkOutgoingQueue.h" +#include "networktables/NetworkTableValue.h" +#include "server/ServerPublisher.h" +#include "server/ServerSubscriber.h" + +namespace wpi { +class Logger; +} // namespace wpi + +namespace nt::server { + +class ServerClient; + +struct TopicClientData { + wpi::SmallPtrSet publishers; + wpi::SmallPtrSet subscribers; + net::ValueSendMode sendMode = net::ValueSendMode::kDisabled; + + bool AddSubscriber(ServerSubscriber* sub) { + bool added = subscribers.insert(sub).second; + auto& options = sub->GetOptions(); + if (!options.topicsOnly) { + if (options.sendAll) { + sendMode = net::ValueSendMode::kAll; + } else if (sendMode == net::ValueSendMode::kDisabled) { + sendMode = net::ValueSendMode::kNormal; + } + } + return added; + } +}; + +struct ServerTopic { + ServerTopic(wpi::Logger& logger, std::string_view name, + std::string_view typeStr) + : m_logger{logger}, name{name}, typeStr{typeStr} {} + ServerTopic(wpi::Logger& logger, std::string_view name, + std::string_view typeStr, wpi::json properties) + : m_logger{logger}, + name{name}, + typeStr{typeStr}, + properties(std::move(properties)) { + RefreshProperties(); + } + ServerTopic(const ServerTopic&) = delete; + ServerTopic& operator=(const ServerTopic&) = delete; + + bool IsPublished() const { + return persistent || retained || publisherCount != 0; + } + + // returns true if properties changed + bool SetProperties(const wpi::json& update); + void RefreshProperties(); + bool SetFlags(unsigned int flags_); + + wpi::Logger& m_logger; // Must be m_logger for WARN macro to work + std::string name; + unsigned int id; + Value lastValue; + ServerClient* lastValueClient = nullptr; + std::string typeStr; + wpi::json properties = wpi::json::object(); + unsigned int publisherCount{0}; + bool persistent{false}; + bool retained{false}; + bool cached{true}; + bool special{false}; + int localTopic{0}; + + void AddPublisher(ServerClient* client, ServerPublisher* pub) { + if (clients[client].publishers.insert(pub).second) { + ++publisherCount; + } + } + + void RemovePublisher(ServerClient* client, ServerPublisher* pub) { + if (clients[client].publishers.erase(pub)) { + --publisherCount; + } + } + + wpi::SmallDenseMap clients; + + // meta topics + ServerTopic* metaPub = nullptr; + ServerTopic* metaSub = nullptr; +}; + +} // namespace nt::server diff --git a/wpiutil/src/main/native/include/wpi/MessagePack.h b/wpiutil/src/main/native/include/wpi/MessagePack.h index e1f54e5257..2250c47527 100644 --- a/wpiutil/src/main/native/include/wpi/MessagePack.h +++ b/wpiutil/src/main/native/include/wpi/MessagePack.h @@ -24,6 +24,12 @@ inline void mpack_write_bytes(mpack_writer_t* writer, data.size()); } +inline void mpack_write_object_bytes(mpack_writer_t* writer, + std::span data) { + mpack_write_object_bytes(writer, reinterpret_cast(data.data()), + data.size()); +} + inline void mpack_reader_init_data(mpack_reader_t* reader, std::span data) { mpack_reader_init_data(reader, reinterpret_cast(data.data()),