[ntcore] Change internal interfaces and messages to use UIDs (#7189)

Also make Handle functions constexpr.
This commit is contained in:
Peter Johnson
2024-10-11 10:57:36 -07:00
committed by GitHub
parent 59bc53b9b8
commit 8ca99c7cb7
26 changed files with 435 additions and 500 deletions

View File

@@ -33,12 +33,12 @@ class Handle {
static_assert(kTypeMax <= wpi::kHandleTypeHALBase);
enum { kIndexMax = 0xfffff };
explicit Handle(NT_Handle handle) : m_handle(handle) {}
operator NT_Handle() const { return m_handle; }
constexpr explicit Handle(NT_Handle handle) : m_handle(handle) {}
constexpr operator NT_Handle() const { return m_handle; }
NT_Handle handle() const { return m_handle; }
constexpr NT_Handle handle() const { return m_handle; }
Handle(int inst, int index, Type type) {
constexpr Handle(int inst, int index, Type type) {
if (inst < 0 || index < 0) {
m_handle = 0;
return;
@@ -47,16 +47,22 @@ class Handle {
(index & 0xfffff);
}
unsigned int GetIndex() const {
constexpr unsigned int GetIndex() const {
return static_cast<unsigned int>(m_handle) & 0xfffff;
}
Type GetType() const {
constexpr Type GetType() const {
return static_cast<Type>((static_cast<int>(m_handle) >> 24) & 0x7f);
}
int GetInst() const { return (static_cast<int>(m_handle) >> 20) & 0xf; }
bool IsType(Type type) const { return type == GetType(); }
int GetTypedIndex(Type type) const { return IsType(type) ? GetIndex() : -1; }
int GetTypedInst(Type type) const { return IsType(type) ? GetInst() : -1; }
constexpr int GetInst() const {
return (static_cast<int>(m_handle) >> 20) & 0xf;
}
constexpr bool IsType(Type type) const { return type == GetType(); }
constexpr int GetTypedIndex(Type type) const {
return IsType(type) ? GetIndex() : -1;
}
constexpr int GetTypedInst(Type type) const {
return IsType(type) ? GetInst() : -1;
}
private:
NT_Handle m_handle;

View File

@@ -403,7 +403,7 @@ void LocalStorage::Impl::PropertiesUpdated(TopicData* topic,
NotifyTopic(topic, eventFlags | NT_EVENT_PROPERTIES);
// check local flag so we don't echo back received properties changes
if (m_network && sendNetwork) {
m_network->SetProperties(topic->handle, topic->name, update);
m_network->SetProperties(topic->name, update);
}
}
@@ -427,10 +427,10 @@ void LocalStorage::Impl::RefreshPubSubActive(TopicData* topic,
void LocalStorage::Impl::NetworkAnnounce(TopicData* topic,
std::string_view typeStr,
const wpi::json& properties,
NT_Publisher pubHandle) {
std::optional<int> pubuid) {
DEBUG4("LS NetworkAnnounce({}, {}, {}, {})", topic->name, typeStr,
properties.dump(), pubHandle);
if (pubHandle != 0) {
properties.dump(), pubuid.value_or(-1));
if (pubuid.has_value()) {
return; // ack of our publish; ignore
}
@@ -503,7 +503,7 @@ void LocalStorage::Impl::RemoveNetworkPublisher(TopicData* topic) {
// this may result in a duplicate publish warning on the server side,
// but send one anyway in this case just to be sure
if (nextPub->active && m_network) {
m_network->Publish(nextPub->handle, topic->handle, topic->name,
m_network->Publish(Handle{nextPub->handle}.GetIndex(), topic->name,
topic->typeStr, topic->properties, nextPub->config);
}
}
@@ -561,7 +561,7 @@ LocalStorage::PublisherData* LocalStorage::Impl::AddLocalPublisher(
}
if (publisher->active && m_network) {
m_network->Publish(publisher->handle, topic->handle, topic->name,
m_network->Publish(Handle{publisher->handle}.GetIndex(), topic->name,
topic->typeStr, topic->properties, config);
}
return publisher;
@@ -580,7 +580,7 @@ LocalStorage::Impl::RemoveLocalPublisher(NT_Publisher pubHandle) {
}
if (publisher->active && m_network) {
m_network->Unpublish(publisher->handle, topic->handle);
m_network->Unpublish(Handle{publisher->handle}.GetIndex());
}
if (publisher->active && !topic->localPublishers.empty()) {
@@ -593,7 +593,7 @@ LocalStorage::Impl::RemoveLocalPublisher(NT_Publisher pubHandle) {
topic->typeStr = nextPub->config.typeStr;
RefreshPubSubActive(topic, false);
if (nextPub->active && m_network) {
m_network->Publish(nextPub->handle, topic->handle, topic->name,
m_network->Publish(Handle{nextPub->handle}.GetIndex(), topic->name,
topic->typeStr, topic->properties,
nextPub->config);
}
@@ -619,7 +619,8 @@ LocalStorage::SubscriberData* LocalStorage::Impl::AddLocalSubscriber(
}
if (m_network && !subscriber->config.hidden) {
DEBUG4("-> NetworkSubscribe({})", topic->name);
m_network->Subscribe(subscriber->handle, {{topic->name}}, config);
m_network->Subscribe(Handle{subscriber->handle}.GetIndex(), {{topic->name}},
config);
}
// queue current value
@@ -647,7 +648,7 @@ LocalStorage::Impl::RemoveLocalSubscriber(NT_Subscriber subHandle) {
}
}
if (m_network && !subscriber->config.hidden) {
m_network->Unsubscribe(subscriber->handle);
m_network->Unsubscribe(Handle{subscriber->handle}.GetIndex());
}
}
return subscriber;
@@ -684,8 +685,8 @@ LocalStorage::MultiSubscriberData* LocalStorage::Impl::AddMultiSubscriber(
}
if (m_network && !subscriber->options.hidden) {
DEBUG4("-> NetworkSubscribe");
m_network->Subscribe(subscriber->handle, subscriber->prefixes,
subscriber->options);
m_network->Subscribe(Handle{subscriber->handle}.GetIndex(),
subscriber->prefixes, subscriber->options);
}
return subscriber;
}
@@ -703,7 +704,7 @@ LocalStorage::Impl::RemoveMultiSubscriber(NT_MultiSubscriber subHandle) {
}
}
if (m_network && !subscriber->options.hidden) {
m_network->Unsubscribe(subscriber->handle);
m_network->Unsubscribe(Handle{subscriber->handle}.GetIndex());
}
}
return subscriber;
@@ -977,7 +978,7 @@ bool LocalStorage::Impl::PublishLocalValue(PublisherData* publisher,
if (publisher->topic->IsCached()) {
publisher->topic->lastValueNetwork = value;
}
m_network->SetValue(publisher->handle, value);
m_network->SetValue(Handle{publisher->handle}.GetIndex(), value);
}
return SetValue(publisher->topic, value, NT_EVENT_VALUE_LOCAL,
suppressDuplicates, publisher);
@@ -1076,10 +1077,10 @@ LocalStorage::~LocalStorage() = default;
NT_Topic LocalStorage::NetworkAnnounce(std::string_view name,
std::string_view typeStr,
const wpi::json& properties,
NT_Publisher pubHandle) {
std::optional<int> pubuid) {
std::scoped_lock lock{m_mutex};
auto topic = m_impl.GetOrCreateTopic(name);
m_impl.NetworkAnnounce(topic, typeStr, properties, pubHandle);
m_impl.NetworkAnnounce(topic, typeStr, properties, pubuid);
return topic->handle;
}
@@ -1124,25 +1125,26 @@ void LocalStorage::Impl::StartNetwork(net::NetworkInterface* network) {
PublisherData* anyPublisher = nullptr;
for (auto&& publisher : topic->localPublishers) {
if (publisher->active) {
network->Publish(publisher->handle, topic->handle, topic->name,
network->Publish(Handle{publisher->handle}.GetIndex(), topic->name,
topic->typeStr, topic->properties, publisher->config);
anyPublisher = publisher;
}
}
if (anyPublisher && topic->lastValue) {
network->SetValue(anyPublisher->handle, topic->lastValue);
network->SetValue(Handle{anyPublisher->handle}.GetIndex(),
topic->lastValue);
}
}
for (auto&& subscriber : m_subscribers) {
if (!subscriber->config.hidden) {
network->Subscribe(subscriber->handle, {{subscriber->topic->name}},
subscriber->config);
network->Subscribe(Handle{subscriber->handle}.GetIndex(),
{{subscriber->topic->name}}, subscriber->config);
}
}
for (auto&& subscriber : m_multiSubscribers) {
if (!subscriber->options.hidden) {
network->Subscribe(subscriber->handle, subscriber->prefixes,
subscriber->options);
network->Subscribe(Handle{subscriber->handle}.GetIndex(),
subscriber->prefixes, subscriber->options);
}
}
}

View File

@@ -48,7 +48,7 @@ class LocalStorage final : public net::ILocalStorage {
// network interface functions
NT_Topic NetworkAnnounce(std::string_view name, std::string_view typeStr,
const wpi::json& properties,
NT_Publisher pubHandle) final;
std::optional<int> pubuid) final;
void NetworkUnannounce(std::string_view name) final;
void NetworkPropertiesUpdate(std::string_view name, const wpi::json& update,
bool ack) final;
@@ -601,7 +601,8 @@ class LocalStorage final : public net::ILocalStorage {
void RefreshPubSubActive(TopicData* topic, bool warnOnSubMismatch);
void NetworkAnnounce(TopicData* topic, std::string_view typeStr,
const wpi::json& properties, NT_Publisher pubHandle);
const wpi::json& properties,
std::optional<int> pubuid);
void RemoveNetworkPublisher(TopicData* topic);
void NetworkPropertiesUpdate(TopicData* topic, const wpi::json& update,
bool ack);

View File

@@ -407,7 +407,7 @@ void NetworkClient::WsConnected(wpi::WebSocket& ws, uv::Tcp& tcp,
m_wire = std::make_shared<net::WebSocketConnection>(
ws, connInfo.protocol_version, m_logger);
m_clientImpl = std::make_unique<net::ClientImpl>(
m_loop.Now().count(), m_inst, *m_wire, m_logger, m_timeSyncUpdated,
m_loop.Now().count(), *m_wire, m_logger, m_timeSyncUpdated,
[this](uint32_t repeatMs) {
DEBUG4("Setting periodic timer to {}", repeatMs);
if (m_sendOutgoingTimer &&

View File

@@ -254,7 +254,7 @@ void NetworkServer::ServerConnection4::ProcessWsUpgrade() {
m_websocket->binary.connect([this](std::span<const uint8_t> data, bool) {
while (!data.empty()) {
// decode message
int64_t pubuid;
int pubuid;
Value value;
std::string error;
if (!net::WireDecodeBinary(&data, &pubuid, &value, &error, 0)) {

View File

@@ -28,12 +28,11 @@ using namespace nt;
using namespace nt::net;
ClientImpl::ClientImpl(
uint64_t curTimeMs, int inst, WireConnection& wire, wpi::Logger& logger,
uint64_t curTimeMs, WireConnection& wire, wpi::Logger& logger,
std::function<void(int64_t serverTimeOffset, int64_t rtt2, bool valid)>
timeSyncUpdated,
std::function<void(uint32_t repeatMs)> setPeriodic)
: m_inst{inst},
m_wire{wire},
: m_wire{wire},
m_logger{logger},
m_timeSyncUpdated{std::move(timeSyncUpdated)},
m_setPeriodic{std::move(setPeriodic)},
@@ -58,7 +57,7 @@ void ClientImpl::ProcessIncomingBinary(uint64_t curTimeMs,
}
// decode message
int64_t id;
int id;
Value value;
std::string error;
if (!WireDecodeBinary(&data, &id, &value, &error,
@@ -114,13 +113,13 @@ void ClientImpl::HandleLocal(std::vector<ClientMessage>&& msgs) {
for (auto&& elem : msgs) {
// common case is value
if (auto msg = std::get_if<ClientValueMsg>(&elem.contents)) {
SetValue(msg->pubHandle, msg->value);
SetValue(msg->pubuid, msg->value);
} else if (auto msg = std::get_if<PublishMsg>(&elem.contents)) {
Publish(msg->pubHandle, msg->topicHandle, msg->name, msg->typeStr,
msg->properties, msg->options);
m_outgoing.SendMessage(msg->pubHandle, std::move(elem));
Publish(msg->pubuid, msg->name, msg->typeStr, msg->properties,
msg->options);
m_outgoing.SendMessage(msg->pubuid, std::move(elem));
} else if (auto msg = std::get_if<UnpublishMsg>(&elem.contents)) {
Unpublish(msg->pubHandle, msg->topicHandle, std::move(elem));
Unpublish(msg->pubuid, std::move(elem));
} else {
m_outgoing.SendMessage(0, std::move(elem));
}
@@ -174,38 +173,33 @@ void ClientImpl::UpdatePeriodic() {
m_setPeriodic(m_periodMs);
}
void ClientImpl::Publish(NT_Publisher pubHandle, NT_Topic topicHandle,
std::string_view name, std::string_view typeStr,
const wpi::json& properties,
void ClientImpl::Publish(int32_t pubuid, std::string_view name,
std::string_view typeStr, const wpi::json& properties,
const PubSubOptionsImpl& options) {
unsigned int index = Handle{pubHandle}.GetIndex();
if (index >= m_publishers.size()) {
m_publishers.resize(index + 1);
if (static_cast<uint32_t>(pubuid) >= m_publishers.size()) {
m_publishers.resize(pubuid + 1);
}
auto& publisher = m_publishers[index];
auto& publisher = m_publishers[pubuid];
if (!publisher) {
publisher = std::make_unique<PublisherData>();
}
publisher->handle = pubHandle;
publisher->options = options;
publisher->periodMs = std::lround(options.periodicMs / 10.0) * 10;
if (publisher->periodMs < kMinPeriodMs) {
publisher->periodMs = kMinPeriodMs;
}
m_outgoing.SetPeriod(pubHandle, publisher->periodMs);
m_outgoing.SetPeriod(pubuid, publisher->periodMs);
// update period
m_periodMs = UpdatePeriodCalc(m_periodMs, publisher->periodMs);
UpdatePeriodic();
}
void ClientImpl::Unpublish(NT_Publisher pubHandle, NT_Topic topicHandle,
ClientMessage&& msg) {
unsigned int index = Handle{pubHandle}.GetIndex();
if (index >= m_publishers.size()) {
void ClientImpl::Unpublish(int32_t pubuid, ClientMessage&& msg) {
if (static_cast<uint32_t>(pubuid) >= m_publishers.size()) {
return;
}
m_publishers[index].reset();
m_publishers[pubuid].reset();
// loop over all publishers to update period
m_periodMs = kMaxPeriodMs;
@@ -216,40 +210,35 @@ void ClientImpl::Unpublish(NT_Publisher pubHandle, NT_Topic topicHandle,
}
UpdatePeriodic();
m_outgoing.SendMessage(pubHandle, std::move(msg));
m_outgoing.SendMessage(pubuid, std::move(msg));
// remove from outgoing handle map
m_outgoing.EraseHandle(pubHandle);
m_outgoing.EraseId(pubuid);
}
void ClientImpl::SetValue(NT_Publisher pubHandle, const Value& value) {
DEBUG4("SetValue({}, time={}, server_time={})", pubHandle, value.time(),
void ClientImpl::SetValue(int32_t pubuid, const Value& value) {
DEBUG4("SetValue({}, time={}, server_time={})", pubuid, value.time(),
value.server_time());
unsigned int index = Handle{pubHandle}.GetIndex();
if (index >= m_publishers.size() || !m_publishers[index]) {
if (static_cast<uint32_t>(pubuid) >= m_publishers.size() ||
!m_publishers[pubuid]) {
return;
}
auto& publisher = *m_publishers[index];
auto& publisher = *m_publishers[pubuid];
m_outgoing.SendValue(
pubHandle, value,
pubuid, value,
publisher.options.sendAll ? ValueSendMode::kAll : ValueSendMode::kNormal);
}
void ClientImpl::ServerAnnounce(std::string_view name, int64_t id,
void ClientImpl::ServerAnnounce(std::string_view name, int id,
std::string_view typeStr,
const wpi::json& properties,
std::optional<int64_t> pubuid) {
std::optional<int> pubuid) {
DEBUG4("ServerAnnounce({}, {}, {})", name, id, typeStr);
assert(m_local);
NT_Publisher pubHandle{0};
if (pubuid) {
pubHandle = Handle(m_inst, pubuid.value(), Handle::kPublisher);
}
m_topicMap[id] =
m_local->NetworkAnnounce(name, typeStr, properties, pubHandle);
m_topicMap[id] = m_local->NetworkAnnounce(name, typeStr, properties, pubuid);
}
void ClientImpl::ServerUnannounce(std::string_view name, int64_t id) {
void ClientImpl::ServerUnannounce(std::string_view name, int id) {
DEBUG4("ServerUnannounce({}, {})", name, id);
assert(m_local);
m_local->NetworkUnannounce(name);

View File

@@ -9,7 +9,6 @@
#include <functional>
#include <memory>
#include <span>
#include <string>
#include <string_view>
#include <vector>
@@ -39,7 +38,7 @@ class WireConnection;
class ClientImpl final : private ServerMessageHandler {
public:
ClientImpl(
uint64_t curTimeMs, int inst, WireConnection& wire, wpi::Logger& logger,
uint64_t curTimeMs, WireConnection& wire, wpi::Logger& logger,
std::function<void(int64_t serverTimeOffset, int64_t rtt2, bool valid)>
timeSyncUpdated,
std::function<void(uint32_t repeatMs)> setPeriodic);
@@ -55,7 +54,6 @@ class ClientImpl final : private ServerMessageHandler {
private:
struct PublisherData {
NT_Publisher handle;
PubSubOptionsImpl options;
// in options as double, but copy here as integer; rounded to the nearest
// 10 ms
@@ -65,21 +63,18 @@ class ClientImpl final : private ServerMessageHandler {
void UpdatePeriodic();
// ServerMessageHandler interface
void ServerAnnounce(std::string_view name, int64_t id,
std::string_view typeStr, const wpi::json& properties,
std::optional<int64_t> pubuid) final;
void ServerUnannounce(std::string_view name, int64_t id) final;
void ServerAnnounce(std::string_view name, int id, std::string_view typeStr,
const wpi::json& properties,
std::optional<int> pubuid) final;
void ServerUnannounce(std::string_view name, int id) final;
void ServerPropertiesUpdate(std::string_view name, const wpi::json& update,
bool ack) final;
void Publish(NT_Publisher pubHandle, NT_Topic topicHandle,
std::string_view name, std::string_view typeStr,
void Publish(int pubuid, std::string_view name, std::string_view typeStr,
const wpi::json& properties, const PubSubOptionsImpl& options);
void Unpublish(NT_Publisher pubHandle, NT_Topic topicHandle,
ClientMessage&& msg);
void SetValue(NT_Publisher pubHandle, const Value& value);
void Unpublish(int pubuid, ClientMessage&& msg);
void SetValue(int pubuid, const Value& value);
int m_inst;
WireConnection& m_wire;
wpi::Logger& m_logger;
LocalInterface* m_local{nullptr};
@@ -91,7 +86,7 @@ class ClientImpl final : private ServerMessageHandler {
std::vector<std::unique_ptr<PublisherData>> m_publishers;
// indexed by server-provided topic id
wpi::DenseMap<int64_t, NT_Topic> m_topicMap;
wpi::DenseMap<int, NT_Topic> m_topicMap;
// ping
NetworkPing m_ping;

View File

@@ -13,7 +13,6 @@
#include "PubSubOptions.h"
#include "networktables/NetworkTableValue.h"
#include "ntcore_c.h"
namespace nt::net {
@@ -24,8 +23,7 @@ namespace nt::net {
struct PublishMsg {
static constexpr std::string_view kMethodStr = "publish";
NT_Publisher pubHandle{0};
NT_Topic topicHandle{0}; // will be 0 when coming from network
int pubuid{0};
std::string name;
std::string typeStr;
wpi::json properties;
@@ -34,31 +32,29 @@ struct PublishMsg {
struct UnpublishMsg {
static constexpr std::string_view kMethodStr = "unpublish";
NT_Publisher pubHandle{0};
NT_Topic topicHandle{0}; // will be 0 when coming from network
int pubuid{0};
};
struct SetPropertiesMsg {
static constexpr std::string_view kMethodStr = "setproperties";
NT_Topic topicHandle{0}; // will be 0 when coming from network
std::string name;
wpi::json update;
};
struct SubscribeMsg {
static constexpr std::string_view kMethodStr = "subscribe";
NT_Subscriber subHandle{0};
int subuid{0};
std::vector<std::string> topicNames;
PubSubOptionsImpl options;
};
struct UnsubscribeMsg {
static constexpr std::string_view kMethodStr = "unsubscribe";
NT_Subscriber subHandle{0};
int subuid{0};
};
struct ClientValueMsg {
NT_Publisher pubHandle{0};
int pubuid{0};
Value value;
};
@@ -77,16 +73,16 @@ struct ClientMessage {
struct AnnounceMsg {
static constexpr std::string_view kMethodStr = "announce";
std::string name;
int64_t id{0};
int id{0};
std::string typeStr;
std::optional<int64_t> pubuid;
std::optional<int> pubuid;
wpi::json properties;
};
struct UnannounceMsg {
static constexpr std::string_view kMethodStr = "unannounce";
std::string name;
int64_t id{0};
int id{0};
};
struct PropertiesUpdateMsg {
@@ -97,7 +93,7 @@ struct PropertiesUpdateMsg {
};
struct ServerValueMsg {
NT_Topic topic{0};
int topic{0};
Value value;
};

View File

@@ -4,6 +4,8 @@
#pragma once
#include <stdint.h>
#include <span>
#include <string>
#include <string_view>
@@ -26,7 +28,7 @@ class LocalInterface {
virtual NT_Topic NetworkAnnounce(std::string_view name,
std::string_view typeStr,
const wpi::json& properties,
NT_Publisher pubHandle) = 0;
std::optional<int> pubuid) = 0;
virtual void NetworkUnannounce(std::string_view name) = 0;
virtual void NetworkPropertiesUpdate(std::string_view name,
const wpi::json& update, bool ack) = 0;
@@ -37,18 +39,16 @@ class NetworkInterface {
public:
virtual ~NetworkInterface() = default;
virtual void Publish(NT_Publisher pubHandle, NT_Topic topicHandle,
std::string_view name, std::string_view typeStr,
const wpi::json& properties,
virtual void Publish(int pubuid, std::string_view name,
std::string_view typeStr, const wpi::json& properties,
const PubSubOptionsImpl& options) = 0;
virtual void Unpublish(NT_Publisher pubHandle, NT_Topic topicHandle) = 0;
virtual void SetProperties(NT_Topic topicHandle, std::string_view name,
virtual void Unpublish(int pubuid) = 0;
virtual void SetProperties(std::string_view name,
const wpi::json& update) = 0;
virtual void Subscribe(NT_Subscriber subHandle,
std::span<const std::string> topicNames,
virtual void Subscribe(int subuid, std::span<const std::string> topicNames,
const PubSubOptionsImpl& options) = 0;
virtual void Unsubscribe(NT_Subscriber subHandle) = 0;
virtual void SetValue(NT_Publisher pubHandle, const Value& value) = 0;
virtual void Unsubscribe(int subuid) = 0;
virtual void SetValue(int pubuid, const Value& value) = 0;
};
class ILocalStorage : public LocalInterface {

View File

@@ -10,7 +10,7 @@ using namespace nt::net;
static constexpr size_t kMaxSize = 2 * 1024 * 1024;
void NetworkLoopQueue::SetValue(NT_Publisher pubHandle, const Value& value) {
void NetworkLoopQueue::SetValue(int pubuid, const Value& value) {
std::scoped_lock lock{m_mutex};
m_size += sizeof(ClientMessage) + value.size();
if (m_size > kMaxSize) {
@@ -20,5 +20,5 @@ void NetworkLoopQueue::SetValue(NT_Publisher pubHandle, const Value& value) {
}
return; // avoid potential out of memory
}
m_queue.emplace_back(ClientMessage{ClientValueMsg{pubHandle, value}});
m_queue.emplace_back(ClientMessage{ClientValueMsg{pubuid, value}});
}

View File

@@ -31,18 +31,15 @@ class NetworkLoopQueue : public NetworkInterface {
void ClearQueue();
// NetworkInterface - calls to these append to the queue
void Publish(NT_Publisher pubHandle, NT_Topic topicHandle,
std::string_view name, std::string_view typeStr,
void Publish(int pubuid, std::string_view name, std::string_view typeStr,
const wpi::json& properties,
const PubSubOptionsImpl& options) final;
void Unpublish(NT_Publisher pubHandle, NT_Topic topicHandle) final;
void SetProperties(NT_Topic topicHandle, std::string_view name,
const wpi::json& update) final;
void Subscribe(NT_Subscriber subHandle,
std::span<const std::string> topicNames,
void Unpublish(int pubuid) final;
void SetProperties(std::string_view name, const wpi::json& update) final;
void Subscribe(int subuid, std::span<const std::string> topicNames,
const PubSubOptionsImpl& options) final;
void Unsubscribe(NT_Subscriber subHandle) final;
void SetValue(NT_Publisher pubHandle, const Value& value) final;
void Unsubscribe(int subuid) final;
void SetValue(int pubuid, const Value& value) final;
private:
wpi::mutex m_mutex;

View File

@@ -29,43 +29,38 @@ inline void NetworkLoopQueue::ClearQueue() {
m_sizeErrored = false;
}
inline void NetworkLoopQueue::Publish(NT_Publisher pubHandle,
NT_Topic topicHandle,
std::string_view name,
inline void NetworkLoopQueue::Publish(int pubuid, std::string_view name,
std::string_view typeStr,
const wpi::json& properties,
const PubSubOptionsImpl& options) {
std::scoped_lock lock{m_mutex};
m_queue.emplace_back(
ClientMessage{PublishMsg{pubHandle, topicHandle, std::string{name},
std::string{typeStr}, properties, options}});
m_queue.emplace_back(ClientMessage{PublishMsg{
pubuid, std::string{name}, std::string{typeStr}, properties, options}});
}
inline void NetworkLoopQueue::Unpublish(NT_Publisher pubHandle,
NT_Topic topicHandle) {
inline void NetworkLoopQueue::Unpublish(int pubuid) {
std::scoped_lock lock{m_mutex};
m_queue.emplace_back(ClientMessage{UnpublishMsg{pubHandle, topicHandle}});
m_queue.emplace_back(ClientMessage{UnpublishMsg{pubuid}});
}
inline void NetworkLoopQueue::SetProperties(NT_Topic topicHandle,
std::string_view name,
inline void NetworkLoopQueue::SetProperties(std::string_view name,
const wpi::json& update) {
std::scoped_lock lock{m_mutex};
m_queue.emplace_back(
ClientMessage{SetPropertiesMsg{topicHandle, std::string{name}, update}});
ClientMessage{SetPropertiesMsg{std::string{name}, update}});
}
inline void NetworkLoopQueue::Subscribe(NT_Subscriber subHandle,
inline void NetworkLoopQueue::Subscribe(int subuid,
std::span<const std::string> topicNames,
const PubSubOptionsImpl& options) {
std::scoped_lock lock{m_mutex};
m_queue.emplace_back(ClientMessage{SubscribeMsg{
subHandle, {topicNames.begin(), topicNames.end()}, options}});
m_queue.emplace_back(ClientMessage{
SubscribeMsg{subuid, {topicNames.begin(), topicNames.end()}, options}});
}
inline void NetworkLoopQueue::Unsubscribe(NT_Subscriber subHandle) {
inline void NetworkLoopQueue::Unsubscribe(int subuid) {
std::scoped_lock lock{m_mutex};
m_queue.emplace_back(ClientMessage{UnsubscribeMsg{subHandle}});
m_queue.emplace_back(ClientMessage{UnsubscribeMsg{subuid}});
}
} // namespace nt::net

View File

@@ -9,14 +9,12 @@
#include <algorithm>
#include <concepts>
#include <numeric>
#include <optional>
#include <span>
#include <utility>
#include <vector>
#include <wpi/DenseMap.h>
#include "Handle.h"
#include "Message.h"
#include "WireConnection.h"
#include "WireEncoder.h"
@@ -71,18 +69,17 @@ class NetworkOutgoingQueue {
m_queues.emplace_back(100); // default queue is 100 ms period
}
void SetPeriod(NT_Handle handle, uint32_t periodMs);
void SetPeriod(int id, uint32_t periodMs);
void EraseHandle(NT_Handle handle) { m_handleMap.erase(handle); }
void EraseId(int id) { m_idMap.erase(id); }
template <typename T>
void SendMessage(NT_Handle handle, T&& msg) {
m_queues[m_handleMap[handle].queueIndex].Append(handle,
std::forward<T>(msg));
void SendMessage(int id, T&& msg) {
m_queues[m_idMap[id].queueIndex].Append(id, std::forward<T>(msg));
m_totalSize += sizeof(Message);
}
void SendValue(NT_Handle handle, const Value& value, ValueSendMode mode);
void SendValue(int id, const Value& value, ValueSendMode mode);
void SendOutgoing(uint64_t curTimeMs, bool flush);
@@ -95,16 +92,15 @@ class NetworkOutgoingQueue {
private:
using ValueMsg = typename MessageType::ValueMsg;
void EncodeValue(wpi::raw_ostream& os, NT_Handle handle, const Value& value);
void EncodeValue(wpi::raw_ostream& os, int id, const Value& value);
struct Message {
Message() = default;
template <typename T>
Message(T&& msg, NT_Handle handle)
: msg{std::forward<T>(msg)}, handle{handle} {}
Message(T&& msg, int id) : msg{std::forward<T>(msg)}, id{id} {}
MessageType msg;
NT_Handle handle;
int id;
};
struct Queue {
@@ -124,7 +120,7 @@ class NetworkOutgoingQueue {
unsigned int queueIndex = 0;
int valuePos = -1; // -1 if not in queue
};
wpi::DenseMap<NT_Handle, HandleInfo> m_handleMap;
wpi::DenseMap<int, HandleInfo> m_idMap;
size_t m_totalSize{0};
uint64_t m_lastSendMs{0};
int64_t m_timeOffsetUs{0};
@@ -137,8 +133,7 @@ class NetworkOutgoingQueue {
};
template <NetworkMessage MessageType>
void NetworkOutgoingQueue<MessageType>::SetPeriod(NT_Handle handle,
uint32_t periodMs) {
void NetworkOutgoingQueue<MessageType>::SetPeriod(int id, uint32_t periodMs) {
// it's quite common to set a lot of things in a row with the same period
unsigned int queueIndex;
if (m_lastSetPeriod == periodMs) {
@@ -159,13 +154,12 @@ void NetworkOutgoingQueue<MessageType>::SetPeriod(NT_Handle handle,
}
// map the handle to the queue
auto [infoIt, created] = m_handleMap.try_emplace(handle);
auto [infoIt, created] = m_idMap.try_emplace(id);
if (!created && infoIt->getSecond().queueIndex != queueIndex) {
// need to move any items from old queue to new queue
auto& oldMsgs = m_queues[infoIt->getSecond().queueIndex].msgs;
auto it = std::stable_partition(
oldMsgs.begin(), oldMsgs.end(),
[&](const auto& e) { return e.handle != handle; });
auto it = std::stable_partition(oldMsgs.begin(), oldMsgs.end(),
[&](const auto& e) { return e.id != id; });
auto& newMsgs = m_queues[queueIndex].msgs;
for (auto i = it, end = oldMsgs.end(); i != end; ++i) {
newMsgs.emplace_back(std::move(*i));
@@ -177,8 +171,7 @@ void NetworkOutgoingQueue<MessageType>::SetPeriod(NT_Handle handle,
}
template <NetworkMessage MessageType>
void NetworkOutgoingQueue<MessageType>::SendValue(NT_Handle handle,
const Value& value,
void NetworkOutgoingQueue<MessageType>::SendValue(int id, const Value& value,
ValueSendMode mode) {
if (m_local) {
mode = ValueSendMode::kImm; // always send local immediately
@@ -191,26 +184,26 @@ void NetworkOutgoingQueue<MessageType>::SendValue(NT_Handle handle,
case ValueSendMode::kDisabled: // do nothing
break;
case ValueSendMode::kImm: // send immediately
m_wire.SendBinary([&](auto& os) { EncodeValue(os, handle, value); });
m_wire.SendBinary([&](auto& os) { EncodeValue(os, id, value); });
break;
case ValueSendMode::kAll: { // append to outgoing
auto& info = m_handleMap[handle];
auto& info = m_idMap[id];
auto& queue = m_queues[info.queueIndex];
info.valuePos = queue.msgs.size();
queue.Append(handle, ValueMsg{handle, value});
queue.Append(id, ValueMsg{id, value});
m_totalSize += sizeof(Message) + value.size();
break;
}
case ValueSendMode::kNormal: {
// replace, or append if not present
auto& info = m_handleMap[handle];
auto& info = m_idMap[id];
auto& queue = m_queues[info.queueIndex];
if (info.valuePos != -1 &&
static_cast<unsigned int>(info.valuePos) < queue.msgs.size()) {
auto& elem = queue.msgs[info.valuePos];
if (auto m = std::get_if<ValueMsg>(&elem.msg.contents)) {
// double-check handle, and only replace if timestamp newer
if (elem.handle == handle &&
if (elem.id == id &&
(m->value.time() == 0 || value.time() >= m->value.time())) {
int delta = value.size() - m->value.size();
m->value = value;
@@ -220,7 +213,7 @@ void NetworkOutgoingQueue<MessageType>::SendValue(NT_Handle handle,
}
}
info.valuePos = queue.msgs.size();
queue.Append(handle, ValueMsg{handle, value});
queue.Append(id, ValueMsg{id, value});
m_totalSize += sizeof(Message) + value.size();
break;
}
@@ -271,7 +264,7 @@ void NetworkOutgoingQueue<MessageType>::SendOutgoing(uint64_t curTimeMs,
for (; it != end && unsent == 0; ++it) {
if (auto m = std::get_if<ValueMsg>(&it->msg.contents)) {
unsent = m_wire.WriteBinary(
[&](auto& os) { EncodeValue(os, it->handle, m->value); });
[&](auto& os) { EncodeValue(os, it->id, m->value); });
} else {
unsent = m_wire.WriteText([&](auto& os) {
if (!WireEncodeText(os, it->msg)) {
@@ -299,7 +292,7 @@ void NetworkOutgoingQueue<MessageType>::SendOutgoing(uint64_t curTimeMs,
}
}
msgs.erase(msgs.begin(), it - unsent);
for (auto&& kv : m_handleMap) {
for (auto&& kv : m_idMap) {
auto& info = kv.getSecond();
if (info.queueIndex == queueIndex) {
if (info.valuePos < delta) {
@@ -324,7 +317,7 @@ void NetworkOutgoingQueue<MessageType>::SendOutgoing(uint64_t curTimeMs,
template <NetworkMessage MessageType>
void NetworkOutgoingQueue<MessageType>::EncodeValue(wpi::raw_ostream& os,
NT_Handle handle,
int id,
const Value& value) {
int64_t time = value.time();
if constexpr (std::same_as<ValueMsg, ClientValueMsg>) {
@@ -336,7 +329,7 @@ void NetworkOutgoingQueue<MessageType>::EncodeValue(wpi::raw_ostream& os,
}
}
}
WireEncodeBinary(os, Handle{handle}.GetIndex(), time, value);
WireEncodeBinary(os, id, time, value);
}
} // namespace nt::net

View File

@@ -199,7 +199,7 @@ std::span<ServerImpl::SubscriberData*> ServerImpl::ClientData::GetSubscribers(
return {buf.data(), buf.size()};
}
void ServerImpl::ClientData4Base::ClientPublish(int64_t pubuid,
void ServerImpl::ClientData4Base::ClientPublish(int pubuid,
std::string_view name,
std::string_view typeStr,
const wpi::json& properties) {
@@ -224,7 +224,7 @@ void ServerImpl::ClientData4Base::ClientPublish(int64_t pubuid,
SendAnnounce(topic, pubuid);
}
void ServerImpl::ClientData4Base::ClientUnpublish(int64_t pubuid) {
void ServerImpl::ClientData4Base::ClientUnpublish(int pubuid) {
DEBUG3("ClientUnpublish({}, {})", m_id, pubuid);
auto publisherIt = m_publishers.find(pubuid);
if (publisherIt == m_publishers.end()) {
@@ -270,7 +270,7 @@ void ServerImpl::ClientData4Base::ClientSetProperties(std::string_view name,
}
void ServerImpl::ClientData4Base::ClientSubscribe(
int64_t subuid, std::span<const std::string> topicNames,
int subuid, std::span<const std::string> topicNames,
const PubSubOptionsImpl& options) {
DEBUG4("ClientSubscribe({}, ({}), {})", m_id, fmt::join(topicNames, ","),
subuid);
@@ -347,7 +347,7 @@ void ServerImpl::ClientData4Base::ClientSubscribe(
}
}
void ServerImpl::ClientData4Base::ClientUnsubscribe(int64_t subuid) {
void ServerImpl::ClientData4Base::ClientUnsubscribe(int subuid) {
DEBUG3("ClientUnsubscribe({}, {})", m_id, subuid);
auto subIt = m_subscribers.find(subuid);
if (subIt == m_subscribers.end() || !subIt->getSecond()) {
@@ -377,7 +377,7 @@ void ServerImpl::ClientData4Base::ClientUnsubscribe(int64_t subuid) {
}
}
void ServerImpl::ClientData4Base::ClientSetValue(int64_t pubuid,
void ServerImpl::ClientData4Base::ClientSetValue(int pubuid,
const Value& value) {
DEBUG4("ClientSetValue({}, {})", m_id, pubuid);
auto publisherIt = m_publishers.find(pubuid);
@@ -398,7 +398,7 @@ void ServerImpl::ClientDataLocal::SendValue(TopicData* topic,
}
void ServerImpl::ClientDataLocal::SendAnnounce(TopicData* topic,
std::optional<int64_t> pubuid) {
std::optional<int> pubuid) {
if (m_server.m_local) {
auto& sent = m_announceSent[topic];
if (sent) {
@@ -407,7 +407,7 @@ void ServerImpl::ClientDataLocal::SendAnnounce(TopicData* topic,
sent = true;
topic->localHandle = m_server.m_local->NetworkAnnounce(
topic->name, topic->typeStr, topic->properties, pubuid.value_or(0));
topic->name, topic->typeStr, topic->properties, pubuid);
}
}
@@ -445,20 +445,20 @@ void ServerImpl::ClientDataLocal::HandleLocal(
for (const auto& elem : msgs) { // NOLINT
// common case is value, so check that first
if (auto msg = std::get_if<ClientValueMsg>(&elem.contents)) {
ClientSetValue(msg->pubHandle, msg->value);
ClientSetValue(msg->pubuid, msg->value);
} else if (auto msg = std::get_if<PublishMsg>(&elem.contents)) {
ClientPublish(msg->pubHandle, msg->name, msg->typeStr, msg->properties);
ClientPublish(msg->pubuid, msg->name, msg->typeStr, msg->properties);
updatepub = true;
} else if (auto msg = std::get_if<UnpublishMsg>(&elem.contents)) {
ClientUnpublish(msg->pubHandle);
ClientUnpublish(msg->pubuid);
updatepub = true;
} else if (auto msg = std::get_if<SetPropertiesMsg>(&elem.contents)) {
ClientSetProperties(msg->name, msg->update);
} else if (auto msg = std::get_if<SubscribeMsg>(&elem.contents)) {
ClientSubscribe(msg->subHandle, msg->topicNames, msg->options);
ClientSubscribe(msg->subuid, msg->topicNames, msg->options);
updatesub = true;
} else if (auto msg = std::get_if<UnsubscribeMsg>(&elem.contents)) {
ClientUnsubscribe(msg->subHandle);
ClientUnsubscribe(msg->subuid);
updatesub = true;
}
}
@@ -485,7 +485,7 @@ void ServerImpl::ClientData4::ProcessIncomingBinary(
}
// decode message
int64_t pubuid;
int pubuid;
Value value;
std::string error;
if (!WireDecodeBinary(&data, &pubuid, &value, &error, 0)) {
@@ -509,11 +509,11 @@ void ServerImpl::ClientData4::ProcessIncomingBinary(
void ServerImpl::ClientData4::SendValue(TopicData* topic, const Value& value,
ValueSendMode mode) {
m_outgoing.SendValue(topic->GetIdHandle(), value, mode);
m_outgoing.SendValue(topic->id, value, mode);
}
void ServerImpl::ClientData4::SendAnnounce(TopicData* topic,
std::optional<int64_t> pubuid) {
std::optional<int> pubuid) {
auto& sent = m_announceSent[topic];
if (sent) {
return;
@@ -532,9 +532,9 @@ void ServerImpl::ClientData4::SendAnnounce(TopicData* topic,
return;
}
}
m_outgoing.SendMessage(topic->GetIdHandle(),
AnnounceMsg{topic->name, topic->id, topic->typeStr,
pubuid, topic->properties});
m_outgoing.SendMessage(
topic->id, AnnounceMsg{topic->name, static_cast<int>(topic->id),
topic->typeStr, pubuid, topic->properties});
m_server.m_controlReady = true;
}
@@ -555,9 +555,9 @@ void ServerImpl::ClientData4::SendUnannounce(TopicData* topic) {
return;
}
}
m_outgoing.SendMessage(topic->GetIdHandle(),
UnannounceMsg{topic->name, topic->id});
m_outgoing.EraseHandle(topic->GetIdHandle());
m_outgoing.SendMessage(
topic->id, UnannounceMsg{topic->name, static_cast<int>(topic->id)});
m_outgoing.EraseId(topic->id);
m_server.m_controlReady = true;
}
@@ -579,7 +579,7 @@ void ServerImpl::ClientData4::SendPropertiesUpdate(TopicData* topic,
return;
}
}
m_outgoing.SendMessage(topic->GetIdHandle(),
m_outgoing.SendMessage(topic->id,
PropertiesUpdateMsg{topic->name, update, ack});
m_server.m_controlReady = true;
}
@@ -598,7 +598,7 @@ void ServerImpl::ClientData4::UpdatePeriod(TopicData::TopicClientData& tcd,
uint32_t period =
CalculatePeriod(tcd.subscribers, [](auto& x) { return x->periodMs; });
DEBUG4("updating {} period to {} ms", topic->name, period);
m_outgoing.SetPeriod(topic->GetIdHandle(), period);
m_outgoing.SetPeriod(topic->id, period);
}
bool ServerImpl::ClientData3::TopicData3::UpdateFlags(TopicData* topic) {
@@ -681,7 +681,7 @@ void ServerImpl::ClientData3::SendValue(TopicData* topic, const Value& value,
}
void ServerImpl::ClientData3::SendAnnounce(TopicData* topic,
std::optional<int64_t> pubuid) {
std::optional<int> pubuid) {
// ignore if we've not yet built the subscriber
if (m_subscribers.empty()) {
return;

View File

@@ -21,14 +21,12 @@
#include <wpi/UidVector.h>
#include <wpi/json.h>
#include "Handle.h"
#include "Log.h"
#include "Message.h"
#include "NetworkInterface.h"
#include "NetworkOutgoingQueue.h"
#include "NetworkPing.h"
#include "PubSubOptions.h"
#include "VectorSet.h"
#include "WireConnection.h"
#include "WireDecoder.h"
#include "WireEncoder.h"
@@ -119,8 +117,6 @@ class ServerImpl final {
void RefreshProperties();
bool SetFlags(unsigned int flags_);
NT_Handle GetIdHandle() const { return Handle(0, id, Handle::kTopic); }
wpi::Logger& m_logger; // Must be m_logger for WARN macro to work
std::string name;
unsigned int id;
@@ -190,8 +186,7 @@ class ServerImpl final {
virtual void SendValue(TopicData* topic, const Value& value,
ValueSendMode mode) = 0;
virtual void SendAnnounce(TopicData* topic,
std::optional<int64_t> pubuid) = 0;
virtual void SendAnnounce(TopicData* topic, std::optional<int> pubuid) = 0;
virtual void SendUnannounce(TopicData* topic) = 0;
virtual void SendPropertiesUpdate(TopicData* topic, const wpi::json& update,
bool ack) = 0;
@@ -223,8 +218,8 @@ class ServerImpl final {
wpi::Logger& m_logger;
wpi::DenseMap<int64_t, std::unique_ptr<PublisherData>> m_publishers;
wpi::DenseMap<int64_t, std::unique_ptr<SubscriberData>> m_subscribers;
wpi::DenseMap<int, std::unique_ptr<PublisherData>> m_publishers;
wpi::DenseMap<int, std::unique_ptr<SubscriberData>> m_subscribers;
public:
// meta topics
@@ -241,18 +236,17 @@ class ServerImpl final {
protected:
// ClientMessageHandler interface
void ClientPublish(int64_t pubuid, std::string_view name,
void ClientPublish(int pubuid, std::string_view name,
std::string_view typeStr,
const wpi::json& properties) final;
void ClientUnpublish(int64_t pubuid) final;
void ClientUnpublish(int pubuid) final;
void ClientSetProperties(std::string_view name,
const wpi::json& update) final;
void ClientSubscribe(int64_t subuid,
std::span<const std::string> topicNames,
void ClientSubscribe(int subuid, std::span<const std::string> topicNames,
const PubSubOptionsImpl& options) final;
void ClientUnsubscribe(int64_t subuid) final;
void ClientUnsubscribe(int subuid) final;
void ClientSetValue(int64_t pubuid, const Value& value);
void ClientSetValue(int pubuid, const Value& value);
wpi::DenseMap<TopicData*, bool> m_announceSent;
};
@@ -267,7 +261,7 @@ class ServerImpl final {
void SendValue(TopicData* topic, const Value& value,
ValueSendMode mode) final;
void SendAnnounce(TopicData* topic, std::optional<int64_t> pubuid) final;
void SendAnnounce(TopicData* topic, std::optional<int> pubuid) final;
void SendUnannounce(TopicData* topic) final;
void SendPropertiesUpdate(TopicData* topic, const wpi::json& update,
bool ack) final;
@@ -293,7 +287,7 @@ class ServerImpl final {
void SendValue(TopicData* topic, const Value& value,
ValueSendMode mode) final;
void SendAnnounce(TopicData* topic, std::optional<int64_t> pubuid) final;
void SendAnnounce(TopicData* topic, std::optional<int> pubuid) final;
void SendUnannounce(TopicData* topic) final;
void SendPropertiesUpdate(TopicData* topic, const wpi::json& update,
bool ack) final;
@@ -328,7 +322,7 @@ class ServerImpl final {
void SendValue(TopicData* topic, const Value& value,
ValueSendMode mode) final;
void SendAnnounce(TopicData* topic, std::optional<int64_t> pubuid) final;
void SendAnnounce(TopicData* topic, std::optional<int> pubuid) final;
void SendUnannounce(TopicData* topic) final;
void SendPropertiesUpdate(TopicData* topic, const wpi::json& update,
bool ack) final;

View File

@@ -430,14 +430,14 @@ void nt::net::WireDecodeText(std::string_view in, ServerMessageHandler& out,
::WireDecodeTextImpl(in, out, logger);
}
bool nt::net::WireDecodeBinary(std::span<const uint8_t>* in, int64_t* outId,
bool nt::net::WireDecodeBinary(std::span<const uint8_t>* in, int* outId,
Value* outValue, std::string* error,
int64_t localTimeOffset) {
mpack_reader_t reader;
mpack_reader_init_data(&reader, reinterpret_cast<const char*>(in->data()),
in->size());
mpack_expect_array_match(&reader, 4);
*outId = mpack_expect_i64(&reader);
*outId = mpack_expect_int(&reader);
auto time = mpack_expect_i64(&reader);
int type = mpack_expect_int(&reader);
switch (type) {

View File

@@ -28,26 +28,26 @@ class ClientMessageHandler {
public:
virtual ~ClientMessageHandler() = default;
virtual void ClientPublish(int64_t pubuid, std::string_view name,
virtual void ClientPublish(int pubuid, std::string_view name,
std::string_view typeStr,
const wpi::json& properties) = 0;
virtual void ClientUnpublish(int64_t pubuid) = 0;
virtual void ClientUnpublish(int pubuid) = 0;
virtual void ClientSetProperties(std::string_view name,
const wpi::json& update) = 0;
virtual void ClientSubscribe(int64_t subuid,
virtual void ClientSubscribe(int subuid,
std::span<const std::string> topicNames,
const PubSubOptionsImpl& options) = 0;
virtual void ClientUnsubscribe(int64_t subuid) = 0;
virtual void ClientUnsubscribe(int subuid) = 0;
};
class ServerMessageHandler {
public:
virtual ~ServerMessageHandler() = default;
virtual void ServerAnnounce(std::string_view name, int64_t id,
virtual void ServerAnnounce(std::string_view name, int id,
std::string_view typeStr,
const wpi::json& properties,
std::optional<int64_t> pubuid) = 0;
virtual void ServerUnannounce(std::string_view name, int64_t id) = 0;
std::optional<int> pubuid) = 0;
virtual void ServerUnannounce(std::string_view name, int id) = 0;
virtual void ServerPropertiesUpdate(std::string_view name,
const wpi::json& update, bool ack) = 0;
};
@@ -59,8 +59,7 @@ void WireDecodeText(std::string_view in, ServerMessageHandler& out,
wpi::Logger& logger);
// returns true if successfully decoded a message
bool WireDecodeBinary(std::span<const uint8_t>* in, int64_t* outId,
Value* outValue, std::string* error,
int64_t localTimeOffset);
bool WireDecodeBinary(std::span<const uint8_t>* in, int* outId, Value* outValue,
std::string* error, int64_t localTimeOffset);
} // namespace nt::net

View File

@@ -11,7 +11,6 @@
#include <wpi/mpack.h>
#include <wpi/raw_ostream.h>
#include "Handle.h"
#include "Message.h"
#include "PubSubOptions.h"
#include "networktables/NetworkTableValue.h"
@@ -20,7 +19,7 @@ using namespace nt;
using namespace nt::net;
using namespace mpack;
void nt::net::WireEncodePublish(wpi::raw_ostream& os, int64_t pubuid,
void nt::net::WireEncodePublish(wpi::raw_ostream& os, int pubuid,
std::string_view name, std::string_view typeStr,
const wpi::json& properties) {
wpi::json::serializer s{os, ' ', 0};
@@ -36,7 +35,7 @@ void nt::net::WireEncodePublish(wpi::raw_ostream& os, int64_t pubuid,
os << "\"}}";
}
void nt::net::WireEncodeUnpublish(wpi::raw_ostream& os, int64_t pubuid) {
void nt::net::WireEncodeUnpublish(wpi::raw_ostream& os, int pubuid) {
wpi::json::serializer s{os, ' ', 0};
os << "{\"method\":\"" << UnpublishMsg::kMethodStr << "\",\"params\":{";
os << "\"pubuid\":";
@@ -75,7 +74,7 @@ static void EncodePrefixes(wpi::raw_ostream& os, std::span<const T> topicNames,
}
template <typename T>
static void WireEncodeSubscribeImpl(wpi::raw_ostream& os, int64_t subuid,
static void WireEncodeSubscribeImpl(wpi::raw_ostream& os, int subuid,
std::span<const T> topicNames,
const PubSubOptionsImpl& options) {
wpi::json::serializer s{os, ' ', 0};
@@ -114,38 +113,37 @@ static void WireEncodeSubscribeImpl(wpi::raw_ostream& os, int64_t subuid,
os << "}}";
}
void nt::net::WireEncodeSubscribe(wpi::raw_ostream& os, int64_t subuid,
void nt::net::WireEncodeSubscribe(wpi::raw_ostream& os, int subuid,
std::span<const std::string_view> topicNames,
const PubSubOptionsImpl& options) {
WireEncodeSubscribeImpl(os, subuid, topicNames, options);
}
void nt::net::WireEncodeSubscribe(wpi::raw_ostream& os, int64_t subuid,
void nt::net::WireEncodeSubscribe(wpi::raw_ostream& os, int subuid,
std::span<const std::string> topicNames,
const PubSubOptionsImpl& options) {
WireEncodeSubscribeImpl(os, subuid, topicNames, options);
}
void nt::net::WireEncodeUnsubscribe(wpi::raw_ostream& os, int64_t subHandle) {
void nt::net::WireEncodeUnsubscribe(wpi::raw_ostream& os, int subuid) {
wpi::json::serializer s{os, ' ', 0};
os << "{\"method\":\"" << UnsubscribeMsg::kMethodStr << "\",\"params\":{";
os << "\"subuid\":";
s.dump_integer(subHandle);
s.dump_integer(subuid);
os << "}}";
}
bool nt::net::WireEncodeText(wpi::raw_ostream& os, const ClientMessage& msg) {
if (auto m = std::get_if<PublishMsg>(&msg.contents)) {
WireEncodePublish(os, Handle{m->pubHandle}.GetIndex(), m->name, m->typeStr,
m->properties);
WireEncodePublish(os, m->pubuid, m->name, m->typeStr, m->properties);
} else if (auto m = std::get_if<UnpublishMsg>(&msg.contents)) {
WireEncodeUnpublish(os, Handle{m->pubHandle}.GetIndex());
WireEncodeUnpublish(os, m->pubuid);
} else if (auto m = std::get_if<SetPropertiesMsg>(&msg.contents)) {
WireEncodeSetProperties(os, m->name, m->update);
} else if (auto m = std::get_if<SubscribeMsg>(&msg.contents)) {
WireEncodeSubscribe(os, m->subHandle, m->topicNames, m->options);
WireEncodeSubscribe(os, m->subuid, m->topicNames, m->options);
} else if (auto m = std::get_if<UnsubscribeMsg>(&msg.contents)) {
WireEncodeUnsubscribe(os, m->subHandle);
WireEncodeUnsubscribe(os, m->subuid);
} else {
return false;
}
@@ -153,9 +151,9 @@ bool nt::net::WireEncodeText(wpi::raw_ostream& os, const ClientMessage& msg) {
}
void nt::net::WireEncodeAnnounce(wpi::raw_ostream& os, std::string_view name,
int64_t id, std::string_view typeStr,
int id, std::string_view typeStr,
const wpi::json& properties,
std::optional<int64_t> pubHandle) {
std::optional<int> pubuid) {
wpi::json::serializer s{os, ' ', 0};
os << "{\"method\":\"" << AnnounceMsg::kMethodStr << "\",\"params\":{";
os << "\"id\":";
@@ -164,9 +162,9 @@ void nt::net::WireEncodeAnnounce(wpi::raw_ostream& os, std::string_view name,
s.dump_escaped(name, false);
os << "\",\"properties\":";
s.dump(properties, false, false, 0, 0);
if (pubHandle) {
if (pubuid) {
os << ",\"pubuid\":";
s.dump_integer(*pubHandle);
s.dump_integer(*pubuid);
}
os << ",\"type\":\"";
s.dump_escaped(typeStr, false);
@@ -214,7 +212,7 @@ bool nt::net::WireEncodeText(wpi::raw_ostream& os, const ServerMessage& msg) {
return true;
}
bool nt::net::WireEncodeBinary(wpi::raw_ostream& os, int64_t id, int64_t time,
bool nt::net::WireEncodeBinary(wpi::raw_ostream& os, int id, int64_t time,
const Value& value) {
char buf[128];
mpack_writer_t writer;

View File

@@ -26,24 +26,23 @@ struct ClientMessage;
struct ServerMessage;
// encoders for client text messages (avoids need to construct a Message struct)
void WireEncodePublish(wpi::raw_ostream& os, int64_t pubuid,
std::string_view name, std::string_view typeStr,
const wpi::json& properties);
void WireEncodeUnpublish(wpi::raw_ostream& os, int64_t pubuid);
void WireEncodePublish(wpi::raw_ostream& os, int pubuid, std::string_view name,
std::string_view typeStr, const wpi::json& properties);
void WireEncodeUnpublish(wpi::raw_ostream& os, int pubuid);
void WireEncodeSetProperties(wpi::raw_ostream& os, std::string_view name,
const wpi::json& update);
void WireEncodeSubscribe(wpi::raw_ostream& os, int64_t subuid,
void WireEncodeSubscribe(wpi::raw_ostream& os, int subuid,
std::span<const std::string_view> topicNames,
const PubSubOptionsImpl& options);
void WireEncodeSubscribe(wpi::raw_ostream& os, int64_t subuid,
void WireEncodeSubscribe(wpi::raw_ostream& os, int subuid,
std::span<const std::string> topicNames,
const PubSubOptionsImpl& options);
void WireEncodeUnsubscribe(wpi::raw_ostream& os, int64_t subuid);
void WireEncodeUnsubscribe(wpi::raw_ostream& os, int subuid);
// encoders for server text messages (avoids need to construct a Message struct)
void WireEncodeAnnounce(wpi::raw_ostream& os, std::string_view name, int64_t id,
void WireEncodeAnnounce(wpi::raw_ostream& os, std::string_view name, int id,
std::string_view typeStr, const wpi::json& properties,
std::optional<int64_t> pubuid);
std::optional<int> pubuid);
void WireEncodeUnannounce(wpi::raw_ostream& os, std::string_view name,
int64_t id);
void WireEncodePropertiesUpdate(wpi::raw_ostream& os, std::string_view name,
@@ -56,7 +55,7 @@ bool WireEncodeText(wpi::raw_ostream& os, const ClientMessage& msg);
bool WireEncodeText(wpi::raw_ostream& os, const ServerMessage& msg);
// encoder for binary messages
bool WireEncodeBinary(wpi::raw_ostream& os, int64_t id, int64_t time,
bool WireEncodeBinary(wpi::raw_ostream& os, int id, int64_t time,
const Value& value);
} // namespace nt::net

View File

@@ -16,7 +16,6 @@
#include <wpi/StringMap.h>
#include <wpi/timestamp.h>
#include "Handle.h"
#include "Log.h"
#include "Types_internal.h"
#include "net/Message.h"
@@ -73,14 +72,14 @@ void ClientImpl3::HandleLocal(std::span<const net::ClientMessage> msgs) {
for (const auto& elem : msgs) { // NOLINT
// common case is value
if (auto msg = std::get_if<net::ClientValueMsg>(&elem.contents)) {
SetValue(msg->pubHandle, msg->value);
SetValue(msg->pubuid, msg->value);
} else if (auto msg = std::get_if<net::PublishMsg>(&elem.contents)) {
Publish(msg->pubHandle, msg->topicHandle, msg->name, msg->typeStr,
msg->properties, msg->options);
Publish(msg->pubuid, msg->name, msg->typeStr, msg->properties,
msg->options);
} else if (auto msg = std::get_if<net::UnpublishMsg>(&elem.contents)) {
Unpublish(msg->pubHandle, msg->topicHandle);
Unpublish(msg->pubuid);
} else if (auto msg = std::get_if<net::SetPropertiesMsg>(&elem.contents)) {
SetProperties(msg->topicHandle, msg->name, msg->update);
SetProperties(msg->name, msg->update);
}
}
}
@@ -176,23 +175,20 @@ bool ClientImpl3::CheckNetworkReady(uint64_t curTimeMs) {
return true;
}
void ClientImpl3::Publish(NT_Publisher pubHandle, NT_Topic topicHandle,
std::string_view name, std::string_view typeStr,
const wpi::json& properties,
void ClientImpl3::Publish(int pubuid, std::string_view name,
std::string_view typeStr, const wpi::json& properties,
const PubSubOptionsImpl& options) {
DEBUG4("Publish('{}', '{}')", name, typeStr);
unsigned int index = Handle{pubHandle}.GetIndex();
if (index >= m_publishers.size()) {
m_publishers.resize(index + 1);
if (static_cast<unsigned int>(pubuid) >= m_publishers.size()) {
m_publishers.resize(pubuid + 1);
}
auto& publisher = m_publishers[index];
auto& publisher = m_publishers[pubuid];
if (!publisher) {
publisher = std::make_unique<PublisherData>(GetOrNewEntry(name));
publisher->entry->typeStr = typeStr;
publisher->entry->type = StringToType3(typeStr);
publisher->entry->publishers.emplace_back(publisher.get());
}
publisher->handle = pubHandle;
publisher->options = options;
publisher->periodMs = std::lround(options.periodicMs / 10.0) * 10;
if (publisher->periodMs < 10) {
@@ -204,13 +200,12 @@ void ClientImpl3::Publish(NT_Publisher pubHandle, NT_Topic topicHandle,
m_setPeriodic(m_periodMs);
}
void ClientImpl3::Unpublish(NT_Publisher pubHandle, NT_Topic topicHandle) {
DEBUG4("Unpublish({}, {})", pubHandle, topicHandle);
unsigned int index = Handle{pubHandle}.GetIndex();
if (index >= m_publishers.size()) {
void ClientImpl3::Unpublish(int pubuid) {
DEBUG4("Unpublish({})", pubuid);
if (static_cast<unsigned int>(pubuid) >= m_publishers.size()) {
return;
}
auto& publisher = m_publishers[index];
auto& publisher = m_publishers[pubuid];
publisher->entry->publishers.erase(
std::remove(publisher->entry->publishers.begin(),
publisher->entry->publishers.end(), publisher.get()),
@@ -227,9 +222,9 @@ void ClientImpl3::Unpublish(NT_Publisher pubHandle, NT_Topic topicHandle) {
m_setPeriodic(m_periodMs);
}
void ClientImpl3::SetProperties(NT_Topic topicHandle, std::string_view name,
void ClientImpl3::SetProperties(std::string_view name,
const wpi::json& update) {
DEBUG4("SetProperties({}, {}, {})", topicHandle, name, update.dump());
DEBUG4("SetProperties({}, {})", name, update.dump());
auto entry = GetOrNewEntry(name);
bool updated = false;
for (auto&& elem : update.items()) {
@@ -250,11 +245,11 @@ void ClientImpl3::SetProperties(NT_Topic topicHandle, std::string_view name,
}
}
void ClientImpl3::SetValue(NT_Publisher pubHandle, const Value& value) {
DEBUG4("SetValue({})", pubHandle);
unsigned int index = Handle{pubHandle}.GetIndex();
assert(index < m_publishers.size() && m_publishers[index]);
auto& publisher = *m_publishers[index];
void ClientImpl3::SetValue(int pubuid, const Value& value) {
DEBUG4("SetValue({})", pubuid);
assert(static_cast<unsigned int>(pubuid) < m_publishers.size() &&
m_publishers[pubuid]);
auto& publisher = *m_publishers[pubuid];
if (value == publisher.entry->value) {
return;
}
@@ -367,8 +362,8 @@ void ClientImpl3::EntryAssign(std::string_view name, unsigned int id,
// XXX: need to handle type change specially? (e.g. with unannounce)
if (entry->topic == 0 || flagsChanged || typeChanged) {
DEBUG4("NetworkAnnounce({}, {})", name, entry->typeStr);
entry->topic =
m_local->NetworkAnnounce(name, entry->typeStr, entry->properties, 0);
entry->topic = m_local->NetworkAnnounce(name, entry->typeStr,
entry->properties, std::nullopt);
}
if (valueChanged) {
m_local->NetworkSetValue(entry->topic, entry->value);

View File

@@ -61,7 +61,6 @@ class ClientImpl3 final : private MessageHandler3 {
explicit PublisherData(Entry* entry) : entry{entry} {}
Entry* entry;
NT_Publisher handle;
PubSubOptionsImpl options;
// in options as double, but copy here as integer; rounded to the nearest
// 10 ms
@@ -106,13 +105,11 @@ class ClientImpl3 final : private MessageHandler3 {
bool CheckNetworkReady(uint64_t curTimeMs);
// Outgoing handlers
void Publish(NT_Publisher pubHandle, NT_Topic topicHandle,
std::string_view name, std::string_view typeStr,
void Publish(int pubuid, std::string_view name, std::string_view typeStr,
const wpi::json& properties, const PubSubOptionsImpl& options);
void Unpublish(NT_Publisher pubHandle, NT_Topic topicHandle);
void SetProperties(NT_Topic topicHandle, std::string_view name,
const wpi::json& update);
void SetValue(NT_Publisher pubHandle, const Value& value);
void Unpublish(int pubuid);
void SetProperties(std::string_view name, const wpi::json& update);
void SetValue(int pubuid, const Value& value);
// MessageHandler interface
void KeepAlive() final;

View File

@@ -105,9 +105,9 @@ TEST_F(LocalStorageTest, GetTopicInfoUnpublished) {
}
TEST_F(LocalStorageTest, DefaultProps) {
EXPECT_CALL(network, Publish(_, fooTopic, std::string_view{"foo"},
std::string_view{"boolean"}, wpi::json::object(),
IsDefaultPubSubOptions()));
EXPECT_CALL(network,
Publish(_, std::string_view{"foo"}, std::string_view{"boolean"},
wpi::json::object(), IsDefaultPubSubOptions()));
storage.Publish(fooTopic, NT_BOOLEAN, "boolean", wpi::json::object(), {});
EXPECT_FALSE(storage.GetTopicPersistent(fooTopic));
@@ -116,9 +116,9 @@ TEST_F(LocalStorageTest, DefaultProps) {
}
TEST_F(LocalStorageTest, PublishNewNoProps) {
EXPECT_CALL(network, Publish(_, fooTopic, std::string_view{"foo"},
std::string_view{"boolean"}, wpi::json::object(),
IsDefaultPubSubOptions()));
EXPECT_CALL(network,
Publish(_, std::string_view{"foo"}, std::string_view{"boolean"},
wpi::json::object(), IsDefaultPubSubOptions()));
storage.Publish(fooTopic, NT_BOOLEAN, "boolean", wpi::json::object(), {});
auto info = storage.GetTopicInfo(fooTopic);
@@ -126,9 +126,9 @@ TEST_F(LocalStorageTest, PublishNewNoProps) {
}
TEST_F(LocalStorageTest, PublishNewNoPropsNull) {
EXPECT_CALL(network, Publish(_, fooTopic, std::string_view{"foo"},
std::string_view{"boolean"}, wpi::json::object(),
IsDefaultPubSubOptions()));
EXPECT_CALL(network,
Publish(_, std::string_view{"foo"}, std::string_view{"boolean"},
wpi::json::object(), IsDefaultPubSubOptions()));
storage.Publish(fooTopic, NT_BOOLEAN, "boolean", {}, {});
auto info = storage.GetTopicInfo(fooTopic);
@@ -137,9 +137,9 @@ TEST_F(LocalStorageTest, PublishNewNoPropsNull) {
TEST_F(LocalStorageTest, PublishNew) {
wpi::json properties = {{"persistent", true}};
EXPECT_CALL(network, Publish(_, fooTopic, std::string_view{"foo"},
std::string_view{"boolean"}, properties,
IsDefaultPubSubOptions()));
EXPECT_CALL(network,
Publish(_, std::string_view{"foo"}, std::string_view{"boolean"},
properties, IsDefaultPubSubOptions()));
storage.Publish(fooTopic, NT_BOOLEAN, "boolean", {{"persistent", true}}, {});
auto info = storage.GetTopicInfo(fooTopic);
@@ -159,13 +159,13 @@ TEST_F(LocalStorageTest, SubscribeNoTypeLocalPubPost) {
IsDefaultPubSubOptions()));
auto sub = storage.Subscribe(fooTopic, NT_UNASSIGNED, "", {});
EXPECT_CALL(network, Publish(_, fooTopic, std::string_view{"foo"},
std::string_view{"boolean"}, wpi::json::object(),
IsDefaultPubSubOptions()));
EXPECT_CALL(network,
Publish(_, std::string_view{"foo"}, std::string_view{"boolean"},
wpi::json::object(), IsDefaultPubSubOptions()));
auto pub = storage.Publish(fooTopic, NT_BOOLEAN, "boolean", {}, {});
auto val = Value::MakeBoolean(true, 5);
EXPECT_CALL(network, SetValue(pub, val));
EXPECT_CALL(network, SetValue(Handle{pub}.GetIndex(), val));
storage.SetEntryValue(pub, val);
EXPECT_EQ(storage.GetTopicType(fooTopic), NT_BOOLEAN);
@@ -183,7 +183,7 @@ TEST_F(LocalStorageTest, SubscribeNoTypeLocalPubPost) {
EXPECT_EQ(vals[0].time, 5);
val = Value::MakeBoolean(false, 6);
EXPECT_CALL(network, SetValue(pub, val));
EXPECT_CALL(network, SetValue(Handle{pub}.GetIndex(), val));
storage.SetEntryValue(pub, val);
auto vals2 = storage.ReadQueue<int64_t>(sub); // mismatched type
@@ -191,13 +191,13 @@ TEST_F(LocalStorageTest, SubscribeNoTypeLocalPubPost) {
}
TEST_F(LocalStorageTest, SubscribeNoTypeLocalPubPre) {
EXPECT_CALL(network, Publish(_, fooTopic, std::string_view{"foo"},
std::string_view{"boolean"}, wpi::json::object(),
IsDefaultPubSubOptions()));
EXPECT_CALL(network,
Publish(_, std::string_view{"foo"}, std::string_view{"boolean"},
wpi::json::object(), IsDefaultPubSubOptions()));
auto pub = storage.Publish(fooTopic, NT_BOOLEAN, "boolean", {}, {});
auto val = Value::MakeBoolean(true, 5);
EXPECT_CALL(network, SetValue(pub, val));
EXPECT_CALL(network, SetValue(Handle{pub}.GetIndex(), val));
storage.SetEntryValue(pub, val);
EXPECT_CALL(network, Subscribe(_, wpi::SpanEq({std::string{"foo"}}),
@@ -221,9 +221,9 @@ TEST_F(LocalStorageTest, EntryNoTypeLocalSet) {
// results in a publish and value set
auto val = Value::MakeBoolean(true, 5);
EXPECT_CALL(network, Publish(_, fooTopic, std::string_view{"foo"},
std::string_view{"boolean"}, wpi::json::object(),
IsDefaultPubSubOptions()));
EXPECT_CALL(network,
Publish(_, std::string_view{"foo"}, std::string_view{"boolean"},
wpi::json::object(), IsDefaultPubSubOptions()));
EXPECT_CALL(network, SetValue(_, val));
EXPECT_TRUE(storage.SetEntryValue(entry, val));
@@ -265,9 +265,9 @@ TEST_F(LocalStorageTest, PubUnpubPub) {
IsDefaultPubSubOptions()));
auto sub = storage.Subscribe(fooTopic, NT_INTEGER, "int", {});
EXPECT_CALL(network, Publish(_, fooTopic, std::string_view{"foo"},
std::string_view{"boolean"}, wpi::json::object(),
IsDefaultPubSubOptions()));
EXPECT_CALL(network,
Publish(_, std::string_view{"foo"}, std::string_view{"boolean"},
wpi::json::object(), IsDefaultPubSubOptions()));
EXPECT_CALL(logger,
Call(NT_LOG_INFO, _, _,
std::string_view{
@@ -276,7 +276,7 @@ TEST_F(LocalStorageTest, PubUnpubPub) {
auto pub = storage.Publish(fooTopic, NT_BOOLEAN, "boolean", {}, {});
auto val = Value::MakeBoolean(true, 5);
EXPECT_CALL(network, SetValue(pub, val));
EXPECT_CALL(network, SetValue(Handle{pub}.GetIndex(), val));
EXPECT_TRUE(storage.SetEntryValue(pub, val));
EXPECT_EQ(storage.GetTopicType(fooTopic), NT_BOOLEAN);
@@ -285,20 +285,20 @@ TEST_F(LocalStorageTest, PubUnpubPub) {
EXPECT_TRUE(storage.ReadQueue<int64_t>(sub).empty());
EXPECT_CALL(network, Unpublish(pub, fooTopic));
EXPECT_CALL(network, Unpublish(Handle{pub}.GetIndex()));
storage.Unpublish(pub);
EXPECT_EQ(storage.GetTopicType(fooTopic), NT_UNASSIGNED);
EXPECT_EQ(storage.GetTopicTypeString(fooTopic), "");
EXPECT_FALSE(storage.GetTopicExists(fooTopic));
EXPECT_CALL(network, Publish(_, fooTopic, std::string_view{"foo"},
std::string_view{"int"}, wpi::json::object(),
IsDefaultPubSubOptions()));
EXPECT_CALL(network,
Publish(_, std::string_view{"foo"}, std::string_view{"int"},
wpi::json::object(), IsDefaultPubSubOptions()));
pub = storage.Publish(fooTopic, NT_INTEGER, "int", {}, {});
val = Value::MakeInteger(3, 5);
EXPECT_CALL(network, SetValue(pub, val));
EXPECT_CALL(network, SetValue(Handle{pub}.GetIndex(), val));
EXPECT_TRUE(storage.SetEntryValue(pub, val));
EXPECT_EQ(storage.GetTopicType(fooTopic), NT_INTEGER);
@@ -309,9 +309,9 @@ TEST_F(LocalStorageTest, PubUnpubPub) {
}
TEST_F(LocalStorageTest, LocalPubConflict) {
EXPECT_CALL(network, Publish(_, fooTopic, std::string_view{"foo"},
std::string_view{"boolean"}, wpi::json::object(),
IsDefaultPubSubOptions()));
EXPECT_CALL(network,
Publish(_, std::string_view{"foo"}, std::string_view{"boolean"},
wpi::json::object(), IsDefaultPubSubOptions()));
auto pub1 = storage.Publish(fooTopic, NT_BOOLEAN, "boolean", {}, {});
EXPECT_CALL(
@@ -325,32 +325,32 @@ TEST_F(LocalStorageTest, LocalPubConflict) {
EXPECT_EQ(storage.GetTopicTypeString(fooTopic), "boolean");
EXPECT_TRUE(storage.GetTopicExists(fooTopic));
EXPECT_CALL(network, SetValue(pub1, _));
EXPECT_CALL(network, SetValue(Handle{pub1}.GetIndex(), _));
EXPECT_TRUE(storage.SetEntryValue(pub1, Value::MakeBoolean(true, 5)));
EXPECT_FALSE(storage.SetEntryValue(pub2, Value::MakeInteger(3, 5)));
// unpublishing pub1 will publish pub2 to the network
EXPECT_CALL(network, Unpublish(pub1, fooTopic));
EXPECT_CALL(network, Publish(_, fooTopic, std::string_view{"foo"},
std::string_view{"int"}, wpi::json::object(),
IsDefaultPubSubOptions()));
EXPECT_CALL(network, Unpublish(Handle{pub1}.GetIndex()));
EXPECT_CALL(network,
Publish(_, std::string_view{"foo"}, std::string_view{"int"},
wpi::json::object(), IsDefaultPubSubOptions()));
storage.Unpublish(pub1);
EXPECT_EQ(storage.GetTopicType(fooTopic), NT_INTEGER);
EXPECT_EQ(storage.GetTopicTypeString(fooTopic), "int");
EXPECT_TRUE(storage.GetTopicExists(fooTopic));
EXPECT_CALL(network, SetValue(pub2, _));
EXPECT_CALL(network, SetValue(Handle{pub2}.GetIndex(), _));
EXPECT_FALSE(storage.SetEntryValue(pub1, Value::MakeBoolean(true, 5)));
EXPECT_TRUE(storage.SetEntryValue(pub2, Value::MakeInteger(3, 5)));
}
TEST_F(LocalStorageTest, LocalSubConflict) {
EXPECT_CALL(network, Publish(_, fooTopic, std::string_view{"foo"},
std::string_view{"boolean"}, wpi::json::object(),
IsDefaultPubSubOptions()));
EXPECT_CALL(network,
Publish(_, std::string_view{"foo"}, std::string_view{"boolean"},
wpi::json::object(), IsDefaultPubSubOptions()));
storage.Publish(fooTopic, NT_BOOLEAN, "boolean", {}, {});
EXPECT_CALL(network, Subscribe(_, wpi::SpanEq({std::string{"foo"}}),
@@ -364,9 +364,9 @@ TEST_F(LocalStorageTest, LocalSubConflict) {
}
TEST_F(LocalStorageTest, RemotePubConflict) {
EXPECT_CALL(network, Publish(_, fooTopic, std::string_view{"foo"},
std::string_view{"boolean"}, wpi::json::object(),
IsDefaultPubSubOptions()));
EXPECT_CALL(network,
Publish(_, std::string_view{"foo"}, std::string_view{"boolean"},
wpi::json::object(), IsDefaultPubSubOptions()));
storage.Publish(fooTopic, NT_BOOLEAN, "boolean", {}, {});
@@ -376,16 +376,16 @@ TEST_F(LocalStorageTest, RemotePubConflict) {
"network announce of 'foo' overriding local publish "
"(was 'boolean', now 'int')"}));
storage.NetworkAnnounce("foo", "int", wpi::json::object(), {});
storage.NetworkAnnounce("foo", "int", wpi::json::object(), std::nullopt);
// network overrides local
EXPECT_EQ(storage.GetTopicType(fooTopic), NT_INTEGER);
EXPECT_EQ(storage.GetTopicTypeString(fooTopic), "int");
EXPECT_TRUE(storage.GetTopicExists(fooTopic));
EXPECT_CALL(network, Publish(_, fooTopic, std::string_view{"foo"},
std::string_view{"boolean"}, wpi::json::object(),
IsDefaultPubSubOptions()));
EXPECT_CALL(network,
Publish(_, std::string_view{"foo"}, std::string_view{"boolean"},
wpi::json::object(), IsDefaultPubSubOptions()));
storage.NetworkUnannounce("foo");
@@ -414,14 +414,14 @@ TEST_F(LocalStorageTest, SetDefaultSubscribe) {
}
TEST_F(LocalStorageTest, SetDefaultPublish) {
EXPECT_CALL(network, Publish(_, fooTopic, std::string_view{"foo"},
std::string_view{"boolean"}, wpi::json::object(),
IsDefaultPubSubOptions()));
EXPECT_CALL(network,
Publish(_, std::string_view{"foo"}, std::string_view{"boolean"},
wpi::json::object(), IsDefaultPubSubOptions()));
auto pub = storage.Publish(fooTopic, NT_BOOLEAN, "boolean", {}, {});
// expect a value across the wire
auto expectVal = Value::MakeBoolean(true, 0);
EXPECT_CALL(network, SetValue(pub, expectVal));
EXPECT_CALL(network, SetValue(Handle{pub}.GetIndex(), expectVal));
EXPECT_TRUE(storage.SetDefaultEntryValue(pub, Value::MakeBoolean(true)));
EXPECT_CALL(network, Subscribe(_, _, IsDefaultPubSubOptions()));
@@ -438,9 +438,9 @@ TEST_F(LocalStorageTest, SetDefaultEntry) {
auto entry = storage.GetEntry(fooTopic, NT_BOOLEAN, "boolean", {});
// expect a publish and value
EXPECT_CALL(network, Publish(_, fooTopic, std::string_view{"foo"},
std::string_view{"boolean"}, wpi::json::object(),
IsDefaultPubSubOptions()));
EXPECT_CALL(network,
Publish(_, std::string_view{"foo"}, std::string_view{"boolean"},
wpi::json::object(), IsDefaultPubSubOptions()));
auto expectVal = Value::MakeBoolean(true, 0);
EXPECT_CALL(network, SetValue(_, expectVal));
EXPECT_TRUE(storage.SetDefaultEntryValue(entry, Value::MakeBoolean(true)));
@@ -457,9 +457,9 @@ TEST_F(LocalStorageTest, SetDefaultEntryUnassigned) {
auto entry = storage.GetEntry(fooTopic, NT_UNASSIGNED, "", {});
// expect a publish and value
EXPECT_CALL(network, Publish(_, fooTopic, std::string_view{"foo"},
std::string_view{"boolean"}, wpi::json::object(),
IsDefaultPubSubOptions()));
EXPECT_CALL(network,
Publish(_, std::string_view{"foo"}, std::string_view{"boolean"},
wpi::json::object(), IsDefaultPubSubOptions()));
auto expectVal = Value::MakeBoolean(true, 0);
EXPECT_CALL(network, SetValue(_, expectVal));
EXPECT_TRUE(storage.SetDefaultEntryValue(entry, Value::MakeBoolean(true)));
@@ -472,9 +472,9 @@ TEST_F(LocalStorageTest, SetDefaultEntryUnassigned) {
}
TEST_F(LocalStorageTest, SetDefaultEntryDiffType) {
EXPECT_CALL(network, Publish(_, fooTopic, std::string_view{"foo"},
std::string_view{"string"}, wpi::json::object(),
IsDefaultPubSubOptions()));
EXPECT_CALL(network,
Publish(_, std::string_view{"foo"}, std::string_view{"string"},
wpi::json::object(), IsDefaultPubSubOptions()));
auto pub = storage.Publish(fooTopic, NT_STRING, "string", {}, {});
EXPECT_FALSE(storage.SetDefaultEntryValue(pub, Value::MakeBoolean(true)));
@@ -482,9 +482,9 @@ TEST_F(LocalStorageTest, SetDefaultEntryDiffType) {
}
TEST_F(LocalStorageTest, SetValueEmptyValue) {
EXPECT_CALL(network, Publish(_, fooTopic, std::string_view{"foo"},
std::string_view{"string"}, wpi::json::object(),
IsDefaultPubSubOptions()));
EXPECT_CALL(network,
Publish(_, std::string_view{"foo"}, std::string_view{"string"},
wpi::json::object(), IsDefaultPubSubOptions()));
auto pub = storage.Publish(fooTopic, NT_STRING, "string", {}, {});
EXPECT_FALSE(storage.SetEntryValue(pub, {}));
@@ -525,9 +525,9 @@ class LocalStorageDuplicatesTest : public LocalStorageTest {
void LocalStorageDuplicatesTest::SetupPubSub(bool keepPub, bool keepSub) {
PubSubOptionsImpl pubOptions;
pubOptions.keepDuplicates = keepPub;
EXPECT_CALL(network, Publish(_, fooTopic, std::string_view{"foo"},
std::string_view{"double"}, wpi::json::object(),
IsPubSubOptions(pubOptions)));
EXPECT_CALL(network,
Publish(_, std::string_view{"foo"}, std::string_view{"double"},
wpi::json::object(), IsPubSubOptions(pubOptions)));
pub = storage.Publish(fooTopic, NT_DOUBLE, "double", {},
{.keepDuplicates = keepPub});
@@ -552,8 +552,8 @@ void LocalStorageDuplicatesTest::SetValues(bool expectDuplicates) {
TEST_F(LocalStorageDuplicatesTest, Defaults) {
SetupPubSub(false, false);
EXPECT_CALL(network, SetValue(pub, val1));
EXPECT_CALL(network, SetValue(pub, val3));
EXPECT_CALL(network, SetValue(Handle{pub}.GetIndex(), val1));
EXPECT_CALL(network, SetValue(Handle{pub}.GetIndex(), val3));
SetValues(false);
// verify 2nd update was dropped locally
@@ -568,9 +568,9 @@ TEST_F(LocalStorageDuplicatesTest, Defaults) {
TEST_F(LocalStorageDuplicatesTest, KeepPub) {
SetupPubSub(true, false);
EXPECT_CALL(network, SetValue(pub, val1)).Times(2);
EXPECT_CALL(network, SetValue(Handle{pub}.GetIndex(), val1)).Times(2);
// EXPECT_CALL(network, SetValue(pub, val2));
EXPECT_CALL(network, SetValue(pub, val3));
EXPECT_CALL(network, SetValue(Handle{pub}.GetIndex(), val3));
SetValues(true);
// verify only 2 updates were received locally
@@ -582,8 +582,8 @@ TEST_F(LocalStorageDuplicatesTest, KeepSub) {
SetupPubSub(false, true);
// second update should NOT go to the network
EXPECT_CALL(network, SetValue(pub, val1));
EXPECT_CALL(network, SetValue(pub, val3));
EXPECT_CALL(network, SetValue(Handle{pub}.GetIndex(), val1));
EXPECT_CALL(network, SetValue(Handle{pub}.GetIndex(), val3));
SetValues(false);
// verify 2 updates were received locally
@@ -595,9 +595,9 @@ TEST_F(LocalStorageDuplicatesTest, KeepPubSub) {
SetupPubSub(true, true);
// second update SHOULD go to the network
EXPECT_CALL(network, SetValue(pub, val1)).Times(2);
EXPECT_CALL(network, SetValue(Handle{pub}.GetIndex(), val1)).Times(2);
// EXPECT_CALL(network, SetValue(pub, val2));
EXPECT_CALL(network, SetValue(pub, val3));
EXPECT_CALL(network, SetValue(Handle{pub}.GetIndex(), val3));
SetValues(true);
// verify all 3 updates were received locally
@@ -609,7 +609,7 @@ TEST_F(LocalStorageDuplicatesTest, FromNetworkDefault) {
SetupPubSub(false, false);
// incoming from the network are treated like a normal local publish
auto topic = storage.NetworkAnnounce("foo", "double", {{}}, 0);
auto topic = storage.NetworkAnnounce("foo", "double", {{}}, std::nullopt);
storage.NetworkSetValue(topic, val1);
storage.NetworkSetValue(topic, val2);
// verify the timestamp was updated
@@ -629,7 +629,7 @@ TEST_F(LocalStorageDuplicatesTest, FromNetworkKeepPub) {
SetupPubSub(true, false);
// incoming from the network are treated like a normal local publish
auto topic = storage.NetworkAnnounce("foo", "double", {{}}, 0);
auto topic = storage.NetworkAnnounce("foo", "double", {{}}, std::nullopt);
storage.NetworkSetValue(topic, val1);
storage.NetworkSetValue(topic, val2);
// verify the timestamp was updated
@@ -648,7 +648,7 @@ TEST_F(LocalStorageDuplicatesTest, FromNetworkKeepSub) {
SetupPubSub(false, true);
// incoming from the network are treated like a normal local publish
auto topic = storage.NetworkAnnounce("foo", "double", {{}}, 0);
auto topic = storage.NetworkAnnounce("foo", "double", {{}}, std::nullopt);
storage.NetworkSetValue(topic, val1);
storage.NetworkSetValue(topic, val2);
// verify the timestamp was updated
@@ -670,7 +670,7 @@ TEST_F(LocalStorageDuplicatesTest, FromNetworkKeepPubSub) {
SetupPubSub(true, true);
// incoming from the network are treated like a normal local publish
auto topic = storage.NetworkAnnounce("foo", "double", {{}}, 0);
auto topic = storage.NetworkAnnounce("foo", "double", {{}}, std::nullopt);
storage.NetworkSetValue(topic, val1);
storage.NetworkSetValue(topic, val2);
// verify the timestamp was updated
@@ -746,7 +746,7 @@ void LocalStorageNumberVariantsTest::CreateSubscribersArray() {
TEST_F(LocalStorageNumberVariantsTest, GetEntryPubAfter) {
EXPECT_CALL(network, Subscribe(_, _, _)).Times(5);
EXPECT_CALL(network, Publish(_, _, _, _, _, _)).Times(1);
EXPECT_CALL(network, Publish(_, _, _, _, _)).Times(1);
EXPECT_CALL(network, SetValue(_, _)).Times(1);
CreateSubscribers();
auto pub = storage.Publish(fooTopic, NT_DOUBLE, "double", {}, {});
@@ -768,7 +768,7 @@ TEST_F(LocalStorageNumberVariantsTest, GetEntryPubAfter) {
TEST_F(LocalStorageNumberVariantsTest, GetEntryPubBefore) {
EXPECT_CALL(network, Subscribe(_, _, _)).Times(5);
EXPECT_CALL(network, Publish(_, _, _, _, _, _)).Times(1);
EXPECT_CALL(network, Publish(_, _, _, _, _)).Times(1);
EXPECT_CALL(network, SetValue(_, _)).Times(1);
auto pub = storage.Publish(fooTopic, NT_DOUBLE, "double", {}, {});
CreateSubscribers();
@@ -795,7 +795,7 @@ template <typename T>
TEST_F(LocalStorageNumberVariantsTest, GetAtomic) {
EXPECT_CALL(network, Subscribe(_, _, _)).Times(5);
EXPECT_CALL(network, Publish(_, _, _, _, _, _)).Times(1);
EXPECT_CALL(network, Publish(_, _, _, _, _)).Times(1);
EXPECT_CALL(network, SetValue(_, _)).Times(1);
auto pub = storage.Publish(fooTopic, NT_DOUBLE, "double", {}, {});
CreateSubscribers();
@@ -823,7 +823,7 @@ template <typename T, typename U>
TEST_F(LocalStorageNumberVariantsTest, GetAtomicArray) {
EXPECT_CALL(network, Subscribe(_, _, _)).Times(5);
EXPECT_CALL(network, Publish(_, _, _, _, _, _)).Times(1);
EXPECT_CALL(network, Publish(_, _, _, _, _)).Times(1);
EXPECT_CALL(network, SetValue(_, _)).Times(1);
auto pub = storage.Publish(fooTopic, NT_DOUBLE_ARRAY, "double[]", {}, {});
CreateSubscribersArray();
@@ -847,7 +847,7 @@ TEST_F(LocalStorageNumberVariantsTest, GetAtomicArray) {
TEST_F(LocalStorageNumberVariantsTest, ReadQueue) {
EXPECT_CALL(network, Subscribe(_, _, _)).Times(5);
EXPECT_CALL(network, Publish(_, _, _, _, _, _)).Times(1);
EXPECT_CALL(network, Publish(_, _, _, _, _)).Times(1);
EXPECT_CALL(network, SetValue(_, _)).Times(4);
auto pub = storage.Publish(fooTopic, NT_DOUBLE, "double", {}, {});
CreateSubscribers();
@@ -894,7 +894,7 @@ TEST_F(LocalStorageNumberVariantsTest, ReadQueue) {
TEST_F(LocalStorageTest, MultiSubSpecial) {
EXPECT_CALL(network, Subscribe(_, _, _)).Times(2);
EXPECT_CALL(network, Publish(_, _, _, _, _, _)).Times(2);
EXPECT_CALL(network, Publish(_, _, _, _, _)).Times(2);
EXPECT_CALL(network, SetValue(_, _)).Times(2);
EXPECT_CALL(listenerStorage, Activate(_, _, _)).Times(2);
@@ -921,10 +921,10 @@ TEST_F(LocalStorageTest, MultiSubSpecial) {
}
TEST_F(LocalStorageTest, NetworkDuplicateDetect) {
EXPECT_CALL(network, Publish(_, _, _, _, _, _));
EXPECT_CALL(network, Publish(_, _, _, _, _));
auto pub = storage.Publish(fooTopic, NT_DOUBLE, "double", {}, {});
auto remoteTopic =
storage.NetworkAnnounce("foo", "double", wpi::json::object(), 0);
auto remoteTopic = storage.NetworkAnnounce("foo", "double",
wpi::json::object(), std::nullopt);
// local set
EXPECT_CALL(network, SetValue(_, _));
@@ -940,7 +940,7 @@ TEST_F(LocalStorageTest, NetworkDuplicateDetect) {
TEST_F(LocalStorageTest, ReadQueueLocalRemote) {
EXPECT_CALL(network, Subscribe(_, _, _)).Times(3);
EXPECT_CALL(network, Publish(_, _, _, _, _, _)).Times(1);
EXPECT_CALL(network, Publish(_, _, _, _, _)).Times(1);
auto subBoth =
storage.Subscribe(fooTopic, NT_DOUBLE, "double", kDefaultPubSubOptions);
@@ -949,8 +949,8 @@ TEST_F(LocalStorageTest, ReadQueueLocalRemote) {
auto subRemote =
storage.Subscribe(fooTopic, NT_DOUBLE, "double", {.disableLocal = true});
auto pub = storage.Publish(fooTopic, NT_DOUBLE, "double", {}, {});
auto remoteTopic =
storage.NetworkAnnounce("foo", "double", wpi::json::object(), 0);
auto remoteTopic = storage.NetworkAnnounce("foo", "double",
wpi::json::object(), std::nullopt);
// local set
EXPECT_CALL(network, SetValue(_, _));
@@ -972,14 +972,14 @@ TEST_F(LocalStorageTest, ReadQueueLocalRemote) {
TEST_F(LocalStorageTest, SubExcludePub) {
EXPECT_CALL(network, Subscribe(_, _, _)).Times(2);
EXPECT_CALL(network, Publish(_, _, _, _, _, _)).Times(1);
EXPECT_CALL(network, Publish(_, _, _, _, _)).Times(1);
auto pub = storage.Publish(fooTopic, NT_DOUBLE, "double", {}, {});
auto subActive = storage.Subscribe(fooTopic, NT_DOUBLE, "double", {});
auto subExclude = storage.Subscribe(fooTopic, NT_DOUBLE, "double",
{.excludePublisher = pub});
auto remoteTopic =
storage.NetworkAnnounce("foo", "double", wpi::json::object(), 0);
auto remoteTopic = storage.NetworkAnnounce("foo", "double",
wpi::json::object(), std::nullopt);
// local set
EXPECT_CALL(network, SetValue(_, _));
@@ -1001,11 +1001,11 @@ TEST_F(LocalStorageTest, EntryExcludeSelf) {
auto entry =
storage.GetEntry(fooTopic, NT_DOUBLE, "double", {.excludeSelf = true});
auto remoteTopic =
storage.NetworkAnnounce("foo", "double", wpi::json::object(), 0);
auto remoteTopic = storage.NetworkAnnounce("foo", "double",
wpi::json::object(), std::nullopt);
// local set
EXPECT_CALL(network, Publish(_, _, _, _, _, _));
EXPECT_CALL(network, Publish(_, _, _, _, _));
EXPECT_CALL(network, SetValue(_, _));
storage.SetEntryValue(entry, Value::MakeDouble(1.0, 50));
EXPECT_THAT(storage.ReadQueue<double>(entry), IsEmpty());
@@ -1017,7 +1017,7 @@ TEST_F(LocalStorageTest, EntryExcludeSelf) {
}
TEST_F(LocalStorageTest, ReadQueueInitialLocal) {
EXPECT_CALL(network, Publish(_, _, _, _, _, _));
EXPECT_CALL(network, Publish(_, _, _, _, _));
EXPECT_CALL(network, SetValue(_, _));
EXPECT_CALL(network, Subscribe(_, _, _)).Times(3);
@@ -1041,8 +1041,8 @@ TEST_F(LocalStorageTest, ReadQueueInitialLocal) {
TEST_F(LocalStorageTest, ReadQueueInitialRemote) {
EXPECT_CALL(network, Subscribe(_, _, _)).Times(3);
auto remoteTopic =
storage.NetworkAnnounce("foo", "double", wpi::json::object(), 0);
auto remoteTopic = storage.NetworkAnnounce("foo", "double",
wpi::json::object(), std::nullopt);
storage.NetworkSetValue(remoteTopic, Value::MakeDouble(2.0, 60));
auto subBoth =

View File

@@ -18,7 +18,7 @@ class MockLocalInterface : public LocalInterface {
public:
MOCK_METHOD(NT_Topic, NetworkAnnounce,
(std::string_view name, std::string_view typeStr,
const wpi::json& properties, NT_Publisher pubHandle),
const wpi::json& properties, std::optional<int> pubuid),
(override));
MOCK_METHOD(void, NetworkUnannounce, (std::string_view name), (override));
MOCK_METHOD(void, NetworkPropertiesUpdate,
@@ -31,30 +31,25 @@ class MockLocalInterface : public LocalInterface {
class MockNetworkInterface : public NetworkInterface {
public:
MOCK_METHOD(void, Publish,
(NT_Publisher pubHandle, NT_Topic topicHandle,
std::string_view name, std::string_view typeStr,
(int pubuid, std::string_view name, std::string_view typeStr,
const wpi::json& properties, const PubSubOptionsImpl& options),
(override));
MOCK_METHOD(void, Unpublish, (NT_Publisher pubHandle, NT_Topic topicHandle),
(override));
MOCK_METHOD(void, Unpublish, (int pubuid), (override));
MOCK_METHOD(void, SetProperties,
(NT_Topic topicHandle, std::string_view name,
const wpi::json& update),
(override));
(std::string_view name, const wpi::json& update), (override));
MOCK_METHOD(void, Subscribe,
(NT_Subscriber subHandle, std::span<const std::string> prefixes,
(int subuid, std::span<const std::string> prefixes,
const PubSubOptionsImpl& options),
(override));
MOCK_METHOD(void, Unsubscribe, (NT_Subscriber subHandle), (override));
MOCK_METHOD(void, SetValue, (NT_Publisher pubHandle, const Value& value),
(override));
MOCK_METHOD(void, Unsubscribe, (int subuid), (override));
MOCK_METHOD(void, SetValue, (int pubuid, const Value& value), (override));
};
class MockLocalStorage : public ILocalStorage {
public:
MOCK_METHOD(NT_Topic, NetworkAnnounce,
(std::string_view name, std::string_view typeStr,
const wpi::json& properties, NT_Publisher pubHandle),
const wpi::json& properties, std::optional<int> pubuid),
(override));
MOCK_METHOD(void, NetworkUnannounce, (std::string_view name), (override));
MOCK_METHOD(void, NetworkPropertiesUpdate,

View File

@@ -135,8 +135,7 @@ static std::vector<uint8_t> EncodeServerBinary(const T& msgs) {
} else if constexpr (std::same_as<typename T::value_type,
net::ClientMessage>) {
if (auto m = std::get_if<net::ClientValueMsg>(&msg.contents)) {
net::WireEncodeBinary(os, Handle{m->pubHandle}.GetIndex(),
m->value.time(), m->value);
net::WireEncodeBinary(os, m->pubuid, m->value.time(), m->value);
}
}
}
@@ -146,29 +145,28 @@ static std::vector<uint8_t> EncodeServerBinary(const T& msgs) {
TEST_F(ServerImplTest, PublishLocal) {
// publish before client connect
server.SetLocal(&local);
NT_Publisher pubHandle = nt::Handle{0, 1, nt::Handle::kPublisher};
NT_Topic topicHandle = nt::Handle{0, 1, nt::Handle::kTopic};
NT_Publisher pubHandle2 = nt::Handle{0, 2, nt::Handle::kPublisher};
NT_Topic topicHandle2 = nt::Handle{0, 2, nt::Handle::kTopic};
NT_Publisher pubHandle3 = nt::Handle{0, 3, nt::Handle::kPublisher};
NT_Topic topicHandle3 = nt::Handle{0, 3, nt::Handle::kTopic};
constexpr int pubuid = 1;
constexpr int pubuid2 = 2;
constexpr int pubuid3 = 3;
{
::testing::InSequence seq;
EXPECT_CALL(local, NetworkAnnounce(std::string_view{"test"},
std::string_view{"double"},
wpi::json::object(), pubHandle));
EXPECT_CALL(local, NetworkAnnounce(std::string_view{"test2"},
std::string_view{"double"},
wpi::json::object(), pubHandle2));
EXPECT_CALL(local, NetworkAnnounce(std::string_view{"test3"},
std::string_view{"double"},
wpi::json::object(), pubHandle3));
EXPECT_CALL(local, NetworkAnnounce(
std::string_view{"test"}, std::string_view{"double"},
wpi::json::object(), std::optional<int>{pubuid}));
EXPECT_CALL(
local,
NetworkAnnounce(std::string_view{"test2"}, std::string_view{"double"},
wpi::json::object(), std::optional<int>{pubuid2}));
EXPECT_CALL(
local,
NetworkAnnounce(std::string_view{"test3"}, std::string_view{"double"},
wpi::json::object(), std::optional<int>{pubuid3}));
}
{
std::vector<net::ClientMessage> msgs;
msgs.emplace_back(net::ClientMessage{net::PublishMsg{
pubHandle, topicHandle, "test", "double", wpi::json::object(), {}}});
msgs.emplace_back(net::ClientMessage{
net::PublishMsg{pubuid, "test", "double", wpi::json::object(), {}}});
server.HandleLocal(msgs);
}
@@ -205,18 +203,18 @@ TEST_F(ServerImplTest, PublishLocal) {
setPeriodic.AsStdFunction());
{
NT_Subscriber subHandle = nt::Handle{0, 1, nt::Handle::kSubscriber};
constexpr int subuid = 1;
std::vector<net::ClientMessage> msgs;
msgs.emplace_back(net::ClientMessage{net::SubscribeMsg{
subHandle, {{""}}, PubSubOptions{.prefixMatch = true}}});
msgs.emplace_back(net::ClientMessage{
net::SubscribeMsg{subuid, {{""}}, PubSubOptions{.prefixMatch = true}}});
server.ProcessIncomingText(id, EncodeText(msgs));
}
// publish before send control
{
std::vector<net::ClientMessage> msgs;
msgs.emplace_back(net::ClientMessage{net::PublishMsg{
pubHandle2, topicHandle2, "test2", "double", wpi::json::object(), {}}});
msgs.emplace_back(net::ClientMessage{
net::PublishMsg{pubuid2, "test2", "double", wpi::json::object(), {}}});
server.HandleLocal(msgs);
}
@@ -225,8 +223,8 @@ TEST_F(ServerImplTest, PublishLocal) {
// publish after send control
{
std::vector<net::ClientMessage> msgs;
msgs.emplace_back(net::ClientMessage{net::PublishMsg{
pubHandle3, topicHandle3, "test3", "double", wpi::json::object(), {}}});
msgs.emplace_back(net::ClientMessage{
net::PublishMsg{pubuid3, "test3", "double", wpi::json::object(), {}}});
server.HandleLocal(msgs);
}
@@ -236,18 +234,17 @@ TEST_F(ServerImplTest, PublishLocal) {
TEST_F(ServerImplTest, ClientSubTopicOnlyThenValue) {
// publish before client connect
server.SetLocal(&local);
NT_Publisher pubHandle = nt::Handle{0, 1, nt::Handle::kPublisher};
NT_Topic topicHandle = nt::Handle{0, 1, nt::Handle::kTopic};
EXPECT_CALL(local, NetworkAnnounce(std::string_view{"test"},
std::string_view{"double"},
wpi::json::object(), pubHandle));
constexpr int pubuid = 1;
EXPECT_CALL(local, NetworkAnnounce(
std::string_view{"test"}, std::string_view{"double"},
wpi::json::object(), std::optional<int>{pubuid}));
{
std::vector<net::ClientMessage> msgs;
msgs.emplace_back(net::ClientMessage{net::PublishMsg{
pubHandle, topicHandle, "test", "double", wpi::json::object(), {}}});
msgs.emplace_back(net::ClientMessage{
net::ClientValueMsg{pubHandle, Value::MakeDouble(1.0, 10)}});
net::PublishMsg{pubuid, "test", "double", wpi::json::object(), {}}});
msgs.emplace_back(net::ClientMessage{
net::ClientValueMsg{pubuid, Value::MakeDouble(1.0, 10)}});
server.HandleLocal(msgs);
}
@@ -283,10 +280,10 @@ TEST_F(ServerImplTest, ClientSubTopicOnlyThenValue) {
// subscribe topics only; will not send value
{
NT_Subscriber subHandle = nt::Handle{0, 1, nt::Handle::kSubscriber};
constexpr int subuid = 1;
std::vector<net::ClientMessage> msgs;
msgs.emplace_back(net::ClientMessage{net::SubscribeMsg{
subHandle,
subuid,
{{""}},
PubSubOptions{.topicsOnly = true, .prefixMatch = true}}});
server.ProcessIncomingText(id, EncodeText(msgs));
@@ -296,10 +293,10 @@ TEST_F(ServerImplTest, ClientSubTopicOnlyThenValue) {
// subscribe normal; will not resend announcement, but will send value
{
NT_Subscriber subHandle = nt::Handle{0, 2, nt::Handle::kSubscriber};
constexpr int subuid = 2;
std::vector<net::ClientMessage> msgs;
msgs.emplace_back(net::ClientMessage{
net::SubscribeMsg{subHandle, {{"test"}}, PubSubOptions{}}});
net::SubscribeMsg{subuid, {{"test"}}, PubSubOptions{}}});
server.ProcessIncomingText(id, EncodeText(msgs));
}
@@ -308,37 +305,33 @@ TEST_F(ServerImplTest, ClientSubTopicOnlyThenValue) {
TEST_F(ServerImplTest, ClientDisconnectUnpublish) {
server.SetLocal(&local);
NT_Publisher pubLocalHandle = nt::Handle{0, 1, nt::Handle::kPublisher};
NT_Topic topicLocalHandle = nt::Handle{0, 1, nt::Handle::kTopic};
NT_Publisher subHandle = nt::Handle{0, 1, nt::Handle::kSubscriber};
constexpr int pubuidLocal = 1;
constexpr int subuid = 1;
{
::testing::InSequence seq;
EXPECT_CALL(local, NetworkAnnounce(std::string_view{"test2"},
std::string_view{"double"},
wpi::json::object(), pubLocalHandle));
EXPECT_CALL(local, NetworkAnnounce(std::string_view{"test"},
std::string_view{"double"},
wpi::json::object(), 0));
EXPECT_CALL(
local,
NetworkAnnounce(std::string_view{"test2"}, std::string_view{"double"},
wpi::json::object(), std::optional<int>{pubuidLocal}));
EXPECT_CALL(local, NetworkAnnounce(
std::string_view{"test"}, std::string_view{"double"},
wpi::json::object(), std::optional<int>{}));
EXPECT_CALL(local, NetworkUnannounce(std::string_view{"test"}));
}
{
std::vector<net::ClientMessage> msgs;
msgs.emplace_back(net::ClientMessage{net::PublishMsg{pubLocalHandle,
topicLocalHandle,
"test2",
"double",
wpi::json::object(),
{}}});
msgs.emplace_back(net::ClientMessage{net::PublishMsg{
pubuidLocal, "test2", "double", wpi::json::object(), {}}});
msgs.emplace_back(net::ClientMessage{
net::ClientValueMsg{pubLocalHandle, Value::MakeDouble(1.0, 10)}});
net::ClientValueMsg{pubuidLocal, Value::MakeDouble(1.0, 10)}});
server.HandleLocal(msgs);
}
{
std::vector<net::ClientMessage> msgs;
msgs.emplace_back(
net::ClientMessage{net::SubscribeMsg{subHandle, {"test"}, {}}});
net::ClientMessage{net::SubscribeMsg{subuid, {"test"}, {}}});
server.HandleLocal(msgs);
}
@@ -363,11 +356,10 @@ TEST_F(ServerImplTest, ClientDisconnectUnpublish) {
// publish topic
{
NT_Publisher pubHandle = nt::Handle{0, 1, nt::Handle::kPublisher};
NT_Topic topicHandle = nt::Handle{0, 1, nt::Handle::kTopic};
constexpr int pubuid = 1;
std::vector<net::ClientMessage> msgs;
msgs.emplace_back(net::ClientMessage{net::PublishMsg{
pubHandle, topicHandle, "test", "double", wpi::json::object(), {}}});
msgs.emplace_back(net::ClientMessage{
net::PublishMsg{pubuid, "test", "double", wpi::json::object(), {}}});
server.ProcessIncomingText(id, EncodeText(msgs));
}
@@ -380,18 +372,18 @@ TEST_F(ServerImplTest, ClientDisconnectUnpublish) {
TEST_F(ServerImplTest, ZeroTimestampNegativeTime) {
// publish before client connect
server.SetLocal(&local);
NT_Publisher pubHandle = nt::Handle{0, 1, nt::Handle::kPublisher};
constexpr int pubuid = 1;
NT_Topic topicHandle = nt::Handle{0, 1, nt::Handle::kTopic};
NT_Subscriber subHandle = nt::Handle{0, 1, nt::Handle::kSubscriber};
constexpr int subuid = 1;
Value defaultValue = Value::MakeDouble(1.0, 10);
defaultValue.SetTime(0);
defaultValue.SetServerTime(0);
Value value = Value::MakeDouble(5, -10);
{
::testing::InSequence seq;
EXPECT_CALL(local, NetworkAnnounce(std::string_view{"test"},
std::string_view{"double"},
wpi::json::object(), pubHandle))
EXPECT_CALL(local, NetworkAnnounce(
std::string_view{"test"}, std::string_view{"double"},
wpi::json::object(), std::optional<int>{pubuid}))
.WillOnce(Return(topicHandle));
EXPECT_CALL(local, NetworkSetValue(topicHandle, defaultValue));
EXPECT_CALL(local, NetworkSetValue(topicHandle, value));
@@ -399,12 +391,12 @@ TEST_F(ServerImplTest, ZeroTimestampNegativeTime) {
{
std::vector<net::ClientMessage> msgs;
msgs.emplace_back(net::ClientMessage{net::PublishMsg{
pubHandle, topicHandle, "test", "double", wpi::json::object(), {}}});
msgs.emplace_back(net::ClientMessage{
net::PublishMsg{pubuid, "test", "double", wpi::json::object(), {}}});
msgs.emplace_back(
net::ClientMessage{net::ClientValueMsg{pubHandle, defaultValue}});
net::ClientMessage{net::ClientValueMsg{pubuid, defaultValue}});
msgs.emplace_back(
net::ClientMessage{net::SubscribeMsg{subHandle, {"test"}, {}}});
net::ClientMessage{net::SubscribeMsg{subuid, {"test"}, {}}});
server.HandleLocal(msgs);
}
@@ -421,14 +413,13 @@ TEST_F(ServerImplTest, ZeroTimestampNegativeTime) {
// publish and send non-default value with negative time offset
{
NT_Subscriber pubHandle2 = nt::Handle{0, 2, nt::Handle::kPublisher};
constexpr int pubuid2 = 2;
std::vector<net::ClientMessage> msgs;
msgs.emplace_back(net::ClientMessage{net::PublishMsg{
pubHandle2, topicHandle, "test", "double", wpi::json::object(), {}}});
msgs.emplace_back(net::ClientMessage{
net::PublishMsg{pubuid2, "test", "double", wpi::json::object(), {}}});
server.ProcessIncomingText(id, EncodeText(msgs));
msgs.clear();
msgs.emplace_back(
net::ClientMessage{net::ClientValueMsg{pubHandle2, value}});
msgs.emplace_back(net::ClientMessage{net::ClientValueMsg{pubuid2, value}});
server.ProcessIncomingBinary(id, EncodeServerBinary(msgs));
}
}

View File

@@ -10,7 +10,6 @@
#include "../MockLogger.h"
#include "../TestPrinters.h"
#include "Handle.h"
#include "gmock/gmock.h"
#include "net/Message.h"
#include "net/WireDecoder.h"
@@ -26,24 +25,23 @@ namespace nt {
class MockClientMessageHandler : public net::ClientMessageHandler {
public:
MOCK_METHOD4(ClientPublish,
void(int64_t pubuid, std::string_view name,
std::string_view typeStr, const wpi::json& properties));
MOCK_METHOD1(ClientUnpublish, void(int64_t pubuid));
void(int pubuid, std::string_view name, std::string_view typeStr,
const wpi::json& properties));
MOCK_METHOD1(ClientUnpublish, void(int pubuid));
MOCK_METHOD2(ClientSetProperties,
void(std::string_view name, const wpi::json& update));
MOCK_METHOD3(ClientSubscribe,
void(int64_t subuid, std::span<const std::string> prefixes,
void(int subuid, std::span<const std::string> prefixes,
const PubSubOptionsImpl& options));
MOCK_METHOD1(ClientUnsubscribe, void(int64_t subuid));
MOCK_METHOD1(ClientUnsubscribe, void(int subuid));
};
class MockServerMessageHandler : public net::ServerMessageHandler {
public:
MOCK_METHOD5(ServerAnnounce,
void(std::string_view name, int64_t id, std::string_view typeStr,
const wpi::json& properties,
std::optional<int64_t> pubuid));
MOCK_METHOD2(ServerUnannounce, void(std::string_view name, int64_t id));
void(std::string_view name, int id, std::string_view typeStr,
const wpi::json& properties, std::optional<int> pubuid));
MOCK_METHOD2(ServerUnannounce, void(std::string_view name, int id));
MOCK_METHOD3(ServerPropertiesUpdate,
void(std::string_view name, const wpi::json& update, bool ack));
};

View File

@@ -142,8 +142,7 @@ TEST_F(WireEncoderTextTest, Unannounce) {
}
TEST_F(WireEncoderTextTest, MessagePublish) {
net::ClientMessage msg{net::PublishMsg{
Handle{0, 5, Handle::kPublisher}, 0, "test", "double", {{"k", 6}}, {}}};
net::ClientMessage msg{net::PublishMsg{5, "test", "double", {{"k", 6}}, {}}};
ASSERT_TRUE(net::WireEncodeText(os, msg));
ASSERT_EQ(os.str(),
"{\"method\":\"publish\",\"params\":{"
@@ -152,14 +151,13 @@ TEST_F(WireEncoderTextTest, MessagePublish) {
}
TEST_F(WireEncoderTextTest, MessageUnpublish) {
net::ClientMessage msg{
net::UnpublishMsg{Handle{0, 5, Handle::kPublisher}, 0}};
net::ClientMessage msg{net::UnpublishMsg{5}};
ASSERT_TRUE(net::WireEncodeText(os, msg));
ASSERT_EQ(os.str(), "{\"method\":\"unpublish\",\"params\":{\"pubuid\":5}}");
}
TEST_F(WireEncoderTextTest, MessageSetProperties) {
net::ClientMessage msg{net::SetPropertiesMsg{0, "test", {{"k", 6}}}};
net::ClientMessage msg{net::SetPropertiesMsg{"test", {{"k", 6}}}};
ASSERT_TRUE(net::WireEncodeText(os, msg));
ASSERT_EQ(os.str(),
"{\"method\":\"setproperties\",\"params\":{"
@@ -167,20 +165,17 @@ TEST_F(WireEncoderTextTest, MessageSetProperties) {
}
TEST_F(WireEncoderTextTest, MessageSubscribe) {
net::ClientMessage msg{
net::SubscribeMsg{Handle{0, 5, Handle::kSubscriber}, {"a", "b"}, {}}};
net::ClientMessage msg{net::SubscribeMsg{5, {"a", "b"}, {}}};
ASSERT_TRUE(net::WireEncodeText(os, msg));
ASSERT_EQ(os.str(),
"{\"method\":\"subscribe\",\"params\":{"
"\"options\":{},\"topics\":[\"a\",\"b\"],\"subuid\":402653189}}");
"\"options\":{},\"topics\":[\"a\",\"b\"],\"subuid\":5}}");
}
TEST_F(WireEncoderTextTest, MessageUnsubscribe) {
net::ClientMessage msg{
net::UnsubscribeMsg{Handle{0, 5, Handle::kSubscriber}}};
net::ClientMessage msg{net::UnsubscribeMsg{5}};
ASSERT_TRUE(net::WireEncodeText(os, msg));
ASSERT_EQ(os.str(),
"{\"method\":\"unsubscribe\",\"params\":{\"subuid\":402653189}}");
ASSERT_EQ(os.str(), "{\"method\":\"unsubscribe\",\"params\":{\"subuid\":5}}");
}
TEST_F(WireEncoderTextTest, MessageAnnounce) {