[ntcore] Unify NetworkInterface and MessageHandler (#7190)

This commit is contained in:
Peter Johnson
2024-10-11 14:38:02 -07:00
committed by GitHub
parent 8ca99c7cb7
commit 96f0b2482c
21 changed files with 493 additions and 478 deletions

View File

@@ -20,6 +20,7 @@
#include "Log.h"
#include "Types_internal.h"
#include "Value_internal.h"
#include "net/MessageHandler.h"
#include "networktables/NetworkTableValue.h"
using namespace nt;
@@ -403,7 +404,7 @@ void LocalStorage::Impl::PropertiesUpdated(TopicData* topic,
NotifyTopic(topic, eventFlags | NT_EVENT_PROPERTIES);
// check local flag so we don't echo back received properties changes
if (m_network && sendNetwork) {
m_network->SetProperties(topic->name, update);
m_network->ClientSetProperties(topic->name, update);
}
}
@@ -503,8 +504,9 @@ void LocalStorage::Impl::RemoveNetworkPublisher(TopicData* topic) {
// this may result in a duplicate publish warning on the server side,
// but send one anyway in this case just to be sure
if (nextPub->active && m_network) {
m_network->Publish(Handle{nextPub->handle}.GetIndex(), topic->name,
topic->typeStr, topic->properties, nextPub->config);
m_network->ClientPublish(Handle{nextPub->handle}.GetIndex(),
topic->name, topic->typeStr, topic->properties,
nextPub->config);
}
}
}
@@ -561,8 +563,8 @@ LocalStorage::PublisherData* LocalStorage::Impl::AddLocalPublisher(
}
if (publisher->active && m_network) {
m_network->Publish(Handle{publisher->handle}.GetIndex(), topic->name,
topic->typeStr, topic->properties, config);
m_network->ClientPublish(Handle{publisher->handle}.GetIndex(), topic->name,
topic->typeStr, topic->properties, config);
}
return publisher;
}
@@ -580,7 +582,7 @@ LocalStorage::Impl::RemoveLocalPublisher(NT_Publisher pubHandle) {
}
if (publisher->active && m_network) {
m_network->Unpublish(Handle{publisher->handle}.GetIndex());
m_network->ClientUnpublish(Handle{publisher->handle}.GetIndex());
}
if (publisher->active && !topic->localPublishers.empty()) {
@@ -593,9 +595,9 @@ LocalStorage::Impl::RemoveLocalPublisher(NT_Publisher pubHandle) {
topic->typeStr = nextPub->config.typeStr;
RefreshPubSubActive(topic, false);
if (nextPub->active && m_network) {
m_network->Publish(Handle{nextPub->handle}.GetIndex(), topic->name,
topic->typeStr, topic->properties,
nextPub->config);
m_network->ClientPublish(Handle{nextPub->handle}.GetIndex(),
topic->name, topic->typeStr,
topic->properties, nextPub->config);
}
}
}
@@ -619,8 +621,8 @@ LocalStorage::SubscriberData* LocalStorage::Impl::AddLocalSubscriber(
}
if (m_network && !subscriber->config.hidden) {
DEBUG4("-> NetworkSubscribe({})", topic->name);
m_network->Subscribe(Handle{subscriber->handle}.GetIndex(), {{topic->name}},
config);
m_network->ClientSubscribe(Handle{subscriber->handle}.GetIndex(),
{{topic->name}}, config);
}
// queue current value
@@ -648,7 +650,7 @@ LocalStorage::Impl::RemoveLocalSubscriber(NT_Subscriber subHandle) {
}
}
if (m_network && !subscriber->config.hidden) {
m_network->Unsubscribe(Handle{subscriber->handle}.GetIndex());
m_network->ClientUnsubscribe(Handle{subscriber->handle}.GetIndex());
}
}
return subscriber;
@@ -685,8 +687,8 @@ LocalStorage::MultiSubscriberData* LocalStorage::Impl::AddMultiSubscriber(
}
if (m_network && !subscriber->options.hidden) {
DEBUG4("-> NetworkSubscribe");
m_network->Subscribe(Handle{subscriber->handle}.GetIndex(),
subscriber->prefixes, subscriber->options);
m_network->ClientSubscribe(Handle{subscriber->handle}.GetIndex(),
subscriber->prefixes, subscriber->options);
}
return subscriber;
}
@@ -704,7 +706,7 @@ LocalStorage::Impl::RemoveMultiSubscriber(NT_MultiSubscriber subHandle) {
}
}
if (m_network && !subscriber->options.hidden) {
m_network->Unsubscribe(Handle{subscriber->handle}.GetIndex());
m_network->ClientUnsubscribe(Handle{subscriber->handle}.GetIndex());
}
}
return subscriber;
@@ -978,7 +980,7 @@ bool LocalStorage::Impl::PublishLocalValue(PublisherData* publisher,
if (publisher->topic->IsCached()) {
publisher->topic->lastValueNetwork = value;
}
m_network->SetValue(Handle{publisher->handle}.GetIndex(), value);
m_network->ClientSetValue(Handle{publisher->handle}.GetIndex(), value);
}
return SetValue(publisher->topic, value, NT_EVENT_VALUE_LOCAL,
suppressDuplicates, publisher);
@@ -1074,24 +1076,24 @@ LocalStorage::Impl::Impl(int inst, IListenerStorage& listenerStorage,
LocalStorage::~LocalStorage() = default;
NT_Topic LocalStorage::NetworkAnnounce(std::string_view name,
std::string_view typeStr,
const wpi::json& properties,
std::optional<int> pubuid) {
int LocalStorage::ServerAnnounce(std::string_view name, int id,
std::string_view typeStr,
const wpi::json& properties,
std::optional<int> pubuid) {
std::scoped_lock lock{m_mutex};
auto topic = m_impl.GetOrCreateTopic(name);
m_impl.NetworkAnnounce(topic, typeStr, properties, pubuid);
return topic->handle;
return Handle{topic->handle}.GetIndex();
}
void LocalStorage::NetworkUnannounce(std::string_view name) {
void LocalStorage::ServerUnannounce(std::string_view name, int id) {
std::scoped_lock lock{m_mutex};
auto topic = m_impl.GetOrCreateTopic(name);
m_impl.RemoveNetworkPublisher(topic);
}
void LocalStorage::NetworkPropertiesUpdate(std::string_view name,
const wpi::json& update, bool ack) {
void LocalStorage::ServerPropertiesUpdate(std::string_view name,
const wpi::json& update, bool ack) {
std::scoped_lock lock{m_mutex};
auto it = m_impl.m_nameTopics.find(name);
if (it != m_impl.m_nameTopics.end()) {
@@ -1099,9 +1101,10 @@ void LocalStorage::NetworkPropertiesUpdate(std::string_view name,
}
}
void LocalStorage::NetworkSetValue(NT_Topic topicHandle, const Value& value) {
void LocalStorage::ServerSetValue(int topicId, const Value& value) {
std::scoped_lock lock{m_mutex};
if (auto topic = m_impl.m_topics.Get(topicHandle)) {
if (auto topic =
m_impl.m_topics.Get(Handle{m_impl.m_inst, topicId, Handle::kTopic})) {
if (m_impl.SetValue(topic, value, NT_EVENT_VALUE_REMOTE, false, nullptr)) {
if (topic->IsCached()) {
topic->lastValueNetwork = value;
@@ -1111,12 +1114,12 @@ void LocalStorage::NetworkSetValue(NT_Topic topicHandle, const Value& value) {
}
}
void LocalStorage::StartNetwork(net::NetworkInterface* network) {
void LocalStorage::StartNetwork(net::ClientMessageHandler* network) {
std::scoped_lock lock{m_mutex};
m_impl.StartNetwork(network);
}
void LocalStorage::Impl::StartNetwork(net::NetworkInterface* network) {
void LocalStorage::Impl::StartNetwork(net::ClientMessageHandler* network) {
DEBUG4("StartNetwork()");
m_network = network;
// publish all active publishers to the network and send last values
@@ -1125,26 +1128,27 @@ void LocalStorage::Impl::StartNetwork(net::NetworkInterface* network) {
PublisherData* anyPublisher = nullptr;
for (auto&& publisher : topic->localPublishers) {
if (publisher->active) {
network->Publish(Handle{publisher->handle}.GetIndex(), topic->name,
topic->typeStr, topic->properties, publisher->config);
network->ClientPublish(Handle{publisher->handle}.GetIndex(),
topic->name, topic->typeStr, topic->properties,
publisher->config);
anyPublisher = publisher;
}
}
if (anyPublisher && topic->lastValue) {
network->SetValue(Handle{anyPublisher->handle}.GetIndex(),
topic->lastValue);
network->ClientSetValue(Handle{anyPublisher->handle}.GetIndex(),
topic->lastValue);
}
}
for (auto&& subscriber : m_subscribers) {
if (!subscriber->config.hidden) {
network->Subscribe(Handle{subscriber->handle}.GetIndex(),
{{subscriber->topic->name}}, subscriber->config);
network->ClientSubscribe(Handle{subscriber->handle}.GetIndex(),
{{subscriber->topic->name}}, subscriber->config);
}
}
for (auto&& subscriber : m_multiSubscribers) {
if (!subscriber->options.hidden) {
network->Subscribe(Handle{subscriber->handle}.GetIndex(),
subscriber->prefixes, subscriber->options);
network->ClientSubscribe(Handle{subscriber->handle}.GetIndex(),
subscriber->prefixes, subscriber->options);
}
}
}

View File

@@ -6,7 +6,6 @@
#include <stdint.h>
#include <functional>
#include <memory>
#include <span>
#include <string>
@@ -26,6 +25,7 @@
#include "Types_internal.h"
#include "ValueCircularBuffer.h"
#include "VectorSet.h"
#include "net/MessageHandler.h"
#include "net/NetworkInterface.h"
#include "ntcore_cpp.h"
@@ -46,15 +46,15 @@ class LocalStorage final : public net::ILocalStorage {
~LocalStorage() final;
// network interface functions
NT_Topic NetworkAnnounce(std::string_view name, std::string_view typeStr,
const wpi::json& properties,
std::optional<int> pubuid) final;
void NetworkUnannounce(std::string_view name) final;
void NetworkPropertiesUpdate(std::string_view name, const wpi::json& update,
bool ack) final;
void NetworkSetValue(NT_Topic topicHandle, const Value& value) final;
int ServerAnnounce(std::string_view name, int id, std::string_view typeStr,
const wpi::json& properties,
std::optional<int> pubuid) final;
void ServerUnannounce(std::string_view name, int id) final;
void ServerPropertiesUpdate(std::string_view name, const wpi::json& update,
bool ack) final;
void ServerSetValue(int topicId, const Value& value) final;
void StartNetwork(net::NetworkInterface* network) final;
void StartNetwork(net::ClientMessageHandler* network) final;
void ClearNetwork() final;
// User functions. These are the actual implementations of the corresponding
@@ -555,7 +555,7 @@ class LocalStorage final : public net::ILocalStorage {
int m_inst;
IListenerStorage& m_listenerStorage;
wpi::Logger& m_logger;
net::NetworkInterface* m_network{nullptr};
net::ClientMessageHandler* m_network{nullptr};
// handle mappings
HandleMap<TopicData, 16> m_topics;
@@ -606,7 +606,7 @@ class LocalStorage final : public net::ILocalStorage {
void RemoveNetworkPublisher(TopicData* topic);
void NetworkPropertiesUpdate(TopicData* topic, const wpi::json& update,
bool ack);
void StartNetwork(net::NetworkInterface* network);
void StartNetwork(net::ClientMessageHandler* network);
PublisherData* AddLocalPublisher(TopicData* topic,
const wpi::json& properties,

View File

@@ -22,6 +22,7 @@
#include "IConnectionList.h"
#include "Log.h"
#include "net/NetworkInterface.h"
using namespace nt;
namespace uv = wpi::uv;

View File

@@ -94,17 +94,8 @@ void ClientImpl::ProcessIncomingBinary(uint64_t curTimeMs,
continue;
}
// otherwise it's a value message, get the local topic handle for it
auto topicIt = m_topicMap.find(id);
if (topicIt == m_topicMap.end()) {
WARN("received unknown id {}", id);
continue;
}
// pass along to local handler
if (m_local) {
m_local->NetworkSetValue(topicIt->second, value);
}
// otherwise it's a value message
ServerSetValue(id, value);
}
}
@@ -229,19 +220,21 @@ void ClientImpl::SetValue(int32_t pubuid, const Value& value) {
publisher.options.sendAll ? ValueSendMode::kAll : ValueSendMode::kNormal);
}
void ClientImpl::ServerAnnounce(std::string_view name, int id,
std::string_view typeStr,
const wpi::json& properties,
std::optional<int> pubuid) {
int ClientImpl::ServerAnnounce(std::string_view name, int id,
std::string_view typeStr,
const wpi::json& properties,
std::optional<int> pubuid) {
DEBUG4("ServerAnnounce({}, {}, {})", name, id, typeStr);
assert(m_local);
m_topicMap[id] = m_local->NetworkAnnounce(name, typeStr, properties, pubuid);
m_topicMap[id] =
m_local->ServerAnnounce(name, 0, typeStr, properties, pubuid);
return id;
}
void ClientImpl::ServerUnannounce(std::string_view name, int id) {
DEBUG4("ServerUnannounce({}, {})", name, id);
assert(m_local);
m_local->NetworkUnannounce(name);
m_local->ServerUnannounce(name, m_topicMap[id]);
m_topicMap.erase(id);
}
@@ -249,7 +242,21 @@ void ClientImpl::ServerPropertiesUpdate(std::string_view name,
const wpi::json& update, bool ack) {
DEBUG4("ServerProperties({}, {}, {})", name, update.dump(), ack);
assert(m_local);
m_local->NetworkPropertiesUpdate(name, update, ack);
m_local->ServerPropertiesUpdate(name, update, ack);
}
void ClientImpl::ServerSetValue(int topicId, const Value& value) {
// get the local topic handle for it
auto topicIt = m_topicMap.find(topicId);
if (topicIt == m_topicMap.end()) {
WARN("received unknown id {}", topicId);
return;
}
// pass along to local handler
if (m_local) {
m_local->ServerSetValue(topicIt->second, value);
}
}
void ClientImpl::ProcessIncomingText(std::string_view data) {

View File

@@ -14,7 +14,7 @@
#include <wpi/DenseMap.h>
#include "NetworkInterface.h"
#include "MessageHandler.h"
#include "NetworkOutgoingQueue.h"
#include "NetworkPing.h"
#include "PubSubOptions.h"
@@ -49,7 +49,7 @@ class ClientImpl final : private ServerMessageHandler {
void SendOutgoing(uint64_t curTimeMs, bool flush);
void SetLocal(LocalInterface* local) { m_local = local; }
void SetLocal(ServerMessageHandler* local) { m_local = local; }
void SendInitial();
private:
@@ -63,12 +63,13 @@ class ClientImpl final : private ServerMessageHandler {
void UpdatePeriodic();
// ServerMessageHandler interface
void ServerAnnounce(std::string_view name, int id, std::string_view typeStr,
const wpi::json& properties,
std::optional<int> pubuid) final;
int ServerAnnounce(std::string_view name, int id, std::string_view typeStr,
const wpi::json& properties,
std::optional<int> pubuid) final;
void ServerUnannounce(std::string_view name, int id) final;
void ServerPropertiesUpdate(std::string_view name, const wpi::json& update,
bool ack) final;
void ServerSetValue(int topicId, const Value& value) final;
void Publish(int pubuid, std::string_view name, std::string_view typeStr,
const wpi::json& properties, const PubSubOptionsImpl& options);
@@ -77,7 +78,7 @@ class ClientImpl final : private ServerMessageHandler {
WireConnection& m_wire;
wpi::Logger& m_logger;
LocalInterface* m_local{nullptr};
ServerMessageHandler* m_local{nullptr};
std::function<void(int64_t serverTimeOffset, int64_t rtt2, bool valid)>
m_timeSyncUpdated;
std::function<void(uint32_t repeatMs)> m_setPeriodic;
@@ -86,7 +87,7 @@ class ClientImpl final : private ServerMessageHandler {
std::vector<std::unique_ptr<PublisherData>> m_publishers;
// indexed by server-provided topic id
wpi::DenseMap<int, NT_Topic> m_topicMap;
wpi::DenseMap<int, int> m_topicMap;
// ping
NetworkPing m_ping;

View File

@@ -0,0 +1,52 @@
// Copyright (c) FIRST and other WPILib contributors.
// Open Source Software; you can modify and/or share it under the terms of
// the WPILib BSD license file in the root directory of this project.
#pragma once
#include <optional>
#include <span>
#include <string>
#include <string_view>
#include <wpi/json_fwd.h>
namespace nt {
class PubSubOptionsImpl;
class Value;
} // namespace nt
namespace nt::net {
class ClientMessageHandler {
public:
virtual ~ClientMessageHandler() = default;
virtual void ClientPublish(int pubuid, std::string_view name,
std::string_view typeStr,
const wpi::json& properties,
const PubSubOptionsImpl& options) = 0;
virtual void ClientUnpublish(int pubuid) = 0;
virtual void ClientSetProperties(std::string_view name,
const wpi::json& update) = 0;
virtual void ClientSubscribe(int subuid,
std::span<const std::string> topicNames,
const PubSubOptionsImpl& options) = 0;
virtual void ClientUnsubscribe(int subuid) = 0;
virtual void ClientSetValue(int pubuid, const Value& value) = 0;
};
class ServerMessageHandler {
public:
virtual ~ServerMessageHandler() = default;
virtual int ServerAnnounce(std::string_view name, int id,
std::string_view typeStr,
const wpi::json& properties,
std::optional<int> pubuid) = 0;
virtual void ServerUnannounce(std::string_view name, int id) = 0;
virtual void ServerPropertiesUpdate(std::string_view name,
const wpi::json& update, bool ack) = 0;
virtual void ServerSetValue(int topicuid, const Value& value) = 0;
};
} // namespace nt::net

View File

@@ -4,56 +4,13 @@
#pragma once
#include <stdint.h>
#include <span>
#include <string>
#include <string_view>
#include <wpi/json_fwd.h>
#include "ntcore_cpp.h"
namespace nt {
class PubSubOptionsImpl;
class Value;
} // namespace nt
#include "MessageHandler.h"
namespace nt::net {
class LocalInterface {
class ILocalStorage : public ServerMessageHandler {
public:
virtual ~LocalInterface() = default;
virtual NT_Topic NetworkAnnounce(std::string_view name,
std::string_view typeStr,
const wpi::json& properties,
std::optional<int> pubuid) = 0;
virtual void NetworkUnannounce(std::string_view name) = 0;
virtual void NetworkPropertiesUpdate(std::string_view name,
const wpi::json& update, bool ack) = 0;
virtual void NetworkSetValue(NT_Topic topicHandle, const Value& value) = 0;
};
class NetworkInterface {
public:
virtual ~NetworkInterface() = default;
virtual void Publish(int pubuid, std::string_view name,
std::string_view typeStr, const wpi::json& properties,
const PubSubOptionsImpl& options) = 0;
virtual void Unpublish(int pubuid) = 0;
virtual void SetProperties(std::string_view name,
const wpi::json& update) = 0;
virtual void Subscribe(int subuid, std::span<const std::string> topicNames,
const PubSubOptionsImpl& options) = 0;
virtual void Unsubscribe(int subuid) = 0;
virtual void SetValue(int pubuid, const Value& value) = 0;
};
class ILocalStorage : public LocalInterface {
public:
virtual void StartNetwork(NetworkInterface* network) = 0;
virtual void StartNetwork(ClientMessageHandler* network) = 0;
virtual void ClearNetwork() = 0;
};

View File

@@ -10,7 +10,7 @@ using namespace nt::net;
static constexpr size_t kMaxSize = 2 * 1024 * 1024;
void NetworkLoopQueue::SetValue(int pubuid, const Value& value) {
void NetworkLoopQueue::ClientSetValue(int pubuid, const Value& value) {
std::scoped_lock lock{m_mutex};
m_size += sizeof(ClientMessage) + value.size();
if (m_size > kMaxSize) {

View File

@@ -11,7 +11,7 @@
#include <wpi/mutex.h>
#include "Message.h"
#include "NetworkInterface.h"
#include "MessageHandler.h"
namespace wpi {
class Logger;
@@ -19,7 +19,7 @@ class Logger;
namespace nt::net {
class NetworkLoopQueue : public NetworkInterface {
class NetworkLoopQueue : public ClientMessageHandler {
public:
static constexpr size_t kInitialQueueSize = 2000;
@@ -30,16 +30,17 @@ class NetworkLoopQueue : public NetworkInterface {
void ReadQueue(std::vector<ClientMessage>* out);
void ClearQueue();
// NetworkInterface - calls to these append to the queue
void Publish(int pubuid, std::string_view name, std::string_view typeStr,
const wpi::json& properties,
const PubSubOptionsImpl& options) final;
void Unpublish(int pubuid) final;
void SetProperties(std::string_view name, const wpi::json& update) final;
void Subscribe(int subuid, std::span<const std::string> topicNames,
const PubSubOptionsImpl& options) final;
void Unsubscribe(int subuid) final;
void SetValue(int pubuid, const Value& value) final;
// ClientMessageHandler - calls to these append to the queue
void ClientPublish(int pubuid, std::string_view name,
std::string_view typeStr, const wpi::json& properties,
const PubSubOptionsImpl& options) final;
void ClientUnpublish(int pubuid) final;
void ClientSetProperties(std::string_view name,
const wpi::json& update) final;
void ClientSubscribe(int subuid, std::span<const std::string> topicNames,
const PubSubOptionsImpl& options) final;
void ClientUnsubscribe(int subuid) final;
void ClientSetValue(int pubuid, const Value& value) final;
private:
wpi::mutex m_mutex;

View File

@@ -9,7 +9,6 @@
#include <vector>
#include "NetworkLoopQueue.h"
#include "ntcore_c.h"
namespace nt::net {
@@ -29,36 +28,36 @@ inline void NetworkLoopQueue::ClearQueue() {
m_sizeErrored = false;
}
inline void NetworkLoopQueue::Publish(int pubuid, std::string_view name,
std::string_view typeStr,
const wpi::json& properties,
const PubSubOptionsImpl& options) {
inline void NetworkLoopQueue::ClientPublish(int pubuid, std::string_view name,
std::string_view typeStr,
const wpi::json& properties,
const PubSubOptionsImpl& options) {
std::scoped_lock lock{m_mutex};
m_queue.emplace_back(ClientMessage{PublishMsg{
pubuid, std::string{name}, std::string{typeStr}, properties, options}});
}
inline void NetworkLoopQueue::Unpublish(int pubuid) {
inline void NetworkLoopQueue::ClientUnpublish(int pubuid) {
std::scoped_lock lock{m_mutex};
m_queue.emplace_back(ClientMessage{UnpublishMsg{pubuid}});
}
inline void NetworkLoopQueue::SetProperties(std::string_view name,
const wpi::json& update) {
inline void NetworkLoopQueue::ClientSetProperties(std::string_view name,
const wpi::json& update) {
std::scoped_lock lock{m_mutex};
m_queue.emplace_back(
ClientMessage{SetPropertiesMsg{std::string{name}, update}});
}
inline void NetworkLoopQueue::Subscribe(int subuid,
std::span<const std::string> topicNames,
const PubSubOptionsImpl& options) {
inline void NetworkLoopQueue::ClientSubscribe(
int subuid, std::span<const std::string> topicNames,
const PubSubOptionsImpl& options) {
std::scoped_lock lock{m_mutex};
m_queue.emplace_back(ClientMessage{
SubscribeMsg{subuid, {topicNames.begin(), topicNames.end()}, options}});
}
inline void NetworkLoopQueue::Unsubscribe(int subuid) {
inline void NetworkLoopQueue::ClientUnsubscribe(int subuid) {
std::scoped_lock lock{m_mutex};
m_queue.emplace_back(ClientMessage{UnsubscribeMsg{subuid}});
}

View File

@@ -199,10 +199,9 @@ std::span<ServerImpl::SubscriberData*> ServerImpl::ClientData::GetSubscribers(
return {buf.data(), buf.size()};
}
void ServerImpl::ClientData4Base::ClientPublish(int pubuid,
std::string_view name,
std::string_view typeStr,
const wpi::json& properties) {
void ServerImpl::ClientData4Base::ClientPublish(
int pubuid, std::string_view name, std::string_view typeStr,
const wpi::json& properties, const PubSubOptionsImpl& options) {
DEBUG3("ClientPublish({}, {}, {}, {})", m_id, name, pubuid, typeStr);
auto topic = m_server.CreateTopic(this, name, typeStr, properties);
@@ -393,7 +392,7 @@ void ServerImpl::ClientDataLocal::SendValue(TopicData* topic,
const Value& value,
ValueSendMode mode) {
if (m_server.m_local) {
m_server.m_local->NetworkSetValue(topic->localHandle, value);
m_server.m_local->ServerSetValue(topic->localTopic, value);
}
}
@@ -406,8 +405,8 @@ void ServerImpl::ClientDataLocal::SendAnnounce(TopicData* topic,
}
sent = true;
topic->localHandle = m_server.m_local->NetworkAnnounce(
topic->name, topic->typeStr, topic->properties, pubuid);
topic->localTopic = m_server.m_local->ServerAnnounce(
topic->name, 0, topic->typeStr, topic->properties, pubuid);
}
}
@@ -418,7 +417,7 @@ void ServerImpl::ClientDataLocal::SendUnannounce(TopicData* topic) {
return;
}
sent = false;
m_server.m_local->NetworkUnannounce(topic->name);
m_server.m_local->ServerUnannounce(topic->name, topic->localTopic);
}
}
@@ -429,7 +428,7 @@ void ServerImpl::ClientDataLocal::SendPropertiesUpdate(TopicData* topic,
if (!m_announceSent.lookup(topic)) {
return;
}
m_server.m_local->NetworkPropertiesUpdate(topic->name, update, ack);
m_server.m_local->ServerPropertiesUpdate(topic->name, update, ack);
}
}
@@ -447,7 +446,8 @@ void ServerImpl::ClientDataLocal::HandleLocal(
if (auto msg = std::get_if<ClientValueMsg>(&elem.contents)) {
ClientSetValue(msg->pubuid, msg->value);
} else if (auto msg = std::get_if<PublishMsg>(&elem.contents)) {
ClientPublish(msg->pubuid, msg->name, msg->typeStr, msg->properties);
ClientPublish(msg->pubuid, msg->name, msg->typeStr, msg->properties,
msg->options);
updatepub = true;
} else if (auto msg = std::get_if<UnpublishMsg>(&elem.contents)) {
ClientUnpublish(msg->pubuid);
@@ -1923,7 +1923,7 @@ void ServerImpl::HandleLocal(std::span<const ClientMessage> msgs) {
m_localClient->HandleLocal(msgs);
}
void ServerImpl::SetLocal(LocalInterface* local) {
void ServerImpl::SetLocal(ServerMessageHandler* local) {
DEBUG4("SetLocal()");
m_local = local;

View File

@@ -64,7 +64,7 @@ class ServerImpl final {
void SendOutgoing(int clientId, uint64_t curTimeMs);
void HandleLocal(std::span<const ClientMessage> msgs);
void SetLocal(LocalInterface* local);
void SetLocal(ServerMessageHandler* local);
void ProcessIncomingText(int clientId, std::string_view data);
void ProcessIncomingBinary(int clientId, std::span<const uint8_t> data);
@@ -129,7 +129,7 @@ class ServerImpl final {
bool retained{false};
bool cached{true};
bool special{false};
NT_Topic localHandle{0};
int localTopic{0};
void AddPublisher(ClientData* client, PublisherData* pub) {
if (clients[client].publishers.insert(pub).second) {
@@ -237,8 +237,8 @@ class ServerImpl final {
protected:
// ClientMessageHandler interface
void ClientPublish(int pubuid, std::string_view name,
std::string_view typeStr,
const wpi::json& properties) final;
std::string_view typeStr, const wpi::json& properties,
const PubSubOptionsImpl& options) final;
void ClientUnpublish(int pubuid) final;
void ClientSetProperties(std::string_view name,
const wpi::json& update) final;
@@ -246,7 +246,7 @@ class ServerImpl final {
const PubSubOptionsImpl& options) final;
void ClientUnsubscribe(int subuid) final;
void ClientSetValue(int pubuid, const Value& value);
void ClientSetValue(int pubuid, const Value& value) final;
wpi::DenseMap<TopicData*, bool> m_announceSent;
};
@@ -437,7 +437,7 @@ class ServerImpl final {
};
wpi::Logger& m_logger;
LocalInterface* m_local{nullptr};
ServerMessageHandler* m_local{nullptr};
bool m_controlReady{false};
ClientDataLocal* m_localClient;

View File

@@ -17,6 +17,7 @@
#include <wpi/mpack.h>
#include "Message.h"
#include "MessageHandler.h"
using namespace nt;
using namespace nt::net;
@@ -190,7 +191,7 @@ static bool WireDecodeTextImpl(std::string_view in, T& out,
}
// complete
out.ClientPublish(pubuid, *name, *typeStr, *properties);
out.ClientPublish(pubuid, *name, *typeStr, *properties, {});
rv = true;
} else if (*method == UnpublishMsg::kMethodStr) {
// pubuid

View File

@@ -6,51 +6,22 @@
#include <stdint.h>
#include <optional>
#include <span>
#include <string>
#include <string_view>
#include <wpi/json_fwd.h>
namespace wpi {
class Logger;
} // namespace wpi
namespace nt {
class PubSubOptionsImpl;
class Value;
} // namespace nt
namespace nt::net {
class ClientMessageHandler {
public:
virtual ~ClientMessageHandler() = default;
virtual void ClientPublish(int pubuid, std::string_view name,
std::string_view typeStr,
const wpi::json& properties) = 0;
virtual void ClientUnpublish(int pubuid) = 0;
virtual void ClientSetProperties(std::string_view name,
const wpi::json& update) = 0;
virtual void ClientSubscribe(int subuid,
std::span<const std::string> topicNames,
const PubSubOptionsImpl& options) = 0;
virtual void ClientUnsubscribe(int subuid) = 0;
};
class ServerMessageHandler {
public:
virtual ~ServerMessageHandler() = default;
virtual void ServerAnnounce(std::string_view name, int id,
std::string_view typeStr,
const wpi::json& properties,
std::optional<int> pubuid) = 0;
virtual void ServerUnannounce(std::string_view name, int id) = 0;
virtual void ServerPropertiesUpdate(std::string_view name,
const wpi::json& update, bool ack) = 0;
};
class ClientMessageHandler;
class ServerMessageHandler;
// return true if client pub/sub metadata needs updating
bool WireDecodeText(std::string_view in, ClientMessageHandler& out,

View File

@@ -360,13 +360,13 @@ void ClientImpl3::EntryAssign(std::string_view name, unsigned int id,
if (m_local) {
// XXX: need to handle type change specially? (e.g. with unannounce)
if (entry->topic == 0 || flagsChanged || typeChanged) {
if (!entry->topic || flagsChanged || typeChanged) {
DEBUG4("NetworkAnnounce({}, {})", name, entry->typeStr);
entry->topic = m_local->NetworkAnnounce(name, entry->typeStr,
entry->properties, std::nullopt);
entry->topic = m_local->ServerAnnounce(name, 0, entry->typeStr,
entry->properties, std::nullopt);
}
if (valueChanged) {
m_local->NetworkSetValue(entry->topic, entry->value);
m_local->ServerSetValue(entry->topic.value(), entry->value);
}
}
}
@@ -380,8 +380,8 @@ void ClientImpl3::EntryUpdate(unsigned int id, unsigned int seq_num,
}
if (auto entry = LookupId(id)) {
entry->value = value;
if (m_local && entry->topic != 0) {
m_local->NetworkSetValue(entry->topic, entry->value);
if (m_local && entry->topic) {
m_local->ServerSetValue(entry->topic.value(), entry->value);
}
}
}
@@ -395,7 +395,7 @@ void ClientImpl3::FlagsUpdate(unsigned int id, unsigned int flags) {
if (auto entry = LookupId(id)) {
wpi::json update = entry->SetFlags(flags);
if (!update.empty() && m_local) {
m_local->NetworkPropertiesUpdate(entry->name, update, false);
m_local->ServerPropertiesUpdate(entry->name, update, false);
}
}
@@ -419,8 +419,8 @@ void ClientImpl3::EntryDelete(unsigned int id) {
entry->value = Value{};
// if we have no local publishers, unannounce
if (entry->publishers.empty() && m_local) {
m_local->NetworkUnannounce(entry->name);
if (entry->publishers.empty() && m_local && entry->topic) {
m_local->ServerUnannounce(entry->name, entry->topic.value());
}
}
@@ -443,8 +443,8 @@ void ClientImpl3::ClearEntries() {
entry->value = Value{};
// if we have no local publishers, unannounce
if (entry->publishers.empty() && m_local) {
m_local->NetworkUnannounce(entry->name);
if (entry->publishers.empty() && m_local && entry->topic) {
m_local->ServerUnannounce(entry->name, entry->topic.value());
}
entry = nullptr; // clear id mapping

View File

@@ -8,6 +8,7 @@
#include <functional>
#include <memory>
#include <optional>
#include <span>
#include <string>
#include <string_view>
@@ -18,7 +19,7 @@
#include <wpi/json.h>
#include "PubSubOptions.h"
#include "net/NetworkInterface.h"
#include "net/MessageHandler.h"
#include "net3/Message3.h"
#include "net3/SequenceNumber.h"
#include "net3/WireConnection3.h"
@@ -52,7 +53,7 @@ class ClientImpl3 final : private MessageHandler3 {
DoSendPeriodic(curTimeMs, false, flush);
}
void SetLocal(net::LocalInterface* local) { m_local = local; }
void SetLocal(net::ServerMessageHandler* local) { m_local = local; }
private:
struct Entry;
@@ -93,8 +94,8 @@ class ClientImpl3 final : private MessageHandler3 {
// Sequence number for update resolution
SequenceNumber seqNum;
// Local topic handle
NT_Topic topic{0};
// Local topic id
std::optional<int> topic;
// Local publishers
std::vector<PublisherData*> publishers;
@@ -139,7 +140,7 @@ class ClientImpl3 final : private MessageHandler3 {
WireConnection3& m_wire;
wpi::Logger& m_logger;
net::LocalInterface* m_local{nullptr};
net::ServerMessageHandler* m_local{nullptr};
std::function<void(uint32_t repeatMs)> m_setPeriodic;
uint64_t m_initTimeMs;