[ntcore] NetworkTables 4 (#3217)

This commit is contained in:
Peter Johnson
2022-10-08 10:01:31 -07:00
committed by GitHub
parent 90cfa00115
commit 77301b126c
380 changed files with 34573 additions and 22095 deletions

View File

@@ -0,0 +1,487 @@
// Copyright (c) FIRST and other WPILib contributors.
// Open Source Software; you can modify and/or share it under the terms of
// the WPILib BSD license file in the root directory of this project.
#include "ClientImpl.h"
#include <numeric>
#include <optional>
#include <string>
#include <variant>
#include <fmt/format.h>
#include <wpi/DenseMap.h>
#include <wpi/Logger.h>
#include <wpi/raw_ostream.h>
#include <wpi/timestamp.h>
#include "Handle.h"
#include "Log.h"
#include "Message.h"
#include "NetworkInterface.h"
#include "PubSubOptions.h"
#include "WireConnection.h"
#include "WireDecoder.h"
#include "WireEncoder.h"
#include "networktables/NetworkTableValue.h"
using namespace nt;
using namespace nt::net;
static constexpr uint32_t kMinPeriodMs = 5;
// maximum number of times the wire can be not ready to send another
// transmission before we close the connection
static constexpr int kWireMaxNotReady = 10;
namespace {
struct PublisherData {
NT_Publisher handle;
PubSubOptions options;
// in options as double, but copy here as integer; rounded to the nearest
// 10 ms
uint32_t periodMs;
uint64_t nextSendMs{0};
Value lastValue; // only used for duplicate value checking
std::vector<Value> outValues; // outgoing values
};
class CImpl : public ServerMessageHandler {
public:
CImpl(uint64_t curTimeMs, int inst, WireConnection& wire, wpi::Logger& logger,
std::function<void(uint32_t repeatMs)> setPeriodic);
void ProcessIncomingBinary(wpi::span<const uint8_t> data);
void HandleLocal(std::vector<ClientMessage>&& msgs);
void SendOutgoing(wpi::span<const ClientMessage> msgs);
bool SendControl(uint64_t curTimeMs);
void SendValues(uint64_t curTimeMs);
bool CheckNetworkReady();
// ServerMessageHandler interface
void ServerAnnounce(std::string_view name, int64_t id,
std::string_view typeStr, const wpi::json& properties,
std::optional<int64_t> pubuid) final;
void ServerUnannounce(std::string_view name, int64_t id) final;
void ServerPropertiesUpdate(std::string_view name, const wpi::json& update,
bool ack) final;
void Publish(NT_Publisher pubHandle, NT_Topic topicHandle,
std::string_view name, std::string_view typeStr,
const wpi::json& properties, const PubSubOptions& options);
bool Unpublish(NT_Publisher pubHandle, NT_Topic topicHandle);
void SetValue(NT_Publisher pubHandle, const Value& value);
int m_inst;
WireConnection& m_wire;
wpi::Logger& m_logger;
LocalInterface* m_local{nullptr};
std::function<void(uint32_t repeatMs)> m_setPeriodic;
// indexed by publisher index
std::vector<std::unique_ptr<PublisherData>> m_publishers;
// indexed by server-provided topic id
wpi::DenseMap<int64_t, NT_Topic> m_topicMap;
// timestamp handling
static constexpr uint32_t kPingIntervalMs = 3000;
uint64_t m_nextPingTimeMs{0};
uint32_t m_rtt2Us{UINT32_MAX};
bool m_haveTimeOffset{false};
int64_t m_serverTimeOffsetUs{0};
// periodic sweep handling
uint32_t m_periodMs{kPingIntervalMs + 10};
uint64_t m_lastSendMs{0};
int m_notReadyCount{0};
// outgoing queue
std::vector<ClientMessage> m_outgoing;
};
} // namespace
CImpl::CImpl(uint64_t curTimeMs, int inst, WireConnection& wire,
wpi::Logger& logger,
std::function<void(uint32_t repeatMs)> setPeriodic)
: m_inst{inst},
m_wire{wire},
m_logger{logger},
m_setPeriodic{std::move(setPeriodic)},
m_nextPingTimeMs{curTimeMs + kPingIntervalMs} {
// immediately send RTT ping
auto out = m_wire.SendBinary();
auto now = wpi::Now();
DEBUG4("Sending initial RTT ping {}", now);
WireEncodeBinary(out.Add(), -1, 0, Value::MakeInteger(now));
m_wire.Flush();
m_setPeriodic(m_periodMs);
}
void CImpl::ProcessIncomingBinary(wpi::span<const uint8_t> data) {
for (;;) {
if (data.empty()) {
break;
}
// decode message
int64_t id;
Value value;
std::string error;
if (!WireDecodeBinary(&data, &id, &value, &error, -m_serverTimeOffsetUs)) {
ERROR("binary decode error: {}", error);
break; // FIXME
}
DEBUG4("BinaryMessage({})", id);
// handle RTT ping response
if (id == -1) {
if (!value.IsInteger()) {
WARNING("RTT ping response with non-integer type {}",
static_cast<int>(value.type()));
continue;
}
DEBUG4("RTT ping response time {} value {}", value.time(),
value.GetInteger());
int64_t now = wpi::Now();
int64_t rtt2 = (now - value.GetInteger()) / 2;
if (rtt2 < m_rtt2Us) {
m_rtt2Us = rtt2;
m_serverTimeOffsetUs = value.server_time() + rtt2 - now;
DEBUG3("Time offset: {}", m_serverTimeOffsetUs);
m_haveTimeOffset = true;
}
continue;
}
// otherwise it's a value message, get the local topic handle for it
auto topicIt = m_topicMap.find(id);
if (topicIt == m_topicMap.end()) {
WARNING("received unknown id {}", id);
continue;
}
// pass along to local handler
if (m_local) {
m_local->NetworkSetValue(topicIt->second, value);
}
}
}
void CImpl::HandleLocal(std::vector<ClientMessage>&& msgs) {
DEBUG4("{}", "HandleLocal()");
for (auto&& elem : msgs) {
// common case is value
if (auto msg = std::get_if<ClientValueMsg>(&elem.contents)) {
SetValue(msg->pubHandle, msg->value);
// setvalue puts on individual publish outgoing queues
} else if (auto msg = std::get_if<PublishMsg>(&elem.contents)) {
Publish(msg->pubHandle, msg->topicHandle, msg->name, msg->typeStr,
msg->properties, msg->options);
m_outgoing.emplace_back(std::move(elem));
} else if (auto msg = std::get_if<UnpublishMsg>(&elem.contents)) {
if (Unpublish(msg->pubHandle, msg->topicHandle)) {
m_outgoing.emplace_back(std::move(elem));
}
} else {
m_outgoing.emplace_back(std::move(elem));
}
}
}
bool CImpl::SendControl(uint64_t curTimeMs) {
DEBUG4("SendControl({})", curTimeMs);
// rate limit sends
if (curTimeMs < (m_lastSendMs + kMinPeriodMs)) {
return false;
}
// start a timestamp RTT ping if it's time to do one
if (curTimeMs >= m_nextPingTimeMs) {
if (!CheckNetworkReady()) {
return false;
}
auto now = wpi::Now();
DEBUG4("Sending RTT ping {}", now);
WireEncodeBinary(m_wire.SendBinary().Add(), -1, 0, Value::MakeInteger(now));
// drift isn't critical here, so just go from current time
m_nextPingTimeMs = curTimeMs + kPingIntervalMs;
}
if (!m_outgoing.empty()) {
if (!CheckNetworkReady()) {
return false;
}
auto writer = m_wire.SendText();
for (auto&& msg : m_outgoing) {
auto& stream = writer.Add();
if (!WireEncodeText(stream, msg)) {
// shouldn't happen, but just in case...
stream << "{}";
}
}
m_outgoing.resize(0);
}
m_lastSendMs = curTimeMs;
return true;
}
void CImpl::SendValues(uint64_t curTimeMs) {
DEBUG4("SendPeriodic({})", curTimeMs);
// can't send value updates until we have a RTT
if (!m_haveTimeOffset) {
return;
}
// ensure all control messages are sent ahead of value updates
if (!SendControl(curTimeMs)) {
return;
}
// send any pending updates due to be sent
bool checkedNetwork = false;
auto writer = m_wire.SendBinary();
for (auto&& pub : m_publishers) {
if (pub && !pub->outValues.empty() && curTimeMs >= pub->nextSendMs) {
for (auto&& val : pub->outValues) {
if (!checkedNetwork) {
if (!CheckNetworkReady()) {
return;
}
checkedNetwork = true;
}
DEBUG4("Sending {} value time={} server_time={} st_off={}", pub->handle,
val.time(), val.server_time(), m_serverTimeOffsetUs);
int64_t time = val.time();
if (time != 0) {
time += m_serverTimeOffsetUs;
}
WireEncodeBinary(writer.Add(), Handle{pub->handle}.GetIndex(), time,
val);
}
pub->outValues.resize(0);
pub->nextSendMs = curTimeMs + pub->periodMs;
}
}
}
bool CImpl::CheckNetworkReady() {
if (!m_wire.Ready()) {
++m_notReadyCount;
if (m_notReadyCount > kWireMaxNotReady) {
m_wire.Disconnect("transmit stalled");
}
return false;
}
m_notReadyCount = 0;
return true;
}
void CImpl::Publish(NT_Publisher pubHandle, NT_Topic topicHandle,
std::string_view name, std::string_view typeStr,
const wpi::json& properties, const PubSubOptions& options) {
unsigned int index = Handle{pubHandle}.GetIndex();
if (index >= m_publishers.size()) {
m_publishers.resize(index + 1);
}
auto& publisher = m_publishers[index];
if (!publisher) {
publisher = std::make_unique<PublisherData>();
}
publisher->handle = pubHandle;
publisher->options = options;
publisher->periodMs = std::lround(options.periodic * 100) * 10;
if (publisher->periodMs < kMinPeriodMs) {
publisher->periodMs = kMinPeriodMs;
}
// update period
m_periodMs = std::gcd(m_periodMs, publisher->periodMs);
if (m_periodMs < kMinPeriodMs) {
m_periodMs = kMinPeriodMs;
}
m_setPeriodic(m_periodMs);
}
bool CImpl::Unpublish(NT_Publisher pubHandle, NT_Topic topicHandle) {
unsigned int index = Handle{pubHandle}.GetIndex();
if (index >= m_publishers.size()) {
return false;
}
bool doSend = true;
if (m_publishers[index]) {
// Look through outgoing queue to see if the publish hasn't been sent yet;
// if it hasn't, delete it and don't send the server a message.
// The outgoing queue doesn't contain values; those are deleted with the
// publisher object.
auto it = std::find_if(
m_outgoing.begin(), m_outgoing.end(), [&](const auto& elem) {
if (auto msg = std::get_if<PublishMsg>(&elem.contents)) {
return msg->pubHandle == pubHandle;
}
return false;
});
if (it != m_outgoing.end()) {
m_outgoing.erase(it);
doSend = false;
}
}
m_publishers[index].reset();
// loop over all publishers to update period
m_periodMs = kPingIntervalMs + 10;
for (auto&& pub : m_publishers) {
if (pub) {
m_periodMs = std::gcd(m_periodMs, pub->periodMs);
}
}
if (m_periodMs < kMinPeriodMs) {
m_periodMs = kMinPeriodMs;
}
m_setPeriodic(m_periodMs);
return doSend;
}
void CImpl::SetValue(NT_Publisher pubHandle, const Value& value) {
DEBUG4("SetValue({}, time={}, server_time={}, st_off={})", pubHandle,
value.time(), value.server_time(), m_serverTimeOffsetUs);
unsigned int index = Handle{pubHandle}.GetIndex();
if (index >= m_publishers.size() || !m_publishers[index]) {
return;
}
auto& publisher = *m_publishers[index];
if (!publisher.options.keepDuplicates) {
if (value == publisher.lastValue) {
return;
}
publisher.lastValue = value;
}
if (publisher.outValues.empty() || publisher.options.sendAll) {
publisher.outValues.emplace_back(value);
} else {
publisher.outValues.back() = value;
}
}
void CImpl::ServerAnnounce(std::string_view name, int64_t id,
std::string_view typeStr,
const wpi::json& properties,
std::optional<int64_t> pubuid) {
DEBUG4("ServerAnnounce({}, {}, {})", name, id, typeStr);
assert(m_local);
NT_Publisher pubHandle{0};
if (pubuid) {
pubHandle = Handle(m_inst, pubuid.value(), Handle::kPublisher);
}
m_topicMap[id] =
m_local->NetworkAnnounce(name, typeStr, properties, pubHandle);
}
void CImpl::ServerUnannounce(std::string_view name, int64_t id) {
DEBUG4("ServerUnannounce({}, {})", name, id);
assert(m_local);
m_local->NetworkUnannounce(name);
m_topicMap.erase(id);
}
void CImpl::ServerPropertiesUpdate(std::string_view name,
const wpi::json& update, bool ack) {
DEBUG4("ServerProperties({}, {}, {})", name, update.dump(), ack);
assert(m_local);
m_local->NetworkPropertiesUpdate(name, update, ack);
}
class ClientImpl::Impl final : public CImpl {
public:
Impl(uint64_t curTimeMs, int inst, WireConnection& wire, wpi::Logger& logger,
std::function<void(uint32_t repeatMs)> setPeriodic)
: CImpl{curTimeMs, inst, wire, logger, std::move(setPeriodic)} {}
};
ClientImpl::ClientImpl(uint64_t curTimeMs, int inst, WireConnection& wire,
wpi::Logger& logger,
std::function<void(uint32_t repeatMs)> setPeriodic)
: m_impl{std::make_unique<Impl>(curTimeMs, inst, wire, logger,
std::move(setPeriodic))} {}
ClientImpl::~ClientImpl() = default;
void ClientImpl::ProcessIncomingText(std::string_view data) {
if (!m_impl->m_local) {
return;
}
WireDecodeText(data, *m_impl, m_impl->m_logger);
}
void ClientImpl::ProcessIncomingBinary(wpi::span<const uint8_t> data) {
m_impl->ProcessIncomingBinary(data);
}
void ClientImpl::HandleLocal(std::vector<ClientMessage>&& msgs) {
m_impl->HandleLocal(std::move(msgs));
}
void ClientImpl::SendControl(uint64_t curTimeMs) {
m_impl->SendControl(curTimeMs);
m_impl->m_wire.Flush();
}
void ClientImpl::SendValues(uint64_t curTimeMs) {
m_impl->SendValues(curTimeMs);
m_impl->m_wire.Flush();
}
void ClientImpl::SetLocal(LocalInterface* local) {
m_impl->m_local = local;
}
ClientStartup::ClientStartup(ClientImpl& client)
: m_client{client},
m_binaryWriter{client.m_impl->m_wire.SendBinary()},
m_textWriter{client.m_impl->m_wire.SendText()} {}
ClientStartup::~ClientStartup() {
m_client.m_impl->m_wire.Flush();
}
void ClientStartup::Publish(NT_Publisher pubHandle, NT_Topic topicHandle,
std::string_view name, std::string_view typeStr,
const wpi::json& properties,
const PubSubOptions& options) {
WPI_DEBUG4(m_client.m_impl->m_logger, "StartupPublish({}, {}, {}, {})",
pubHandle, topicHandle, name, typeStr);
m_client.m_impl->Publish(pubHandle, topicHandle, name, typeStr, properties,
options);
WireEncodePublish(m_textWriter.Add(), Handle{pubHandle}.GetIndex(), name,
typeStr, properties);
}
void ClientStartup::Subscribe(NT_Subscriber subHandle,
wpi::span<const std::string> prefixes,
const PubSubOptions& options) {
WPI_DEBUG4(m_client.m_impl->m_logger, "StartupSubscribe({})", subHandle);
WireEncodeSubscribe(m_textWriter.Add(), Handle{subHandle}.GetIndex(),
prefixes, options);
}
void ClientStartup::SetValue(NT_Publisher pubHandle, const Value& value) {
WPI_DEBUG4(m_client.m_impl->m_logger, "StartupSetValue({})", pubHandle);
// Similar to Client::SetValue(), except always set lastValue and send
unsigned int index = Handle{pubHandle}.GetIndex();
assert(index < m_client.m_impl->m_publishers.size() &&
m_client.m_impl->m_publishers[index]);
auto& publisher = *m_client.m_impl->m_publishers[index];
publisher.lastValue = value;
// only send time 0 values until we have a RTT
if (value.server_time() == 0) {
WireEncodeBinary(m_binaryWriter.Add(), index, 0, value);
} else {
publisher.outValues.emplace_back(value);
}
}

View File

@@ -0,0 +1,79 @@
// Copyright (c) FIRST and other WPILib contributors.
// Open Source Software; you can modify and/or share it under the terms of
// the WPILib BSD license file in the root directory of this project.
#pragma once
#include <stdint.h>
#include <functional>
#include <memory>
#include <string>
#include <string_view>
#include <vector>
#include <wpi/span.h>
#include "NetworkInterface.h"
#include "WireConnection.h"
namespace wpi {
class Logger;
} // namespace wpi
namespace nt {
class PubSubOptions;
class Value;
} // namespace nt
namespace nt::net {
struct ClientMessage;
class ClientStartup;
class WireConnection;
class ClientImpl {
friend class ClientStartup;
public:
ClientImpl(uint64_t curTimeMs, int inst, WireConnection& wire,
wpi::Logger& logger,
std::function<void(uint32_t repeatMs)> setPeriodic);
~ClientImpl();
void ProcessIncomingText(std::string_view data);
void ProcessIncomingBinary(wpi::span<const uint8_t> data);
void HandleLocal(std::vector<ClientMessage>&& msgs);
void SendControl(uint64_t curTimeMs);
void SendValues(uint64_t curTimeMs);
void SetLocal(LocalInterface* local);
private:
class Impl;
std::unique_ptr<Impl> m_impl;
};
class ClientStartup final : public NetworkStartupInterface {
public:
explicit ClientStartup(ClientImpl& client);
~ClientStartup() final;
ClientStartup(const ClientStartup&) = delete;
ClientStartup& operator=(const ClientStartup&) = delete;
// NetworkStartupInterface interface
void Publish(NT_Publisher pubHandle, NT_Topic topicHandle,
std::string_view name, std::string_view typeStr,
const wpi::json& properties, const PubSubOptions& options) final;
void Subscribe(NT_Subscriber subHandle, wpi::span<const std::string> prefixes,
const PubSubOptions& options) final;
void SetValue(NT_Publisher pubHandle, const Value& value) final;
private:
ClientImpl& m_client;
BinaryWriter m_binaryWriter;
TextWriter m_textWriter;
};
} // namespace nt::net

View File

@@ -0,0 +1,100 @@
// Copyright (c) FIRST and other WPILib contributors.
// Open Source Software; you can modify and/or share it under the terms of
// the WPILib BSD license file in the root directory of this project.
#pragma once
#include <optional>
#include <string>
#include <variant>
#include <vector>
#include <wpi/json.h>
#include "PubSubOptions.h"
#include "networktables/NetworkTableValue.h"
#include "ntcore_c.h"
namespace nt::net {
struct PublishMsg {
static constexpr std::string_view kMethodStr = "publish";
NT_Publisher pubHandle{0};
NT_Topic topicHandle{0}; // will be 0 when coming from network
std::string name;
std::string typeStr;
wpi::json properties;
PubSubOptions options; // will be empty when coming from network
};
struct UnpublishMsg {
static constexpr std::string_view kMethodStr = "unpublish";
NT_Publisher pubHandle{0};
NT_Topic topicHandle{0}; // will be 0 when coming from network
};
struct SetPropertiesMsg {
static constexpr std::string_view kMethodStr = "setproperties";
NT_Topic topicHandle{0}; // will be 0 when coming from network
std::string name;
wpi::json update;
};
struct SubscribeMsg {
static constexpr std::string_view kMethodStr = "subscribe";
NT_Subscriber subHandle{0};
std::vector<std::string> topicNames;
PubSubOptions options;
};
struct UnsubscribeMsg {
static constexpr std::string_view kMethodStr = "unsubscribe";
NT_Subscriber subHandle{0};
};
struct ClientValueMsg {
NT_Publisher pubHandle{0};
Value value;
};
struct ClientMessage {
using Contents =
std::variant<std::monostate, PublishMsg, UnpublishMsg, SetPropertiesMsg,
SubscribeMsg, UnsubscribeMsg, ClientValueMsg>;
Contents contents;
};
struct AnnounceMsg {
static constexpr std::string_view kMethodStr = "announce";
std::string name;
int64_t id{0};
std::string typeStr;
std::optional<int64_t> pubuid;
wpi::json properties;
};
struct UnannounceMsg {
static constexpr std::string_view kMethodStr = "unannounce";
std::string name;
int64_t id{0};
};
struct PropertiesUpdateMsg {
static constexpr std::string_view kMethodStr = "properties";
std::string name;
wpi::json update;
bool ack;
};
struct ServerValueMsg {
NT_Topic topic{0};
Value value;
};
struct ServerMessage {
using Contents = std::variant<std::monostate, AnnounceMsg, UnannounceMsg,
PropertiesUpdateMsg, ServerValueMsg>;
Contents contents;
};
} // namespace nt::net

View File

@@ -0,0 +1,68 @@
// Copyright (c) FIRST and other WPILib contributors.
// Open Source Software; you can modify and/or share it under the terms of
// the WPILib BSD license file in the root directory of this project.
#pragma once
#include <string>
#include <string_view>
#include <wpi/span.h>
#include "ntcore_cpp.h"
namespace wpi {
class json;
} // namespace wpi
namespace nt {
class PubSubOptions;
class Value;
} // namespace nt
namespace nt::net {
class LocalInterface {
public:
virtual ~LocalInterface() = default;
virtual NT_Topic NetworkAnnounce(std::string_view name,
std::string_view typeStr,
const wpi::json& properties,
NT_Publisher pubHandle) = 0;
virtual void NetworkUnannounce(std::string_view name) = 0;
virtual void NetworkPropertiesUpdate(std::string_view name,
const wpi::json& update, bool ack) = 0;
virtual void NetworkSetValue(NT_Topic topicHandle, const Value& value) = 0;
};
class NetworkStartupInterface {
public:
virtual ~NetworkStartupInterface() = default;
virtual void Publish(NT_Publisher pubHandle, NT_Topic topicHandle,
std::string_view name, std::string_view typeStr,
const wpi::json& properties,
const PubSubOptions& options) = 0;
virtual void Subscribe(NT_Subscriber subHandle,
wpi::span<const std::string> topicNames,
const PubSubOptions& options) = 0;
virtual void SetValue(NT_Publisher pubHandle, const Value& value) = 0;
};
class NetworkInterface : public NetworkStartupInterface {
public:
virtual void Unpublish(NT_Publisher pubHandle, NT_Topic topicHandle) = 0;
virtual void SetProperties(NT_Topic topicHandle, std::string_view name,
const wpi::json& update) = 0;
virtual void Unsubscribe(NT_Subscriber subHandle) = 0;
};
class ILocalStorage : public LocalInterface {
public:
virtual void StartNetwork(NetworkStartupInterface& startup) = 0;
virtual void SetNetwork(NetworkInterface* network) = 0;
virtual void ClearNetwork() = 0;
};
} // namespace nt::net

View File

@@ -0,0 +1,54 @@
// Copyright (c) FIRST and other WPILib contributors.
// Open Source Software; you can modify and/or share it under the terms of
// the WPILib BSD license file in the root directory of this project.
#include "NetworkLoopQueue.h"
#include <wpi/Logger.h>
using namespace nt::net;
static constexpr size_t kMaxSize = 2 * 1024 * 1024;
void NetworkLoopQueue::SetValue(NT_Publisher pubHandle, const Value& value) {
std::scoped_lock lock{m_mutex};
switch (value.type()) {
case NT_STRING:
m_size += value.GetString().size(); // imperfect but good enough
break;
case NT_RAW:
m_size += value.GetRaw().size_bytes();
break;
case NT_BOOLEAN_ARRAY:
m_size += value.GetBooleanArray().size_bytes();
break;
case NT_INTEGER_ARRAY:
m_size += value.GetIntegerArray().size_bytes();
break;
case NT_FLOAT_ARRAY:
m_size += value.GetFloatArray().size_bytes();
break;
case NT_DOUBLE_ARRAY:
m_size += value.GetDoubleArray().size_bytes();
break;
case NT_STRING_ARRAY: {
auto arr = value.GetStringArray();
m_size += arr.size_bytes();
for (auto&& s : arr) {
m_size += s.capacity();
}
break;
}
default:
break;
}
m_size += sizeof(ClientMessage);
if (m_size > kMaxSize) {
if (!m_sizeErrored) {
WPI_ERROR(m_logger, "{}", "NT: dropping value set due to memory limits");
m_sizeErrored = true;
}
return; // avoid potential out of memory
}
m_queue.emplace_back(ClientMessage{ClientValueMsg{pubHandle, value}});
}

View File

@@ -0,0 +1,55 @@
// Copyright (c) FIRST and other WPILib contributors.
// Open Source Software; you can modify and/or share it under the terms of
// the WPILib BSD license file in the root directory of this project.
#pragma once
#include <string>
#include <vector>
#include <wpi/mutex.h>
#include "Message.h"
#include "NetworkInterface.h"
namespace wpi {
class Logger;
} // namespace wpi
namespace nt::net {
class NetworkLoopQueue : public NetworkInterface {
public:
static constexpr size_t kInitialQueueSize = 2000;
explicit NetworkLoopQueue(wpi::Logger& logger) : m_logger{logger} {
m_queue.reserve(kInitialQueueSize);
}
void ReadQueue(std::vector<ClientMessage>* out);
void ClearQueue();
// NetworkInterface - calls to these append to the queue
void Publish(NT_Publisher pubHandle, NT_Topic topicHandle,
std::string_view name, std::string_view typeStr,
const wpi::json& properties, const PubSubOptions& options) final;
void Unpublish(NT_Publisher pubHandle, NT_Topic topicHandle) final;
void SetProperties(NT_Topic topicHandle, std::string_view name,
const wpi::json& update) final;
void Subscribe(NT_Subscriber subHandle,
wpi::span<const std::string> topicNames,
const PubSubOptions& options) final;
void Unsubscribe(NT_Subscriber subHandle) final;
void SetValue(NT_Publisher pubHandle, const Value& value) final;
private:
wpi::mutex m_mutex;
std::vector<ClientMessage> m_queue;
wpi::Logger& m_logger;
size_t m_size{0};
bool m_sizeErrored{false};
};
} // namespace nt::net
#include "NetworkLoopQueue.inc"

View File

@@ -0,0 +1,70 @@
// Copyright (c) FIRST and other WPILib contributors.
// Open Source Software; you can modify and/or share it under the terms of
// the WPILib BSD license file in the root directory of this project.
#pragma once
#include <string>
#include <vector>
#include "NetworkLoopQueue.h"
#include "ntcore_c.h"
namespace nt::net {
inline void NetworkLoopQueue::ReadQueue(std::vector<ClientMessage>* out) {
std::scoped_lock lock{m_mutex};
out->swap(m_queue);
m_queue.resize(0);
m_queue.reserve(out->capacity()); // keep the same running capacity
m_size = 0;
m_sizeErrored = false;
}
inline void NetworkLoopQueue::ClearQueue() {
std::scoped_lock lock{m_mutex};
m_queue.resize(0);
m_size = 0;
m_sizeErrored = false;
}
inline void NetworkLoopQueue::Publish(NT_Publisher pubHandle,
NT_Topic topicHandle,
std::string_view name,
std::string_view typeStr,
const wpi::json& properties,
const PubSubOptions& options) {
std::scoped_lock lock{m_mutex};
m_queue.emplace_back(
ClientMessage{PublishMsg{pubHandle, topicHandle, std::string{name},
std::string{typeStr}, properties, options}});
}
inline void NetworkLoopQueue::Unpublish(NT_Publisher pubHandle,
NT_Topic topicHandle) {
std::scoped_lock lock{m_mutex};
m_queue.emplace_back(ClientMessage{UnpublishMsg{pubHandle, topicHandle}});
}
inline void NetworkLoopQueue::SetProperties(NT_Topic topicHandle,
std::string_view name,
const wpi::json& update) {
std::scoped_lock lock{m_mutex};
m_queue.emplace_back(
ClientMessage{SetPropertiesMsg{topicHandle, std::string{name}, update}});
}
inline void NetworkLoopQueue::Subscribe(NT_Subscriber subHandle,
wpi::span<const std::string> topicNames,
const PubSubOptions& options) {
std::scoped_lock lock{m_mutex};
m_queue.emplace_back(ClientMessage{SubscribeMsg{
subHandle, {topicNames.begin(), topicNames.end()}, options}});
}
inline void NetworkLoopQueue::Unsubscribe(NT_Subscriber subHandle) {
std::scoped_lock lock{m_mutex};
m_queue.emplace_back(ClientMessage{UnsubscribeMsg{subHandle}});
}
} // namespace nt::net

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,94 @@
// Copyright (c) FIRST and other WPILib contributors.
// Open Source Software; you can modify and/or share it under the terms of
// the WPILib BSD license file in the root directory of this project.
#pragma once
#include <stdint.h>
#include <functional>
#include <memory>
#include <string>
#include <string_view>
#include <vector>
#include <wpi/span.h>
#include "NetworkInterface.h"
#include "net3/WireConnection3.h"
namespace wpi {
class Logger;
} // namespace wpi
namespace nt::net3 {
class WireConnection3;
} // namespace nt::net3
namespace nt::net {
struct ClientMessage;
class LocalInterface;
class ServerStartup;
class WireConnection;
class ServerImpl final {
friend class ServerStartup;
public:
using SetPeriodicFunc = std::function<void(uint32_t repeatMs)>;
using Connected3Func =
std::function<void(std::string_view name, uint16_t proto)>;
explicit ServerImpl(wpi::Logger& logger);
~ServerImpl();
void SendControl(uint64_t curTimeMs);
void SendValues(int clientId, uint64_t curTimeMs);
void HandleLocal(wpi::span<const ClientMessage> msgs);
void SetLocal(LocalInterface* local);
void ProcessIncomingText(int clientId, std::string_view data);
void ProcessIncomingBinary(int clientId, wpi::span<const uint8_t> data);
// Returns -1 if cannot add client (e.g. due to duplicate name).
// Caller must ensure WireConnection lifetime lasts until RemoveClient() call.
int AddClient(std::string_view name, std::string_view connInfo, bool local,
WireConnection& wire, SetPeriodicFunc setPeriodic);
int AddClient3(std::string_view connInfo, bool local,
net3::WireConnection3& wire, Connected3Func connected,
SetPeriodicFunc setPeriodic);
void RemoveClient(int clientId);
void ConnectionsChanged(const std::vector<ConnectionInfo>& conns);
// if any persistent values changed since the last call to this function
bool PersistentChanged();
std::string DumpPersistent();
// returns newline-separated errors
std::string LoadPersistent(std::string_view in);
private:
class Impl;
std::unique_ptr<Impl> m_impl;
};
class ServerStartup final : public NetworkStartupInterface {
public:
explicit ServerStartup(ServerImpl& server) : m_server{server} {}
// NetworkStartupInterface interface
void Publish(NT_Publisher pubHandle, NT_Topic topicHandle,
std::string_view name, std::string_view typeStr,
const wpi::json& properties, const PubSubOptions& options) final;
void Subscribe(NT_Subscriber subHandle,
wpi::span<const std::string> topicNames,
const PubSubOptions& options) final;
void SetValue(NT_Publisher pubHandle, const Value& value) final;
private:
ServerImpl& m_server;
};
} // namespace nt::net

View File

@@ -0,0 +1,122 @@
// Copyright (c) FIRST and other WPILib contributors.
// Open Source Software; you can modify and/or share it under the terms of
// the WPILib BSD license file in the root directory of this project.
#include "WebSocketConnection.h"
#include <wpi/SpanExtras.h>
#include <wpinet/WebSocket.h>
using namespace nt;
using namespace nt::net;
static constexpr size_t kAllocSize = 4096;
static constexpr size_t kTextFrameRolloverSize = 4096;
static constexpr size_t kBinaryFrameRolloverSize = 8192;
WebSocketConnection::WebSocketConnection(wpi::WebSocket& ws)
: m_ws{ws},
m_text_os{m_text_buffers, [this] { return AllocBuf(); }},
m_binary_os{m_binary_buffers, [this] { return AllocBuf(); }} {}
WebSocketConnection::~WebSocketConnection() {
for (auto&& buf : m_buf_pool) {
buf.Deallocate();
}
}
void WebSocketConnection::Flush() {
FinishSendText();
FinishSendBinary();
if (m_frames.empty()) {
return;
}
// convert internal frames into WS frames
m_ws_frames.clear();
m_ws_frames.reserve(m_frames.size());
for (auto&& frame : m_frames) {
m_ws_frames.emplace_back(frame.opcode,
wpi::span{frame.bufs->begin() + frame.start,
frame.bufs->begin() + frame.end});
}
++m_sendsActive;
m_ws.SendFrames(m_ws_frames, [this](auto bufs, auto) {
m_buf_pool.insert(m_buf_pool.end(), bufs.begin(), bufs.end());
if (m_sendsActive > 0) {
--m_sendsActive;
}
});
m_frames.clear();
m_text_buffers.clear();
m_binary_buffers.clear();
m_text_pos = 0;
m_binary_pos = 0;
}
void WebSocketConnection::Disconnect(std::string_view reason) {
m_ws.Close(1005, reason);
}
void WebSocketConnection::StartSendText() {
// limit amount per single frame
size_t total = 0;
for (size_t i = m_text_pos; i < m_text_buffers.size(); ++i) {
total += m_text_buffers[i].len;
}
if (total >= kTextFrameRolloverSize) {
FinishSendText();
}
if (m_in_text) {
m_text_os << ',';
} else {
m_text_os << '[';
m_in_text = true;
}
}
void WebSocketConnection::FinishSendText() {
if (m_in_text) {
m_text_os << ']';
m_in_text = false;
}
if (m_text_pos >= m_text_buffers.size()) {
return;
}
m_frames.emplace_back(wpi::WebSocket::Frame::kText, &m_text_buffers,
m_text_pos, m_text_buffers.size());
m_text_pos = m_text_buffers.size();
m_text_os.reset();
}
void WebSocketConnection::StartSendBinary() {
// limit amount per single frame
size_t total = 0;
for (size_t i = m_binary_pos; i < m_binary_buffers.size(); ++i) {
total += m_binary_buffers[i].len;
}
if (total >= kBinaryFrameRolloverSize) {
FinishSendBinary();
}
}
void WebSocketConnection::FinishSendBinary() {
if (m_binary_pos >= m_binary_buffers.size()) {
return;
}
m_frames.emplace_back(wpi::WebSocket::Frame::kBinary, &m_binary_buffers,
m_binary_pos, m_binary_buffers.size());
m_binary_pos = m_binary_buffers.size();
m_binary_os.reset();
}
wpi::uv::Buffer WebSocketConnection::AllocBuf() {
if (!m_buf_pool.empty()) {
auto buf = m_buf_pool.back();
m_buf_pool.pop_back();
return buf;
}
return wpi::uv::Buffer::Allocate(kAllocSize);
}

View File

@@ -0,0 +1,66 @@
// Copyright (c) FIRST and other WPILib contributors.
// Open Source Software; you can modify and/or share it under the terms of
// the WPILib BSD license file in the root directory of this project.
#pragma once
#include <vector>
#include <wpi/SmallVector.h>
#include <wpinet/WebSocket.h>
#include <wpinet/raw_uv_ostream.h>
#include <wpinet/uv/Buffer.h>
#include "WireConnection.h"
namespace nt::net {
class WebSocketConnection final : public WireConnection {
public:
explicit WebSocketConnection(wpi::WebSocket& ws);
~WebSocketConnection() override;
WebSocketConnection(const WebSocketConnection&) = delete;
WebSocketConnection& operator=(const WebSocketConnection&) = delete;
bool Ready() const final { return m_sendsActive == 0; }
TextWriter SendText() final { return {m_text_os, *this}; }
BinaryWriter SendBinary() final { return {m_binary_os, *this}; }
void Flush() final;
void Disconnect(std::string_view reason) final;
private:
void StartSendText() final;
void FinishSendText() final;
void StartSendBinary() final;
void FinishSendBinary() final;
wpi::uv::Buffer AllocBuf();
wpi::WebSocket& m_ws;
// Can't use WS frames directly as span could have dangling pointers
struct Frame {
Frame(uint8_t opcode, wpi::SmallVectorImpl<wpi::uv::Buffer>* bufs,
size_t start, size_t end)
: opcode{opcode}, bufs{bufs}, start{start}, end{end} {}
uint8_t opcode;
wpi::SmallVectorImpl<wpi::uv::Buffer>* bufs;
size_t start;
size_t end;
};
std::vector<Frame> m_frames;
std::vector<wpi::WebSocket::Frame> m_ws_frames; // to reduce allocs
wpi::SmallVector<wpi::uv::Buffer, 4> m_text_buffers;
wpi::SmallVector<wpi::uv::Buffer, 4> m_binary_buffers;
std::vector<wpi::uv::Buffer> m_buf_pool;
wpi::raw_uv_ostream m_text_os;
wpi::raw_uv_ostream m_binary_os;
size_t m_text_pos = 0;
size_t m_binary_pos = 0;
bool m_in_text = false;
int m_sendsActive = 0;
};
} // namespace nt::net

View File

@@ -0,0 +1,110 @@
// Copyright (c) FIRST and other WPILib contributors.
// Open Source Software; you can modify and/or share it under the terms of
// the WPILib BSD license file in the root directory of this project.
#pragma once
#include <stdint.h>
#include <string_view>
#include <wpi/raw_ostream.h>
namespace nt::net {
class BinaryWriter;
class TextWriter;
class WireConnection {
friend class TextWriter;
friend class BinaryWriter;
public:
virtual ~WireConnection() = default;
virtual bool Ready() const = 0;
virtual TextWriter SendText() = 0;
virtual BinaryWriter SendBinary() = 0;
virtual void Flush() = 0;
virtual void Disconnect(std::string_view reason) = 0;
protected:
virtual void StartSendText() = 0;
virtual void FinishSendText() = 0;
virtual void StartSendBinary() = 0;
virtual void FinishSendBinary() = 0;
};
class TextWriter {
public:
TextWriter(wpi::raw_ostream& os, WireConnection& wire)
: m_os{&os}, m_wire{&wire} {}
TextWriter(const TextWriter&) = delete;
TextWriter(TextWriter&& rhs) : m_os{rhs.m_os}, m_wire{rhs.m_wire} {
rhs.m_os = nullptr;
rhs.m_wire = nullptr;
}
TextWriter& operator=(const TextWriter&) = delete;
TextWriter& operator=(TextWriter&& rhs) {
m_os = rhs.m_os;
m_wire = rhs.m_wire;
rhs.m_os = nullptr;
rhs.m_wire = nullptr;
return *this;
}
~TextWriter() {
if (m_os) {
m_wire->FinishSendText();
}
}
wpi::raw_ostream& Add() {
m_wire->StartSendText();
return *m_os;
}
WireConnection& wire() { return *m_wire; }
private:
wpi::raw_ostream* m_os;
WireConnection* m_wire;
};
class BinaryWriter {
public:
BinaryWriter(wpi::raw_ostream& os, WireConnection& wire)
: m_os{&os}, m_wire{&wire} {}
BinaryWriter(const BinaryWriter&) = delete;
BinaryWriter(BinaryWriter&& rhs) : m_os{rhs.m_os}, m_wire{rhs.m_wire} {
rhs.m_os = nullptr;
rhs.m_wire = nullptr;
}
BinaryWriter& operator=(const BinaryWriter&) = delete;
BinaryWriter& operator=(BinaryWriter&& rhs) {
m_os = rhs.m_os;
m_wire = rhs.m_wire;
rhs.m_os = nullptr;
rhs.m_wire = nullptr;
return *this;
}
~BinaryWriter() {
if (m_wire) {
m_wire->FinishSendBinary();
}
}
wpi::raw_ostream& Add() {
m_wire->StartSendBinary();
return *m_os;
}
WireConnection& wire() { return *m_wire; }
private:
wpi::raw_ostream* m_os;
WireConnection* m_wire;
};
} // namespace nt::net

View File

@@ -0,0 +1,564 @@
// Copyright (c) FIRST and other WPILib contributors.
// Open Source Software; you can modify and/or share it under the terms of
// the WPILib BSD license file in the root directory of this project.
#include "WireDecoder.h"
#include <algorithm>
#include <fmt/format.h>
#include <wpi/Logger.h>
#include <wpi/SpanExtras.h>
#include <wpi/json.h>
#include <wpi/mpack.h>
#include "Message.h"
using namespace nt;
using namespace nt::net;
using namespace mpack;
static bool GetNumber(wpi::json& val, double* num) {
if (auto v = val.get_ptr<const int64_t*>()) {
*num = *v;
} else if (auto v = val.get_ptr<const uint64_t*>()) {
*num = *v;
} else if (auto v = val.get_ptr<const double*>()) {
*num = *v;
} else {
return false;
}
return true;
}
static bool GetNumber(wpi::json& val, int64_t* num) {
if (auto v = val.get_ptr<const int64_t*>()) {
*num = *v;
} else if (auto v = val.get_ptr<const uint64_t*>()) {
*num = *v;
} else {
return false;
}
return true;
}
static std::string* ObjGetString(wpi::json::object_t& obj, std::string_view key,
std::string* error) {
auto it = obj.find(key);
if (it == obj.end()) {
*error = fmt::format("no {} key", key);
return nullptr;
}
auto val = it->second.get_ptr<std::string*>();
if (!val) {
*error = fmt::format("{} must be a string", key);
}
return val;
}
static bool ObjGetNumber(wpi::json::object_t& obj, std::string_view key,
std::string* error, int64_t* num) {
auto it = obj.find(key);
if (it == obj.end()) {
*error = fmt::format("no {} key", key);
return false;
}
if (!GetNumber(it->second, num)) {
*error = fmt::format("{} must be a number", key);
return false;
}
return true;
}
static bool ObjGetStringArray(wpi::json::object_t& obj, std::string_view key,
std::string* error,
std::vector<std::string>* out) {
// prefixes
auto it = obj.find(key);
if (it == obj.end()) {
*error = fmt::format("no {} key", key);
return false;
}
auto jarr = it->second.get_ptr<wpi::json::array_t*>();
if (!jarr) {
*error = fmt::format("{} must be an array", key);
return false;
}
out->resize(0);
out->reserve(jarr->size());
for (auto&& jval : *jarr) {
auto str = jval.get_ptr<std::string*>();
if (!str) {
*error = fmt::format("{}/{} must be a string", key, out->size());
return false;
}
out->emplace_back(std::move(*str));
}
return true;
}
// avoid a fmtlib "unused type alias 'char_type'" warning false positive
#ifdef __clang__
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wunused-local-typedef"
#endif
template <typename T>
static void WireDecodeTextImpl(std::string_view in, T& out,
wpi::Logger& logger) {
static_assert(std::is_same_v<T, ClientMessageHandler> ||
std::is_same_v<T, ServerMessageHandler>,
"T must be ClientMessageHandler or ServerMessageHandler");
wpi::json j;
try {
j = wpi::json::parse(in);
} catch (wpi::json::parse_error& err) {
WPI_WARNING(logger, "could not decode JSON message: {}", err.what());
return;
}
if (!j.is_array()) {
WPI_WARNING(logger, "{}", "expected JSON array at top level");
return;
}
int i = -1;
for (auto&& jmsg : j) {
++i;
std::string error;
{
auto obj = jmsg.get_ptr<wpi::json::object_t*>();
if (!obj) {
error = "expected message to be an object";
goto err;
}
auto method = ObjGetString(*obj, "method", &error);
if (!method) {
goto err;
}
auto paramsIt = obj->find("params");
if (paramsIt == obj->end()) {
error = "no params key";
goto err;
}
auto params = paramsIt->second.get_ptr<wpi::json::object_t*>();
if (!params) {
error = "params must be an object";
goto err;
}
if constexpr (std::is_same_v<T, ClientMessageHandler>) {
if (*method == PublishMsg::kMethodStr) {
// name
auto name = ObjGetString(*params, "name", &error);
if (!name) {
goto err;
}
// type
auto typeStr = ObjGetString(*params, "type", &error);
if (!typeStr) {
goto err;
}
// pubuid
int64_t pubuid;
if (!ObjGetNumber(*params, "pubuid", &error, &pubuid)) {
goto err;
}
// properties; allow missing (treated as empty)
wpi::json* properties = nullptr;
auto propertiesIt = params->find("properties");
if (propertiesIt != params->end()) {
properties = &propertiesIt->second;
if (!properties->is_object()) {
error = "properties must be an object";
goto err;
}
}
wpi::json propertiesEmpty;
if (!properties) {
propertiesEmpty = wpi::json::object();
properties = &propertiesEmpty;
}
// complete
out.ClientPublish(pubuid, *name, *typeStr, *properties);
} else if (*method == UnpublishMsg::kMethodStr) {
// pubuid
int64_t pubuid;
if (!ObjGetNumber(*params, "pubuid", &error, &pubuid)) {
goto err;
}
// complete
out.ClientUnpublish(pubuid);
} else if (*method == SetPropertiesMsg::kMethodStr) {
// name
auto name = ObjGetString(*params, "name", &error);
if (!name) {
goto err;
}
// update
auto updateIt = params->find("update");
if (updateIt == params->end()) {
error = "no update key";
goto err;
}
auto update = &updateIt->second;
if (!update->is_object()) {
error = "update must be an object";
goto err;
}
// complete
out.ClientSetProperties(*name, *update);
} else if (*method == SubscribeMsg::kMethodStr) {
// subuid
int64_t subuid;
if (!ObjGetNumber(*params, "subuid", &error, &subuid)) {
goto err;
}
// options
PubSubOptions options;
auto optionsIt = params->find("options");
if (optionsIt != params->end()) {
auto joptions = optionsIt->second.get_ptr<wpi::json::object_t*>();
if (!joptions) {
error = "options must be an object";
goto err;
}
// periodic
auto periodicIt = joptions->find("periodic");
if (periodicIt != joptions->end()) {
if (!GetNumber(periodicIt->second, &options.periodic)) {
error = "periodic value must be a number";
goto err;
}
}
// send all changes
auto sendAllIt = joptions->find("all");
if (sendAllIt != joptions->end()) {
auto sendAll = sendAllIt->second.get_ptr<bool*>();
if (!sendAll) {
error = "all value must be a boolean";
goto err;
}
options.sendAll = *sendAll;
}
// topics only
auto topicsOnlyIt = joptions->find("topicsonly");
if (topicsOnlyIt != joptions->end()) {
auto topicsOnly = topicsOnlyIt->second.get_ptr<bool*>();
if (!topicsOnly) {
error = "topicsonly value must be a boolean";
goto err;
}
options.topicsOnly = *topicsOnly;
}
// prefix match
auto prefixMatchIt = joptions->find("prefix");
if (prefixMatchIt != joptions->end()) {
auto prefixMatch = prefixMatchIt->second.get_ptr<bool*>();
if (!prefixMatch) {
error = "prefix value must be a boolean";
goto err;
}
options.prefixMatch = *prefixMatch;
}
}
// topic names
std::vector<std::string> topicNames;
if (!ObjGetStringArray(*params, "topics", &error, &topicNames)) {
goto err;
}
// complete
out.ClientSubscribe(subuid, topicNames, options);
} else if (*method == UnsubscribeMsg::kMethodStr) {
// subuid
int64_t subuid;
if (!ObjGetNumber(*params, "subuid", &error, &subuid)) {
goto err;
}
// complete
out.ClientUnsubscribe(subuid);
} else {
error = fmt::format("unrecognized method '{}'", *method);
goto err;
}
} else if constexpr (std::is_same_v<T, ServerMessageHandler>) {
if (*method == AnnounceMsg::kMethodStr) {
// name
auto name = ObjGetString(*params, "name", &error);
if (!name) {
goto err;
}
// id
int64_t id;
if (!ObjGetNumber(*params, "id", &error, &id)) {
goto err;
}
// type
auto typeStr = ObjGetString(*params, "type", &error);
if (!typeStr) {
goto err;
}
// pubuid
std::optional<int64_t> pubuid;
auto pubuidIt = params->find("pubuid");
if (pubuidIt != params->end()) {
int64_t val;
if (!GetNumber(pubuidIt->second, &val)) {
error = "pubuid value must be a number";
goto err;
}
pubuid = val;
}
// properties
auto propertiesIt = params->find("properties");
if (propertiesIt == params->end()) {
error = "no properties key";
goto err;
}
auto properties = &propertiesIt->second;
if (!properties->is_object()) {
WPI_WARNING(logger, "{}: properties is not an object", *name);
*properties = wpi::json::object();
}
// complete
out.ServerAnnounce(*name, id, *typeStr, *properties, pubuid);
} else if (*method == UnannounceMsg::kMethodStr) {
// name
auto name = ObjGetString(*params, "name", &error);
if (!name) {
goto err;
}
// id
int64_t id;
if (!ObjGetNumber(*params, "id", &error, &id)) {
goto err;
}
// complete
out.ServerUnannounce(*name, id);
} else if (*method == PropertiesUpdateMsg::kMethodStr) {
// name
auto name = ObjGetString(*params, "name", &error);
if (!name) {
goto err;
}
// update
auto updateIt = params->find("update");
if (updateIt == params->end()) {
error = "no update key";
goto err;
}
auto update = &updateIt->second;
if (!update->is_object()) {
error = "update must be an object";
goto err;
}
bool ack = false;
auto ackIt = params->find("ack");
if (ackIt != params->end()) {
auto val = ackIt->second.get_ptr<bool*>();
if (!val) {
error = "ack must be a boolean";
goto err;
}
ack = *val;
}
// complete
out.ServerPropertiesUpdate(*name, *update, ack);
} else {
error = fmt::format("unrecognized method '{}'", *method);
goto err;
}
}
continue;
}
err:
WPI_WARNING(logger, "{}: {}", i, error);
}
}
#ifdef __clang__
#pragma clang diagnostic pop
#endif
void nt::net::WireDecodeText(std::string_view in, ClientMessageHandler& out,
wpi::Logger& logger) {
::WireDecodeTextImpl(in, out, logger);
}
void nt::net::WireDecodeText(std::string_view in, ServerMessageHandler& out,
wpi::Logger& logger) {
::WireDecodeTextImpl(in, out, logger);
}
bool nt::net::WireDecodeBinary(wpi::span<const uint8_t>* in, int64_t* outId,
Value* outValue, std::string* error,
int64_t localTimeOffset) {
mpack_reader_t reader;
mpack_reader_init_data(&reader, reinterpret_cast<const char*>(in->data()),
in->size());
mpack_expect_array_match(&reader, 4);
*outId = mpack_expect_i64(&reader);
auto time = mpack_expect_i64(&reader);
int type = mpack_expect_int(&reader);
switch (type) {
case 0: // boolean
*outValue = Value::MakeBoolean(mpack_expect_bool(&reader), 1);
break;
case 2: // integer
*outValue = Value::MakeInteger(mpack_expect_i64(&reader), 1);
break;
case 3: // float
*outValue = Value::MakeFloat(mpack_expect_float(&reader), 1);
break;
case 1: // double
*outValue = Value::MakeDouble(mpack_expect_double(&reader), 1);
break;
case 4: { // string
auto length = mpack_expect_str(&reader);
auto data = mpack_read_bytes_inplace(&reader, length);
if (mpack_reader_error(&reader) == mpack_ok) {
*outValue = Value::MakeString({data, length}, 1);
}
mpack_done_str(&reader);
break;
}
case 5: { // raw
auto length = mpack_expect_bin(&reader);
auto data = mpack_read_bytes_inplace(&reader, length);
if (mpack_reader_error(&reader) == mpack_ok) {
*outValue =
Value::MakeRaw({reinterpret_cast<const uint8_t*>(data), length}, 1);
}
mpack_done_bin(&reader);
break;
}
case 16: { // boolean array
auto length = mpack_expect_array(&reader);
std::vector<int> arr;
arr.reserve((std::min)(length, 1000u));
for (uint32_t i = 0; i < length; ++i) {
arr.emplace_back(mpack_expect_bool(&reader));
if (mpack_reader_error(&reader) != mpack_ok) {
break;
}
}
if (mpack_reader_error(&reader) == mpack_ok) {
*outValue = Value::MakeBooleanArray(std::move(arr), 1);
}
mpack_done_array(&reader);
break;
}
case 18: { // integer array
auto length = mpack_expect_array(&reader);
std::vector<int64_t> arr;
arr.reserve((std::min)(length, 1000u));
for (uint32_t i = 0; i < length; ++i) {
arr.emplace_back(mpack_expect_i64(&reader));
if (mpack_reader_error(&reader) != mpack_ok) {
break;
}
}
if (mpack_reader_error(&reader) == mpack_ok) {
*outValue = Value::MakeIntegerArray(std::move(arr), 1);
}
mpack_done_array(&reader);
break;
}
case 19: { // float array
auto length = mpack_expect_array(&reader);
std::vector<float> arr;
arr.reserve((std::min)(length, 1000u));
for (uint32_t i = 0; i < length; ++i) {
arr.emplace_back(mpack_expect_float(&reader));
if (mpack_reader_error(&reader) != mpack_ok) {
break;
}
}
if (mpack_reader_error(&reader) == mpack_ok) {
*outValue = Value::MakeFloatArray(std::move(arr), 1);
}
mpack_done_array(&reader);
break;
}
case 17: { // double array
auto length = mpack_expect_array(&reader);
std::vector<double> arr;
arr.reserve((std::min)(length, 1000u));
for (uint32_t i = 0; i < length; ++i) {
arr.emplace_back(mpack_expect_double(&reader));
if (mpack_reader_error(&reader) != mpack_ok) {
break;
}
}
if (mpack_reader_error(&reader) == mpack_ok) {
*outValue = Value::MakeDoubleArray(std::move(arr), 1);
}
mpack_done_array(&reader);
break;
}
case 20: { // string array
auto length = mpack_expect_array(&reader);
std::vector<std::string> arr;
arr.reserve((std::min)(length, 1000u));
for (uint32_t i = 0; i < length; ++i) {
auto length = mpack_expect_str(&reader);
auto data = mpack_read_bytes_inplace(&reader, length);
if (mpack_reader_error(&reader) == mpack_ok) {
arr.emplace_back(std::string{data, length});
} else {
break;
}
mpack_done_str(&reader);
}
if (mpack_reader_error(&reader) == mpack_ok) {
*outValue = Value::MakeStringArray(std::move(arr), 1);
}
mpack_done_array(&reader);
break;
}
default:
*error = fmt::format("unrecognized type {}", type);
return false;
}
mpack_done_array(&reader);
auto err = mpack_reader_destroy(&reader);
if (err != mpack_ok) {
*error = mpack_error_to_string(err);
return false;
}
// set time
outValue->SetServerTime(time);
outValue->SetTime(time == 0 ? 0 : time + localTimeOffset);
// update input range
*in = wpi::drop_front(*in,
in->size() - mpack_reader_remaining(&reader, nullptr));
return true;
}

View File

@@ -0,0 +1,65 @@
// Copyright (c) FIRST and other WPILib contributors.
// Open Source Software; you can modify and/or share it under the terms of
// the WPILib BSD license file in the root directory of this project.
#pragma once
#include <stdint.h>
#include <optional>
#include <string>
#include <string_view>
#include <wpi/span.h>
namespace wpi {
class Logger;
class json;
} // namespace wpi
namespace nt {
class PubSubOptions;
class Value;
} // namespace nt
namespace nt::net {
class ClientMessageHandler {
public:
virtual ~ClientMessageHandler() = default;
virtual void ClientPublish(int64_t pubuid, std::string_view name,
std::string_view typeStr,
const wpi::json& properties) = 0;
virtual void ClientUnpublish(int64_t pubuid) = 0;
virtual void ClientSetProperties(std::string_view name,
const wpi::json& update) = 0;
virtual void ClientSubscribe(int64_t subuid,
wpi::span<const std::string> topicNames,
const PubSubOptions& options) = 0;
virtual void ClientUnsubscribe(int64_t subuid) = 0;
};
class ServerMessageHandler {
public:
virtual ~ServerMessageHandler() = default;
virtual void ServerAnnounce(std::string_view name, int64_t id,
std::string_view typeStr,
const wpi::json& properties,
std::optional<int64_t> pubuid) = 0;
virtual void ServerUnannounce(std::string_view name, int64_t id) = 0;
virtual void ServerPropertiesUpdate(std::string_view name,
const wpi::json& update, bool ack) = 0;
};
void WireDecodeText(std::string_view in, ClientMessageHandler& out,
wpi::Logger& logger);
void WireDecodeText(std::string_view in, ServerMessageHandler& out,
wpi::Logger& logger);
// returns true if successfully decoded a message
bool WireDecodeBinary(wpi::span<const uint8_t>* in, int64_t* outId,
Value* outValue, std::string* error,
int64_t localTimeOffset);
} // namespace nt::net

View File

@@ -0,0 +1,316 @@
// Copyright (c) FIRST and other WPILib contributors.
// Open Source Software; you can modify and/or share it under the terms of
// the WPILib BSD license file in the root directory of this project.
#include "WireEncoder.h"
#include <optional>
#include <wpi/json_serializer.h>
#include <wpi/mpack.h>
#include <wpi/raw_ostream.h>
#include "Handle.h"
#include "Message.h"
#include "PubSubOptions.h"
#include "networktables/NetworkTableValue.h"
using namespace nt;
using namespace nt::net;
using namespace mpack;
void nt::net::WireEncodePublish(wpi::raw_ostream& os, int64_t pubuid,
std::string_view name, std::string_view typeStr,
const wpi::json& properties) {
wpi::json::serializer s{os, ' ', 0};
os << "{\"method\":\"" << PublishMsg::kMethodStr << "\",\"params\":{";
os << "\"name\":\"";
s.dump_escaped(name, false);
os << "\",\"properties\":";
s.dump(properties, false, false, 0, 0);
os << ",\"pubuid\":";
s.dump_integer(pubuid);
os << ",\"type\":\"";
s.dump_escaped(typeStr, false);
os << "\"}}";
}
void nt::net::WireEncodeUnpublish(wpi::raw_ostream& os, int64_t pubuid) {
wpi::json::serializer s{os, ' ', 0};
os << "{\"method\":\"" << UnpublishMsg::kMethodStr << "\",\"params\":{";
os << "\"pubuid\":";
s.dump_integer(pubuid);
os << "}}";
}
void nt::net::WireEncodeSetProperties(wpi::raw_ostream& os,
std::string_view name,
const wpi::json& update) {
wpi::json::serializer s{os, ' ', 0};
os << "{\"method\":\"" << SetPropertiesMsg::kMethodStr << "\",\"params\":{";
os << "\"name\":\"";
s.dump_escaped(name, false);
os << "\",\"update\":";
s.dump(update, false, false, 0, 0);
os << "}}";
}
template <typename T>
static void EncodePrefixes(wpi::raw_ostream& os, wpi::span<const T> topicNames,
wpi::json::serializer& s) {
os << '[';
bool first = true;
for (auto&& name : topicNames) {
if (first) {
first = false;
} else {
os << ',';
}
os << '"';
s.dump_escaped(name, false);
os << '"';
}
os << ']';
}
template <typename T>
static void WireEncodeSubscribeImpl(wpi::raw_ostream& os, int64_t subuid,
wpi::span<const T> topicNames,
const PubSubOptions& options) {
wpi::json::serializer s{os, ' ', 0};
os << "{\"method\":\"" << SubscribeMsg::kMethodStr << "\",\"params\":{";
os << "\"options\":{";
bool first = true;
if (options.sendAll) {
os << "\"all\":true";
first = false;
}
if (options.topicsOnly) {
if (!first) {
os << ',';
}
os << "\"topicsonly\":true";
first = false;
}
if (options.prefixMatch) {
if (!first) {
os << ',';
}
os << "\"prefix\":true";
first = false;
}
if (options.periodic != 0.1) {
if (!first) {
os << ',';
}
os << "\"periodic\":";
s.dump_float(options.periodic);
}
os << "},\"topics\":";
EncodePrefixes(os, topicNames, s);
os << ",\"subuid\":";
s.dump_integer(subuid);
os << "}}";
}
void nt::net::WireEncodeSubscribe(wpi::raw_ostream& os, int64_t subuid,
wpi::span<const std::string_view> topicNames,
const PubSubOptions& options) {
WireEncodeSubscribeImpl(os, subuid, topicNames, options);
}
void nt::net::WireEncodeSubscribe(wpi::raw_ostream& os, int64_t subuid,
wpi::span<const std::string> topicNames,
const PubSubOptions& options) {
WireEncodeSubscribeImpl(os, subuid, topicNames, options);
}
void nt::net::WireEncodeUnsubscribe(wpi::raw_ostream& os, int64_t subHandle) {
wpi::json::serializer s{os, ' ', 0};
os << "{\"method\":\"" << UnsubscribeMsg::kMethodStr << "\",\"params\":{";
os << "\"subuid\":";
s.dump_integer(subHandle);
os << "}}";
}
bool nt::net::WireEncodeText(wpi::raw_ostream& os, const ClientMessage& msg) {
if (auto m = std::get_if<PublishMsg>(&msg.contents)) {
WireEncodePublish(os, Handle{m->pubHandle}.GetIndex(), m->name, m->typeStr,
m->properties);
} else if (auto m = std::get_if<UnpublishMsg>(&msg.contents)) {
WireEncodeUnpublish(os, Handle{m->pubHandle}.GetIndex());
} else if (auto m = std::get_if<SetPropertiesMsg>(&msg.contents)) {
WireEncodeSetProperties(os, m->name, m->update);
} else if (auto m = std::get_if<SubscribeMsg>(&msg.contents)) {
WireEncodeSubscribe(os, Handle{m->subHandle}.GetIndex(), m->topicNames,
m->options);
} else if (auto m = std::get_if<UnsubscribeMsg>(&msg.contents)) {
WireEncodeUnsubscribe(os, Handle{m->subHandle}.GetIndex());
} else {
return false;
}
return true;
}
void nt::net::WireEncodeAnnounce(wpi::raw_ostream& os, std::string_view name,
int64_t id, std::string_view typeStr,
const wpi::json& properties,
std::optional<int64_t> pubHandle) {
wpi::json::serializer s{os, ' ', 0};
os << "{\"method\":\"" << AnnounceMsg::kMethodStr << "\",\"params\":{";
os << "\"id\":";
s.dump_integer(id);
os << ",\"name\":\"";
s.dump_escaped(name, false);
os << "\",\"properties\":";
s.dump(properties, false, false, 0, 0);
if (pubHandle) {
os << ",\"pubuid\":";
s.dump_integer(*pubHandle);
}
os << ",\"type\":\"";
s.dump_escaped(typeStr, false);
os << "\"}}";
}
void nt::net::WireEncodeUnannounce(wpi::raw_ostream& os, std::string_view name,
int64_t id) {
wpi::json::serializer s{os, ' ', 0};
os << "{\"method\":\"" << UnannounceMsg::kMethodStr << "\",\"params\":{";
os << "\"id\":";
s.dump_integer(id);
os << ",\"name\":\"";
s.dump_escaped(name, false);
os << "\"}}";
}
void nt::net::WireEncodePropertiesUpdate(wpi::raw_ostream& os,
std::string_view name,
const wpi::json& update, bool ack) {
wpi::json::serializer s{os, ' ', 0};
os << "{\"method\":\"" << PropertiesUpdateMsg::kMethodStr
<< "\",\"params\":{";
os << "\"name\":\"";
s.dump_escaped(name, false);
os << "\",\"update\":";
s.dump(update, false, false, 0, 0);
if (ack) {
os << ",\"ack\":true";
}
os << "}}";
}
bool nt::net::WireEncodeText(wpi::raw_ostream& os, const ServerMessage& msg) {
if (auto m = std::get_if<AnnounceMsg>(&msg.contents)) {
WireEncodeAnnounce(os, m->name, m->id, m->typeStr, m->properties,
m->pubuid);
} else if (auto m = std::get_if<UnannounceMsg>(&msg.contents)) {
WireEncodeUnannounce(os, m->name, m->id);
} else if (auto m = std::get_if<PropertiesUpdateMsg>(&msg.contents)) {
WireEncodePropertiesUpdate(os, m->name, m->update, m->ack);
} else {
return false;
}
return true;
}
bool nt::net::WireEncodeBinary(wpi::raw_ostream& os, int64_t id, int64_t time,
const Value& value) {
char buf[128];
mpack_writer_t writer;
mpack_writer_init(&writer, buf, sizeof(buf));
mpack_writer_set_context(&writer, &os);
mpack_writer_set_flush(
&writer, [](mpack_writer_t* writer, const char* buffer, size_t count) {
static_cast<wpi::raw_ostream*>(writer->context)->write(buffer, count);
});
mpack_start_array(&writer, 4);
mpack_write_int(&writer, id);
mpack_write_int(&writer, time);
switch (value.type()) {
case NT_BOOLEAN:
mpack_write_u8(&writer, 0);
mpack_write_bool(&writer, value.GetBoolean());
break;
case NT_INTEGER:
mpack_write_u8(&writer, 2);
mpack_write_int(&writer, value.GetInteger());
break;
case NT_FLOAT:
mpack_write_u8(&writer, 3);
mpack_write_float(&writer, value.GetFloat());
break;
case NT_DOUBLE:
mpack_write_u8(&writer, 1);
mpack_write_double(&writer, value.GetDouble());
break;
case NT_STRING: {
auto v = value.GetString();
mpack_write_u8(&writer, 4);
mpack_write_str(&writer, v.data(), v.size());
break;
}
case NT_RPC:
case NT_RAW: {
auto v = value.GetRaw();
mpack_write_u8(&writer, 5);
mpack_write_bin(&writer, reinterpret_cast<const char*>(v.data()),
v.size());
break;
}
case NT_BOOLEAN_ARRAY: {
auto v = value.GetBooleanArray();
mpack_write_u8(&writer, 16);
mpack_start_array(&writer, v.size());
for (auto val : v) {
mpack_write_bool(&writer, val);
}
mpack_finish_array(&writer);
break;
}
case NT_INTEGER_ARRAY: {
auto v = value.GetIntegerArray();
mpack_write_u8(&writer, 18);
mpack_start_array(&writer, v.size());
for (auto val : v) {
mpack_write_int(&writer, val);
}
mpack_finish_array(&writer);
break;
}
case NT_FLOAT_ARRAY: {
auto v = value.GetFloatArray();
mpack_write_u8(&writer, 19);
mpack_start_array(&writer, v.size());
for (auto val : v) {
mpack_write_float(&writer, val);
}
mpack_finish_array(&writer);
break;
}
case NT_DOUBLE_ARRAY: {
auto v = value.GetDoubleArray();
mpack_write_u8(&writer, 17);
mpack_start_array(&writer, v.size());
for (auto val : v) {
mpack_write_double(&writer, val);
}
mpack_finish_array(&writer);
break;
}
case NT_STRING_ARRAY: {
auto v = value.GetStringArray();
mpack_write_u8(&writer, 20);
mpack_start_array(&writer, v.size());
for (auto&& val : v) {
mpack_write_str(&writer, val.data(), val.size());
}
mpack_finish_array(&writer);
break;
}
default:
return false;
}
mpack_finish_array(&writer);
return mpack_writer_destroy(&writer) == mpack_ok;
}

View File

@@ -0,0 +1,62 @@
// Copyright (c) FIRST and other WPILib contributors.
// Open Source Software; you can modify and/or share it under the terms of
// the WPILib BSD license file in the root directory of this project.
#pragma once
#include <optional>
#include <string>
#include <string_view>
#include <wpi/span.h>
namespace wpi {
class json;
class raw_ostream;
} // namespace wpi
namespace nt {
class PubSubOptions;
class Value;
} // namespace nt
namespace nt::net {
struct ClientMessage;
struct ServerMessage;
// encoders for client text messages (avoids need to construct a Message struct)
void WireEncodePublish(wpi::raw_ostream& os, int64_t pubuid,
std::string_view name, std::string_view typeStr,
const wpi::json& properties);
void WireEncodeUnpublish(wpi::raw_ostream& os, int64_t pubuid);
void WireEncodeSetProperties(wpi::raw_ostream& os, std::string_view name,
const wpi::json& update);
void WireEncodeSubscribe(wpi::raw_ostream& os, int64_t subuid,
wpi::span<const std::string_view> topicNames,
const PubSubOptions& options);
void WireEncodeSubscribe(wpi::raw_ostream& os, int64_t subuid,
wpi::span<const std::string> topicNames,
const PubSubOptions& options);
void WireEncodeUnsubscribe(wpi::raw_ostream& os, int64_t subuid);
// encoders for server text messages (avoids need to construct a Message struct)
void WireEncodeAnnounce(wpi::raw_ostream& os, std::string_view name, int64_t id,
std::string_view typeStr, const wpi::json& properties,
std::optional<int64_t> pubuid);
void WireEncodeUnannounce(wpi::raw_ostream& os, std::string_view name,
int64_t id);
void WireEncodePropertiesUpdate(wpi::raw_ostream& os, std::string_view name,
const wpi::json& update, bool ack);
// Encode a single message; note text messages must be put into a
// JSON array "[msg1, msg2]" for transmission.
// Returns true if message was written
bool WireEncodeText(wpi::raw_ostream& os, const ClientMessage& msg);
bool WireEncodeText(wpi::raw_ostream& os, const ServerMessage& msg);
// encoder for binary messages
bool WireEncodeBinary(wpi::raw_ostream& os, int64_t id, int64_t time,
const Value& value);
} // namespace nt::net