mirror of
https://github.com/wpilibsuite/allwpilib
synced 2026-06-27 02:01:42 +00:00
[ntcore] Remove NT3 support (#7625)
- Remove StartClient3 - Rename StartClient4 to StartClient - Remove port3 parameter from StartServer - Remove 3-suffix constants - Remove 4 suffix from constants Also remove Shuffleboard build from CI.
This commit is contained in:
@@ -102,14 +102,14 @@ void InstanceImpl::StopLocal() {
|
||||
|
||||
void InstanceImpl::StartServer(std::string_view persistFilename,
|
||||
std::string_view listenAddress,
|
||||
unsigned int port3, unsigned int port4) {
|
||||
unsigned int port) {
|
||||
std::scoped_lock lock{m_mutex};
|
||||
if (networkMode != NT_NET_MODE_NONE) {
|
||||
return;
|
||||
}
|
||||
m_networkServer = std::make_shared<NetworkServer>(
|
||||
persistFilename, listenAddress, port3, port4, localStorage,
|
||||
connectionList, logger, [this] {
|
||||
persistFilename, listenAddress, port, localStorage, connectionList,
|
||||
logger, [this] {
|
||||
std::scoped_lock lock{m_mutex};
|
||||
networkMode &= ~NT_NET_MODE_STARTING;
|
||||
});
|
||||
@@ -134,20 +134,7 @@ void InstanceImpl::StopServer() {
|
||||
}
|
||||
}
|
||||
|
||||
void InstanceImpl::StartClient3(std::string_view identity) {
|
||||
std::scoped_lock lock{m_mutex};
|
||||
if (networkMode != NT_NET_MODE_NONE) {
|
||||
return;
|
||||
}
|
||||
m_networkClient = std::make_shared<NetworkClient3>(
|
||||
m_inst, identity, localStorage, connectionList, logger);
|
||||
if (!m_servers.empty()) {
|
||||
m_networkClient->SetServers(m_servers);
|
||||
}
|
||||
networkMode = NT_NET_MODE_CLIENT3;
|
||||
}
|
||||
|
||||
void InstanceImpl::StartClient4(std::string_view identity) {
|
||||
void InstanceImpl::StartClient(std::string_view identity) {
|
||||
std::scoped_lock lock{m_mutex};
|
||||
if (networkMode != NT_NET_MODE_NONE) {
|
||||
return;
|
||||
@@ -169,14 +156,14 @@ void InstanceImpl::StartClient4(std::string_view identity) {
|
||||
if (!m_servers.empty()) {
|
||||
m_networkClient->SetServers(m_servers);
|
||||
}
|
||||
networkMode = NT_NET_MODE_CLIENT4;
|
||||
networkMode = NT_NET_MODE_CLIENT;
|
||||
}
|
||||
|
||||
void InstanceImpl::StopClient() {
|
||||
std::shared_ptr<INetworkClient> client;
|
||||
{
|
||||
std::scoped_lock lock{m_mutex};
|
||||
if ((networkMode & (NT_NET_MODE_CLIENT3 | NT_NET_MODE_CLIENT4)) == 0) {
|
||||
if ((networkMode & NT_NET_MODE_CLIENT) == 0) {
|
||||
return;
|
||||
}
|
||||
client = std::move(m_networkClient);
|
||||
|
||||
@@ -45,11 +45,9 @@ class InstanceImpl {
|
||||
void StartLocal();
|
||||
void StopLocal();
|
||||
void StartServer(std::string_view persistFilename,
|
||||
std::string_view listenAddress, unsigned int port3,
|
||||
unsigned int port4);
|
||||
std::string_view listenAddress, unsigned int port);
|
||||
void StopServer();
|
||||
void StartClient3(std::string_view identity);
|
||||
void StartClient4(std::string_view identity);
|
||||
void StartClient(std::string_view identity);
|
||||
void StopClient();
|
||||
void SetServers(
|
||||
std::span<const std::pair<std::string, unsigned int>> servers);
|
||||
|
||||
@@ -61,7 +61,7 @@ void NetworkClientBase::StartDSClient(unsigned int port) {
|
||||
if (m_dsClient) {
|
||||
return;
|
||||
}
|
||||
m_dsClientServer.second = port == 0 ? NT_DEFAULT_PORT4 : port;
|
||||
m_dsClientServer.second = port == 0 ? NT_DEFAULT_PORT : port;
|
||||
m_dsClient = wpi::DsClient::Create(m_loop, m_logger);
|
||||
if (m_dsClient) {
|
||||
m_dsClient->setIp.connect([this](std::string_view ip) {
|
||||
@@ -142,161 +142,6 @@ void NetworkClientBase::DoDisconnect(std::string_view reason) {
|
||||
});
|
||||
}
|
||||
|
||||
NetworkClient3::NetworkClient3(int inst, std::string_view id,
|
||||
net::ILocalStorage& localStorage,
|
||||
IConnectionList& connList, wpi::Logger& logger)
|
||||
: NetworkClientBase{inst, id, localStorage, connList, logger} {
|
||||
m_loopRunner.ExecAsync([this](uv::Loop& loop) {
|
||||
m_parallelConnect = wpi::ParallelTcpConnector::Create(
|
||||
loop, kReconnectRate, m_logger,
|
||||
[this](uv::Tcp& tcp) { TcpConnected(tcp); }, true);
|
||||
|
||||
m_sendOutgoingTimer = uv::Timer::Create(loop);
|
||||
if (m_sendOutgoingTimer) {
|
||||
m_sendOutgoingTimer->timeout.connect([this] {
|
||||
if (m_clientImpl) {
|
||||
HandleLocal();
|
||||
m_clientImpl->SendPeriodic(m_loop.Now().count(), false);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// set up flush async
|
||||
m_flush = uv::Async<>::Create(m_loop);
|
||||
if (m_flush) {
|
||||
m_flush->wakeup.connect([this] {
|
||||
if (m_clientImpl) {
|
||||
HandleLocal();
|
||||
m_clientImpl->SendPeriodic(m_loop.Now().count(), true);
|
||||
}
|
||||
});
|
||||
}
|
||||
m_flushAtomic = m_flush.get();
|
||||
|
||||
m_flushLocal = uv::Async<>::Create(m_loop);
|
||||
if (m_flushLocal) {
|
||||
m_flushLocal->wakeup.connect([this] { HandleLocal(); });
|
||||
}
|
||||
m_flushLocalAtomic = m_flushLocal.get();
|
||||
});
|
||||
}
|
||||
|
||||
NetworkClient3::~NetworkClient3() {
|
||||
// must explicitly destroy these on loop
|
||||
m_loopRunner.ExecSync([&](auto&) {
|
||||
m_clientImpl.reset();
|
||||
m_wire.reset();
|
||||
});
|
||||
// shut down loop here to avoid race
|
||||
m_loopRunner.Stop();
|
||||
}
|
||||
|
||||
void NetworkClient3::HandleLocal() {
|
||||
for (;;) {
|
||||
auto msgs = m_localQueue.ReadQueue(m_localMsgs);
|
||||
if (msgs.empty()) {
|
||||
return;
|
||||
}
|
||||
if (m_clientImpl) {
|
||||
m_clientImpl->HandleLocal(msgs);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void NetworkClient3::TcpConnected(uv::Tcp& tcp) {
|
||||
tcp.SetNoDelay(true);
|
||||
|
||||
// create as shared_ptr and capture in lambda because there may be multiple
|
||||
// simultaneous attempts
|
||||
auto wire = std::make_shared<net3::UvStreamConnection3>(tcp);
|
||||
auto clientImpl = std::make_shared<net3::ClientImpl3>(
|
||||
m_loop.Now().count(), m_inst, *wire, m_logger, [this](uint32_t repeatMs) {
|
||||
DEBUG4("Setting periodic timer to {}", repeatMs);
|
||||
if (m_sendOutgoingTimer &&
|
||||
(!m_sendOutgoingTimer->IsActive() ||
|
||||
uv::Timer::Time{repeatMs} != m_sendOutgoingTimer->GetRepeat())) {
|
||||
m_sendOutgoingTimer->Start(uv::Timer::Time{repeatMs},
|
||||
uv::Timer::Time{repeatMs});
|
||||
}
|
||||
});
|
||||
clientImpl->Start(
|
||||
m_id, [this, wire,
|
||||
clientWeak = std::weak_ptr<net3::ClientImpl3>{clientImpl}, &tcp] {
|
||||
auto clientImpl = clientWeak.lock();
|
||||
if (!clientImpl) {
|
||||
return;
|
||||
}
|
||||
if (m_connList.IsConnected()) {
|
||||
tcp.Close(); // no longer needed
|
||||
return;
|
||||
}
|
||||
|
||||
if (m_parallelConnect) {
|
||||
m_parallelConnect->Succeeded(tcp);
|
||||
}
|
||||
|
||||
m_wire = std::move(wire);
|
||||
m_clientImpl = std::move(clientImpl);
|
||||
|
||||
ConnectionInfo connInfo;
|
||||
uv::AddrToName(tcp.GetPeer(), &connInfo.remote_ip,
|
||||
&connInfo.remote_port);
|
||||
connInfo.protocol_version = 0x0300;
|
||||
|
||||
INFO("CONNECTED NT3 to {} port {}", connInfo.remote_ip,
|
||||
connInfo.remote_port);
|
||||
m_connHandle = m_connList.AddConnection(connInfo);
|
||||
|
||||
tcp.error.connect([this, &tcp](uv::Error err) {
|
||||
DEBUG3("NT3 TCP error {}", err.str());
|
||||
if (!tcp.IsLoopClosing()) {
|
||||
// we could be in the middle of sending data, so defer disconnect
|
||||
uv::Timer::SingleShot(m_loop, uv::Timer::Time{0},
|
||||
[this, reason = std::string{err.str()}] {
|
||||
DoDisconnect(reason);
|
||||
});
|
||||
}
|
||||
});
|
||||
tcp.end.connect([this, &tcp] {
|
||||
DEBUG3("NT3 TCP read ended");
|
||||
if (!tcp.IsLoopClosing()) {
|
||||
DoDisconnect("remote end closed connection");
|
||||
}
|
||||
});
|
||||
tcp.closed.connect([this, &tcp] {
|
||||
DEBUG3("NT3 TCP connection closed");
|
||||
if (!tcp.IsLoopClosing()) {
|
||||
DoDisconnect(m_wire ? m_wire->GetDisconnectReason() : "unknown");
|
||||
}
|
||||
});
|
||||
|
||||
m_clientImpl->SetLocal(&m_localStorage);
|
||||
m_localStorage.StartNetwork(&m_localQueue);
|
||||
HandleLocal();
|
||||
});
|
||||
|
||||
tcp.SetData(clientImpl);
|
||||
tcp.data.connect(
|
||||
[clientImpl = clientImpl.get()](uv::Buffer& buf, size_t len) {
|
||||
clientImpl->ProcessIncoming(
|
||||
{reinterpret_cast<const uint8_t*>(buf.base), len});
|
||||
});
|
||||
tcp.StartRead();
|
||||
}
|
||||
|
||||
void NetworkClient3::ForceDisconnect(std::string_view reason) {
|
||||
if (m_wire) {
|
||||
m_wire->Disconnect(reason);
|
||||
}
|
||||
}
|
||||
|
||||
void NetworkClient3::DoDisconnect(std::string_view reason) {
|
||||
INFO("DISCONNECTED NT3 connection: {}", reason);
|
||||
m_clientImpl.reset();
|
||||
m_wire.reset();
|
||||
NetworkClientBase::DoDisconnect(reason);
|
||||
}
|
||||
|
||||
NetworkClient::NetworkClient(
|
||||
int inst, std::string_view id, net::ILocalStorage& localStorage,
|
||||
IConnectionList& connList, wpi::Logger& logger,
|
||||
|
||||
@@ -25,8 +25,6 @@
|
||||
#include "net/ClientMessageQueue.h"
|
||||
#include "net/Message.h"
|
||||
#include "net/WebSocketConnection.h"
|
||||
#include "net3/ClientImpl3.h"
|
||||
#include "net3/UvStreamConnection3.h"
|
||||
|
||||
namespace wpi {
|
||||
class Logger;
|
||||
@@ -98,28 +96,6 @@ class NetworkClientBase : public INetworkClient {
|
||||
wpi::uv::Loop& m_loop;
|
||||
};
|
||||
|
||||
class NetworkClient3 final : public NetworkClientBase {
|
||||
public:
|
||||
NetworkClient3(int inst, std::string_view id,
|
||||
net::ILocalStorage& localStorage, IConnectionList& connList,
|
||||
wpi::Logger& logger);
|
||||
~NetworkClient3() final;
|
||||
|
||||
void SetServers(
|
||||
std::span<const std::pair<std::string, unsigned int>> servers) final {
|
||||
DoSetServers(servers, NT_DEFAULT_PORT3);
|
||||
}
|
||||
|
||||
private:
|
||||
void HandleLocal();
|
||||
void TcpConnected(wpi::uv::Tcp& tcp) final;
|
||||
void ForceDisconnect(std::string_view reason) override;
|
||||
void DoDisconnect(std::string_view reason) override;
|
||||
|
||||
std::shared_ptr<net3::UvStreamConnection3> m_wire;
|
||||
std::shared_ptr<net3::ClientImpl3> m_clientImpl;
|
||||
};
|
||||
|
||||
class NetworkClient final : public NetworkClientBase {
|
||||
public:
|
||||
NetworkClient(
|
||||
@@ -131,7 +107,7 @@ class NetworkClient final : public NetworkClientBase {
|
||||
|
||||
void SetServers(
|
||||
std::span<const std::pair<std::string, unsigned int>> servers) final {
|
||||
DoSetServers(servers, NT_DEFAULT_PORT4);
|
||||
DoSetServers(servers, NT_DEFAULT_PORT);
|
||||
}
|
||||
|
||||
private:
|
||||
|
||||
@@ -34,7 +34,6 @@
|
||||
#include "net/WebSocketConnection.h"
|
||||
#include "net/WireDecoder.h"
|
||||
#include "net/WireEncoder.h"
|
||||
#include "net3/UvStreamConnection3.h"
|
||||
|
||||
using namespace nt;
|
||||
namespace uv = wpi::uv;
|
||||
@@ -72,16 +71,6 @@ class NetworkServer::ServerConnection {
|
||||
std::shared_ptr<uv::Timer> m_outgoingTimer;
|
||||
};
|
||||
|
||||
class NetworkServer::ServerConnection3 : public ServerConnection {
|
||||
public:
|
||||
ServerConnection3(std::shared_ptr<uv::Stream> stream, NetworkServer& server,
|
||||
std::string_view addr, unsigned int port,
|
||||
wpi::Logger& logger);
|
||||
|
||||
private:
|
||||
std::shared_ptr<net3::UvStreamConnection3> m_wire;
|
||||
};
|
||||
|
||||
class NetworkServer::ServerConnection4 final
|
||||
: public ServerConnection,
|
||||
public wpi::HttpWebSocketServerConnection<ServerConnection4> {
|
||||
@@ -134,55 +123,6 @@ void NetworkServer::ServerConnection::ConnectionClosed() {
|
||||
m_outgoingTimer->Close();
|
||||
}
|
||||
|
||||
NetworkServer::ServerConnection3::ServerConnection3(
|
||||
std::shared_ptr<uv::Stream> stream, NetworkServer& server,
|
||||
std::string_view addr, unsigned int port, wpi::Logger& logger)
|
||||
: ServerConnection{server, addr, port, logger},
|
||||
m_wire{std::make_shared<net3::UvStreamConnection3>(*stream)} {
|
||||
m_info.remote_ip = addr;
|
||||
m_info.remote_port = port;
|
||||
|
||||
// TODO: set local flag appropriately
|
||||
m_clientId = m_server.m_serverImpl.AddClient3(
|
||||
m_connInfo, false, *m_wire,
|
||||
[this](std::string_view name, uint16_t proto) {
|
||||
m_info.remote_id = name;
|
||||
m_info.protocol_version = proto;
|
||||
m_server.AddConnection(this, m_info);
|
||||
INFO("CONNECTED NT3 client '{}' (from {})", name, m_connInfo);
|
||||
},
|
||||
[this](uint32_t repeatMs) { UpdateOutgoingTimer(repeatMs); });
|
||||
|
||||
stream->error.connect([this](uv::Error err) {
|
||||
if (!m_wire->GetDisconnectReason().empty()) {
|
||||
return;
|
||||
}
|
||||
m_wire->Disconnect(fmt::format("stream error: {}", err.name()));
|
||||
m_wire->GetStream().Shutdown([this] { m_wire->GetStream().Close(); });
|
||||
});
|
||||
stream->end.connect([this] {
|
||||
if (!m_wire->GetDisconnectReason().empty()) {
|
||||
return;
|
||||
}
|
||||
m_wire->Disconnect("remote end closed connection");
|
||||
m_wire->GetStream().Shutdown([this] { m_wire->GetStream().Close(); });
|
||||
});
|
||||
stream->closed.connect([this] {
|
||||
INFO("DISCONNECTED NT3 client '{}' (from {}): {}", m_info.remote_id,
|
||||
m_connInfo, m_wire->GetDisconnectReason());
|
||||
ConnectionClosed();
|
||||
});
|
||||
stream->data.connect([this](uv::Buffer& buf, size_t size) {
|
||||
if (m_server.m_serverImpl.ProcessIncomingBinary(
|
||||
m_clientId, {reinterpret_cast<const uint8_t*>(buf.base), size})) {
|
||||
m_server.m_idle->Start();
|
||||
}
|
||||
});
|
||||
stream->StartRead();
|
||||
|
||||
SetupOutgoingTimer();
|
||||
}
|
||||
|
||||
void NetworkServer::ServerConnection4::ProcessRequest() {
|
||||
DEBUG1("HTTP request: '{}'", m_request.GetUrl());
|
||||
wpi::UrlParser url{m_request.GetUrl(),
|
||||
@@ -311,8 +251,7 @@ void NetworkServer::ServerConnection4::ProcessWsUpgrade() {
|
||||
}
|
||||
|
||||
NetworkServer::NetworkServer(std::string_view persistentFilename,
|
||||
std::string_view listenAddress, unsigned int port3,
|
||||
unsigned int port4,
|
||||
std::string_view listenAddress, unsigned int port,
|
||||
net::ILocalStorage& localStorage,
|
||||
IConnectionList& connList, wpi::Logger& logger,
|
||||
std::function<void()> initDone)
|
||||
@@ -322,8 +261,7 @@ NetworkServer::NetworkServer(std::string_view persistentFilename,
|
||||
m_initDone{std::move(initDone)},
|
||||
m_persistentFilename{persistentFilename},
|
||||
m_listenAddress{wpi::trim(listenAddress)},
|
||||
m_port3{port3},
|
||||
m_port4{port4},
|
||||
m_port{port},
|
||||
m_serverImpl{logger},
|
||||
m_localQueue{logger},
|
||||
m_loop(*m_loopRunner.GetLoop()) {
|
||||
@@ -485,46 +423,14 @@ void NetworkServer::Init() {
|
||||
});
|
||||
}
|
||||
|
||||
INFO("Listening on NT3 port {}, NT4 port {}", m_port3, m_port4);
|
||||
INFO("Listening on port {}", m_port);
|
||||
|
||||
if (m_port3 != 0) {
|
||||
auto tcp3 = uv::Tcp::Create(m_loop);
|
||||
tcp3->error.connect([logger = &m_logger](uv::Error err) {
|
||||
WPI_INFO(*logger, "NT3 server socket error: {}", err.str());
|
||||
});
|
||||
tcp3->Bind(m_listenAddress, m_port3);
|
||||
|
||||
// when we get a NT3 connection, accept it and start reading
|
||||
tcp3->connection.connect([this, srv = tcp3.get()] {
|
||||
auto tcp = srv->Accept();
|
||||
if (!tcp) {
|
||||
return;
|
||||
}
|
||||
tcp->error.connect([logger = &m_logger](uv::Error err) {
|
||||
WPI_INFO(*logger, "NT3 socket error: {}", err.str());
|
||||
});
|
||||
tcp->SetNoDelay(true);
|
||||
std::string peerAddr;
|
||||
unsigned int peerPort = 0;
|
||||
if (uv::AddrToName(tcp->GetPeer(), &peerAddr, &peerPort) == 0) {
|
||||
INFO("Got a NT3 connection from {} port {}", peerAddr, peerPort);
|
||||
} else {
|
||||
INFO("Got a NT3 connection from unknown");
|
||||
}
|
||||
auto conn = std::make_shared<ServerConnection3>(tcp, *this, peerAddr,
|
||||
peerPort, m_logger);
|
||||
tcp->SetData(conn);
|
||||
});
|
||||
|
||||
tcp3->Listen();
|
||||
}
|
||||
|
||||
if (m_port4 != 0) {
|
||||
if (m_port != 0) {
|
||||
auto tcp4 = uv::Tcp::Create(m_loop);
|
||||
tcp4->error.connect([logger = &m_logger](uv::Error err) {
|
||||
WPI_INFO(*logger, "NT4 server socket error: {}", err.str());
|
||||
});
|
||||
tcp4->Bind(m_listenAddress, m_port4);
|
||||
tcp4->Bind(m_listenAddress, m_port);
|
||||
|
||||
// when we get a NT4 connection, accept it and start reading
|
||||
tcp4->connection.connect([this, srv = tcp4.get()] {
|
||||
|
||||
@@ -36,10 +36,9 @@ class IConnectionList;
|
||||
class NetworkServer {
|
||||
public:
|
||||
NetworkServer(std::string_view persistentFilename,
|
||||
std::string_view listenAddress, unsigned int port3,
|
||||
unsigned int port4, net::ILocalStorage& localStorage,
|
||||
IConnectionList& connList, wpi::Logger& logger,
|
||||
std::function<void()> initDone);
|
||||
std::string_view listenAddress, unsigned int port,
|
||||
net::ILocalStorage& localStorage, IConnectionList& connList,
|
||||
wpi::Logger& logger, std::function<void()> initDone);
|
||||
~NetworkServer();
|
||||
|
||||
void FlushLocal();
|
||||
@@ -47,7 +46,6 @@ class NetworkServer {
|
||||
|
||||
private:
|
||||
class ServerConnection;
|
||||
class ServerConnection3;
|
||||
class ServerConnection4;
|
||||
|
||||
void ProcessAllLocal();
|
||||
@@ -64,8 +62,7 @@ class NetworkServer {
|
||||
std::string m_persistentData;
|
||||
std::string m_persistentFilename;
|
||||
std::string m_listenAddress;
|
||||
unsigned int m_port3;
|
||||
unsigned int m_port4;
|
||||
unsigned int m_port;
|
||||
|
||||
// used only from loop
|
||||
std::shared_ptr<wpi::uv::Timer> m_readLocalTimer;
|
||||
|
||||
@@ -1173,12 +1173,12 @@ Java_edu_wpi_first_networktables_NetworkTablesJNI_stopLocal
|
||||
/*
|
||||
* Class: edu_wpi_first_networktables_NetworkTablesJNI
|
||||
* Method: startServer
|
||||
* Signature: (ILjava/lang/String;Ljava/lang/String;II)V
|
||||
* Signature: (ILjava/lang/String;Ljava/lang/String;I)V
|
||||
*/
|
||||
JNIEXPORT void JNICALL
|
||||
Java_edu_wpi_first_networktables_NetworkTablesJNI_startServer
|
||||
(JNIEnv* env, jclass, jint inst, jstring persistFilename,
|
||||
jstring listenAddress, jint port3, jint port4)
|
||||
jstring listenAddress, jint port)
|
||||
{
|
||||
if (!persistFilename) {
|
||||
nullPointerEx.Throw(env, "persistFilename cannot be null");
|
||||
@@ -1189,7 +1189,7 @@ Java_edu_wpi_first_networktables_NetworkTablesJNI_startServer
|
||||
return;
|
||||
}
|
||||
nt::StartServer(inst, JStringRef{env, persistFilename}.str(),
|
||||
JStringRef{env, listenAddress}.c_str(), port3, port4);
|
||||
JStringRef{env, listenAddress}.c_str(), port);
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -1206,34 +1206,18 @@ Java_edu_wpi_first_networktables_NetworkTablesJNI_stopServer
|
||||
|
||||
/*
|
||||
* Class: edu_wpi_first_networktables_NetworkTablesJNI
|
||||
* Method: startClient3
|
||||
* Method: startClient
|
||||
* Signature: (ILjava/lang/String;)V
|
||||
*/
|
||||
JNIEXPORT void JNICALL
|
||||
Java_edu_wpi_first_networktables_NetworkTablesJNI_startClient3
|
||||
Java_edu_wpi_first_networktables_NetworkTablesJNI_startClient
|
||||
(JNIEnv* env, jclass, jint inst, jstring identity)
|
||||
{
|
||||
if (!identity) {
|
||||
nullPointerEx.Throw(env, "identity cannot be null");
|
||||
return;
|
||||
}
|
||||
nt::StartClient3(inst, JStringRef{env, identity}.str());
|
||||
}
|
||||
|
||||
/*
|
||||
* Class: edu_wpi_first_networktables_NetworkTablesJNI
|
||||
* Method: startClient4
|
||||
* Signature: (ILjava/lang/String;)V
|
||||
*/
|
||||
JNIEXPORT void JNICALL
|
||||
Java_edu_wpi_first_networktables_NetworkTablesJNI_startClient4
|
||||
(JNIEnv* env, jclass, jint inst, jstring identity)
|
||||
{
|
||||
if (!identity) {
|
||||
nullPointerEx.Throw(env, "identity cannot be null");
|
||||
return;
|
||||
}
|
||||
nt::StartClient4(inst, JStringRef{env, identity}.str());
|
||||
nt::StartClient(inst, JStringRef{env, identity}.str());
|
||||
}
|
||||
|
||||
/*
|
||||
|
||||
@@ -1,468 +0,0 @@
|
||||
// Copyright (c) FIRST and other WPILib contributors.
|
||||
// Open Source Software; you can modify and/or share it under the terms of
|
||||
// the WPILib BSD license file in the root directory of this project.
|
||||
|
||||
#include "ClientImpl3.h"
|
||||
|
||||
#include <algorithm>
|
||||
#include <memory>
|
||||
#include <numeric>
|
||||
#include <string>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
#include <fmt/format.h>
|
||||
#include <wpi/DenseMap.h>
|
||||
#include <wpi/StringMap.h>
|
||||
#include <wpi/timestamp.h>
|
||||
|
||||
#include "Log.h"
|
||||
#include "Types_internal.h"
|
||||
#include "net/Message.h"
|
||||
#include "net/NetworkInterface.h"
|
||||
#include "net3/WireEncoder3.h"
|
||||
#include "networktables/NetworkTableValue.h"
|
||||
|
||||
using namespace nt;
|
||||
using namespace nt::net3;
|
||||
|
||||
static constexpr uint32_t kMinPeriodMs = 5;
|
||||
|
||||
// maximum amount of time the wire can be not ready to send another
|
||||
// transmission before we close the connection
|
||||
static constexpr uint32_t kWireMaxNotReadyUs = 1000000;
|
||||
|
||||
wpi::json ClientImpl3::Entry::SetFlags(unsigned int flags_) {
|
||||
bool wasPersistent = IsPersistent();
|
||||
flags = flags_;
|
||||
bool isPersistent = IsPersistent();
|
||||
if (isPersistent && !wasPersistent) {
|
||||
properties["persistent"] = true;
|
||||
return {{"persistent", true}};
|
||||
} else if (!isPersistent && wasPersistent) {
|
||||
properties.erase("persistent");
|
||||
return {{"persistent", wpi::json()}};
|
||||
} else {
|
||||
return wpi::json::object();
|
||||
}
|
||||
}
|
||||
|
||||
ClientImpl3::ClientImpl3(uint64_t curTimeMs, int inst, WireConnection3& wire,
|
||||
wpi::Logger& logger,
|
||||
std::function<void(uint32_t repeatMs)> setPeriodic)
|
||||
: m_wire{wire},
|
||||
m_logger{logger},
|
||||
m_setPeriodic{std::move(setPeriodic)},
|
||||
m_initTimeMs{curTimeMs},
|
||||
m_nextKeepAliveTimeMs{curTimeMs + kKeepAliveIntervalMs},
|
||||
m_decoder{*this} {}
|
||||
|
||||
ClientImpl3::~ClientImpl3() {
|
||||
DEBUG4("NT3 ClientImpl destroyed");
|
||||
}
|
||||
|
||||
void ClientImpl3::ProcessIncoming(std::span<const uint8_t> data) {
|
||||
DEBUG4("received {} bytes", data.size());
|
||||
if (!m_decoder.Execute(&data)) {
|
||||
m_wire.Disconnect(m_decoder.GetError());
|
||||
}
|
||||
}
|
||||
|
||||
void ClientImpl3::HandleLocal(std::span<const net::ClientMessage> msgs) {
|
||||
for (const auto& elem : msgs) { // NOLINT
|
||||
// common case is value
|
||||
if (auto msg = std::get_if<net::ClientValueMsg>(&elem.contents)) {
|
||||
SetValue(msg->pubuid, msg->value);
|
||||
} else if (auto msg = std::get_if<net::PublishMsg>(&elem.contents)) {
|
||||
Publish(msg->pubuid, msg->name, msg->typeStr, msg->properties,
|
||||
msg->options);
|
||||
} else if (auto msg = std::get_if<net::UnpublishMsg>(&elem.contents)) {
|
||||
Unpublish(msg->pubuid);
|
||||
} else if (auto msg = std::get_if<net::SetPropertiesMsg>(&elem.contents)) {
|
||||
SetProperties(msg->name, msg->update);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void ClientImpl3::DoSendPeriodic(uint64_t curTimeMs, bool initial, bool flush) {
|
||||
DEBUG4("SendPeriodic({})", curTimeMs);
|
||||
|
||||
// rate limit sends
|
||||
if (curTimeMs < (m_lastSendMs + kMinPeriodMs)) {
|
||||
return;
|
||||
}
|
||||
|
||||
auto out = m_wire.Send();
|
||||
|
||||
// send keep-alive
|
||||
if (curTimeMs >= m_nextKeepAliveTimeMs) {
|
||||
if (!CheckNetworkReady(curTimeMs)) {
|
||||
return;
|
||||
}
|
||||
DEBUG4("Sending keep alive");
|
||||
WireEncodeKeepAlive(out.stream());
|
||||
// drift isn't critical here, so just go from current time
|
||||
m_nextKeepAliveTimeMs = curTimeMs + kKeepAliveIntervalMs;
|
||||
}
|
||||
|
||||
// send any stored-up flags updates
|
||||
if (!m_outgoingFlags.empty()) {
|
||||
if (!CheckNetworkReady(curTimeMs)) {
|
||||
return;
|
||||
}
|
||||
for (auto&& p : m_outgoingFlags) {
|
||||
WireEncodeFlagsUpdate(out.stream(), p.first, p.second);
|
||||
}
|
||||
m_outgoingFlags.resize(0);
|
||||
}
|
||||
|
||||
// send any pending updates due to be sent
|
||||
bool checkedNetwork = false;
|
||||
for (auto&& pub : m_publishers) {
|
||||
if (pub && !pub->outValues.empty() &&
|
||||
(flush || curTimeMs >= pub->nextSendMs)) {
|
||||
if (!checkedNetwork) {
|
||||
if (!CheckNetworkReady(curTimeMs)) {
|
||||
return;
|
||||
}
|
||||
checkedNetwork = true;
|
||||
}
|
||||
for (auto&& val : pub->outValues) {
|
||||
SendValue(out, pub->entry, val);
|
||||
}
|
||||
pub->outValues.resize(0);
|
||||
pub->nextSendMs = curTimeMs + pub->periodMs;
|
||||
}
|
||||
}
|
||||
|
||||
if (initial) {
|
||||
DEBUG4("Sending ClientHelloDone");
|
||||
WireEncodeClientHelloDone(out.stream());
|
||||
}
|
||||
|
||||
m_wire.Flush();
|
||||
m_lastSendMs = curTimeMs;
|
||||
}
|
||||
|
||||
void ClientImpl3::SendValue(Writer& out, Entry* entry, const Value& value) {
|
||||
DEBUG4("sending value for '{}', seqnum {}", entry->name,
|
||||
entry->seqNum.value());
|
||||
|
||||
// bump sequence number
|
||||
++entry->seqNum;
|
||||
|
||||
// only send assigns during initial handshake
|
||||
if (entry->id == 0xffff || m_state == kStateInitialAssignments) {
|
||||
// send assign
|
||||
WireEncodeEntryAssign(out.stream(), entry->name, entry->id,
|
||||
entry->seqNum.value(), value, entry->flags);
|
||||
} else {
|
||||
// send update
|
||||
WireEncodeEntryUpdate(out.stream(), entry->id, entry->seqNum.value(),
|
||||
value);
|
||||
}
|
||||
}
|
||||
|
||||
bool ClientImpl3::CheckNetworkReady(uint64_t curTimeMs) {
|
||||
if (!m_wire.Ready()) {
|
||||
uint64_t lastFlushTime = m_wire.GetLastFlushTime();
|
||||
uint64_t now = wpi::Now();
|
||||
if (lastFlushTime != 0 && now > (lastFlushTime + kWireMaxNotReadyUs)) {
|
||||
m_wire.Disconnect("transmit stalled");
|
||||
}
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
void ClientImpl3::Publish(int pubuid, std::string_view name,
|
||||
std::string_view typeStr, const wpi::json& properties,
|
||||
const PubSubOptionsImpl& options) {
|
||||
DEBUG4("Publish('{}', '{}')", name, typeStr);
|
||||
if (static_cast<unsigned int>(pubuid) >= m_publishers.size()) {
|
||||
m_publishers.resize(pubuid + 1);
|
||||
}
|
||||
auto& publisher = m_publishers[pubuid];
|
||||
if (!publisher) {
|
||||
publisher = std::make_unique<PublisherData>(GetOrNewEntry(name));
|
||||
publisher->entry->typeStr = typeStr;
|
||||
publisher->entry->type = StringToType3(typeStr);
|
||||
publisher->entry->publishers.emplace_back(publisher.get());
|
||||
}
|
||||
publisher->options = options;
|
||||
publisher->periodMs = std::lround(options.periodicMs / 10.0) * 10;
|
||||
if (publisher->periodMs < 10) {
|
||||
publisher->periodMs = 10;
|
||||
}
|
||||
|
||||
// update period
|
||||
m_periodMs = std::gcd(m_periodMs, publisher->periodMs);
|
||||
m_setPeriodic(m_periodMs);
|
||||
}
|
||||
|
||||
void ClientImpl3::Unpublish(int pubuid) {
|
||||
DEBUG4("Unpublish({})", pubuid);
|
||||
if (static_cast<unsigned int>(pubuid) >= m_publishers.size()) {
|
||||
return;
|
||||
}
|
||||
auto& publisher = m_publishers[pubuid];
|
||||
publisher->entry->publishers.erase(
|
||||
std::remove(publisher->entry->publishers.begin(),
|
||||
publisher->entry->publishers.end(), publisher.get()),
|
||||
publisher->entry->publishers.end());
|
||||
publisher.reset();
|
||||
|
||||
// loop over all publishers to update period
|
||||
m_periodMs = kKeepAliveIntervalMs + 10;
|
||||
for (auto&& pub : m_publishers) {
|
||||
if (pub) {
|
||||
m_periodMs = std::gcd(m_periodMs, pub->periodMs);
|
||||
}
|
||||
}
|
||||
m_setPeriodic(m_periodMs);
|
||||
}
|
||||
|
||||
void ClientImpl3::SetProperties(std::string_view name,
|
||||
const wpi::json& update) {
|
||||
DEBUG4("SetProperties({}, {})", name, update.dump());
|
||||
auto entry = GetOrNewEntry(name);
|
||||
bool updated = false;
|
||||
for (auto&& elem : update.items()) {
|
||||
entry->properties[elem.key()] = elem.value();
|
||||
if (elem.key() == "persistent") {
|
||||
if (auto val = elem.value().get_ptr<const bool*>()) {
|
||||
if (*val) {
|
||||
entry->flags |= NT_PERSISTENT;
|
||||
} else {
|
||||
entry->flags &= ~NT_PERSISTENT;
|
||||
}
|
||||
updated = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (updated && entry->id == 0xffff) {
|
||||
m_outgoingFlags.emplace_back(entry->id, entry->flags);
|
||||
}
|
||||
}
|
||||
|
||||
void ClientImpl3::SetValue(int pubuid, const Value& value) {
|
||||
DEBUG4("SetValue({})", pubuid);
|
||||
assert(static_cast<unsigned int>(pubuid) < m_publishers.size() &&
|
||||
m_publishers[pubuid]);
|
||||
auto& publisher = *m_publishers[pubuid];
|
||||
if (value == publisher.entry->value) {
|
||||
return;
|
||||
}
|
||||
publisher.entry->value = value;
|
||||
if (publisher.outValues.empty() || publisher.options.sendAll) {
|
||||
publisher.outValues.emplace_back(value);
|
||||
} else {
|
||||
publisher.outValues.back() = value;
|
||||
}
|
||||
}
|
||||
|
||||
void ClientImpl3::KeepAlive() {
|
||||
DEBUG4("KeepAlive()");
|
||||
if (m_state != kStateRunning && m_state != kStateInitialAssignments) {
|
||||
m_decoder.SetError("received unexpected KeepAlive message");
|
||||
return;
|
||||
}
|
||||
// ignore
|
||||
}
|
||||
|
||||
void ClientImpl3::ServerHelloDone() {
|
||||
DEBUG4("ServerHelloDone()");
|
||||
if (m_state != kStateInitialAssignments) {
|
||||
m_decoder.SetError("received unexpected ServerHelloDone message");
|
||||
return;
|
||||
}
|
||||
|
||||
// send initial assignments
|
||||
DoSendPeriodic(m_initTimeMs, true, true);
|
||||
|
||||
m_state = kStateRunning;
|
||||
m_setPeriodic(m_periodMs);
|
||||
}
|
||||
|
||||
void ClientImpl3::ClientHelloDone() {
|
||||
DEBUG4("ClientHelloDone()");
|
||||
m_decoder.SetError("received unexpected ClientHelloDone message");
|
||||
}
|
||||
|
||||
void ClientImpl3::ProtoUnsup(unsigned int proto_rev) {
|
||||
DEBUG4("ProtoUnsup({})", proto_rev);
|
||||
m_decoder.SetError(fmt::format("received ProtoUnsup(version={})", proto_rev));
|
||||
}
|
||||
|
||||
void ClientImpl3::ClientHello(std::string_view self_id,
|
||||
unsigned int proto_rev) {
|
||||
DEBUG4("ClientHello({}, {})", self_id, proto_rev);
|
||||
m_decoder.SetError("received unexpected ClientHello message");
|
||||
}
|
||||
|
||||
void ClientImpl3::ServerHello(unsigned int flags, std::string_view self_id) {
|
||||
DEBUG4("ServerHello({}, {})", flags, self_id);
|
||||
if (m_state != kStateHelloSent) {
|
||||
m_decoder.SetError("received unexpected ServerHello message");
|
||||
return;
|
||||
}
|
||||
m_state = kStateInitialAssignments;
|
||||
m_remoteId = self_id;
|
||||
m_handshakeSucceeded();
|
||||
m_handshakeSucceeded = nullptr; // no longer required
|
||||
}
|
||||
|
||||
void ClientImpl3::EntryAssign(std::string_view name, unsigned int id,
|
||||
unsigned int seq_num, const Value& value,
|
||||
unsigned int flags) {
|
||||
DEBUG4("EntryAssign({}, {}, {}, value, {})", name, id, seq_num, flags);
|
||||
if (m_state != kStateInitialAssignments && m_state != kStateRunning) {
|
||||
m_decoder.SetError("received unexpected EntryAssign message");
|
||||
return;
|
||||
}
|
||||
auto entry = GetOrNewEntry(name);
|
||||
bool flagsChanged = entry->flags != flags;
|
||||
bool typeChanged;
|
||||
bool valueChanged;
|
||||
|
||||
// don't update value if we locally published a "strong" value
|
||||
if (m_state == kStateInitialAssignments && entry->value &&
|
||||
entry->value.server_time() != 0) {
|
||||
typeChanged = false;
|
||||
valueChanged = false;
|
||||
} else {
|
||||
typeChanged = entry->type != value.type();
|
||||
valueChanged = entry->value != value;
|
||||
if (m_state == kStateInitialAssignments) {
|
||||
// remove outgoing during initial assignments so we don't get out of sync
|
||||
for (auto publisher : entry->publishers) {
|
||||
publisher->outValues.clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
entry->id = id;
|
||||
entry->seqNum = SequenceNumber{seq_num};
|
||||
entry->SetFlags(flags);
|
||||
if (typeChanged) {
|
||||
entry->type = value.type();
|
||||
entry->typeStr = TypeToString(value.type());
|
||||
}
|
||||
if (valueChanged) {
|
||||
entry->value = value;
|
||||
}
|
||||
|
||||
// add to id map
|
||||
if (id >= m_idMap.size()) {
|
||||
m_idMap.resize(id + 1);
|
||||
}
|
||||
m_idMap[id] = entry;
|
||||
|
||||
if (m_local) {
|
||||
// XXX: need to handle type change specially? (e.g. with unannounce)
|
||||
if (!entry->topic || flagsChanged || typeChanged) {
|
||||
DEBUG4("NetworkAnnounce({}, {})", name, entry->typeStr);
|
||||
entry->topic = m_local->ServerAnnounce(name, 0, entry->typeStr,
|
||||
entry->properties, std::nullopt);
|
||||
}
|
||||
if (valueChanged) {
|
||||
m_local->ServerSetValue(entry->topic.value(), entry->value);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void ClientImpl3::EntryUpdate(unsigned int id, unsigned int seq_num,
|
||||
const Value& value) {
|
||||
DEBUG4("EntryUpdate({}, {}, value)", id, seq_num);
|
||||
if (m_state != kStateRunning) {
|
||||
m_decoder.SetError("received EntryUpdate message before ServerHelloDone");
|
||||
return;
|
||||
}
|
||||
if (auto entry = LookupId(id)) {
|
||||
entry->value = value;
|
||||
if (m_local && entry->topic) {
|
||||
m_local->ServerSetValue(entry->topic.value(), entry->value);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void ClientImpl3::FlagsUpdate(unsigned int id, unsigned int flags) {
|
||||
DEBUG4("FlagsUpdate({}, {})", id, flags);
|
||||
if (m_state != kStateRunning) {
|
||||
m_decoder.SetError("received FlagsUpdate message before ServerHelloDone");
|
||||
return;
|
||||
}
|
||||
if (auto entry = LookupId(id)) {
|
||||
wpi::json update = entry->SetFlags(flags);
|
||||
if (!update.empty() && m_local) {
|
||||
m_local->ServerPropertiesUpdate(entry->name, update, false);
|
||||
}
|
||||
}
|
||||
|
||||
// erase any outgoing flags updates
|
||||
m_outgoingFlags.erase(
|
||||
std::remove_if(m_outgoingFlags.begin(), m_outgoingFlags.end(),
|
||||
[&](const auto& p) { return p.first == id; }),
|
||||
m_outgoingFlags.end());
|
||||
}
|
||||
|
||||
void ClientImpl3::EntryDelete(unsigned int id) {
|
||||
DEBUG4("EntryDelete({})", id);
|
||||
if (m_state != kStateRunning) {
|
||||
m_decoder.SetError("received EntryDelete message before ServerHelloDone");
|
||||
return;
|
||||
}
|
||||
if (auto entry = LookupId(id)) {
|
||||
m_idMap[id] = nullptr;
|
||||
// set id to 0xffff so any future local setvalue will result in assign
|
||||
entry->id = 0xffff;
|
||||
entry->value = Value{};
|
||||
|
||||
// if we have no local publishers, unannounce
|
||||
if (entry->publishers.empty() && m_local && entry->topic) {
|
||||
m_local->ServerUnannounce(entry->name, entry->topic.value());
|
||||
}
|
||||
}
|
||||
|
||||
// erase any outgoing flags updates
|
||||
m_outgoingFlags.erase(
|
||||
std::remove_if(m_outgoingFlags.begin(), m_outgoingFlags.end(),
|
||||
[&](const auto& p) { return p.first == id; }),
|
||||
m_outgoingFlags.end());
|
||||
}
|
||||
|
||||
void ClientImpl3::ClearEntries() {
|
||||
DEBUG4("ClearEntries()");
|
||||
if (m_state != kStateRunning) {
|
||||
m_decoder.SetError("received ClearEntries message before ServerHelloDone");
|
||||
return;
|
||||
}
|
||||
for (auto& entry : m_idMap) {
|
||||
if (entry && entry->id != 0xffff && !entry->IsPersistent()) {
|
||||
entry->id = 0xffff;
|
||||
entry->value = Value{};
|
||||
|
||||
// if we have no local publishers, unannounce
|
||||
if (entry->publishers.empty() && m_local && entry->topic) {
|
||||
m_local->ServerUnannounce(entry->name, entry->topic.value());
|
||||
}
|
||||
|
||||
entry = nullptr; // clear id mapping
|
||||
}
|
||||
}
|
||||
|
||||
// erase all outgoing flags updates
|
||||
m_outgoingFlags.resize(0);
|
||||
}
|
||||
|
||||
void ClientImpl3::Start(std::string_view selfId,
|
||||
std::function<void()> succeeded) {
|
||||
if (m_state != kStateInitial) {
|
||||
return;
|
||||
}
|
||||
m_handshakeSucceeded = std::move(succeeded);
|
||||
auto writer = m_wire.Send();
|
||||
WireEncodeClientHello(writer.stream(), selfId, 0x0300);
|
||||
m_wire.Flush();
|
||||
m_state = kStateHelloSent;
|
||||
}
|
||||
@@ -1,177 +0,0 @@
|
||||
// Copyright (c) FIRST and other WPILib contributors.
|
||||
// Open Source Software; you can modify and/or share it under the terms of
|
||||
// the WPILib BSD license file in the root directory of this project.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <stdint.h>
|
||||
|
||||
#include <functional>
|
||||
#include <memory>
|
||||
#include <optional>
|
||||
#include <span>
|
||||
#include <string>
|
||||
#include <string_view>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
#include <wpi/StringMap.h>
|
||||
#include <wpi/json.h>
|
||||
|
||||
#include "PubSubOptions.h"
|
||||
#include "net/MessageHandler.h"
|
||||
#include "net3/Message3.h"
|
||||
#include "net3/SequenceNumber.h"
|
||||
#include "net3/WireConnection3.h"
|
||||
#include "net3/WireDecoder3.h"
|
||||
|
||||
namespace wpi {
|
||||
class Logger;
|
||||
} // namespace wpi
|
||||
|
||||
namespace nt::net {
|
||||
struct ClientMessage;
|
||||
class LocalInterface;
|
||||
} // namespace nt::net
|
||||
|
||||
namespace nt::net3 {
|
||||
|
||||
class WireConnection3;
|
||||
|
||||
class ClientImpl3 final : private MessageHandler3 {
|
||||
public:
|
||||
explicit ClientImpl3(uint64_t curTimeMs, int inst, WireConnection3& wire,
|
||||
wpi::Logger& logger,
|
||||
std::function<void(uint32_t repeatMs)> setPeriodic);
|
||||
~ClientImpl3() final;
|
||||
|
||||
void Start(std::string_view selfId, std::function<void()> succeeded);
|
||||
void ProcessIncoming(std::span<const uint8_t> data);
|
||||
void HandleLocal(std::span<const net::ClientMessage> msgs);
|
||||
|
||||
void SendPeriodic(uint64_t curTimeMs, bool flush) {
|
||||
DoSendPeriodic(curTimeMs, false, flush);
|
||||
}
|
||||
|
||||
void SetLocal(net::ServerMessageHandler* local) { m_local = local; }
|
||||
|
||||
private:
|
||||
struct Entry;
|
||||
|
||||
struct PublisherData {
|
||||
explicit PublisherData(Entry* entry) : entry{entry} {}
|
||||
|
||||
Entry* entry;
|
||||
PubSubOptionsImpl options;
|
||||
// in options as double, but copy here as integer; rounded to the nearest
|
||||
// 10 ms
|
||||
uint32_t periodMs;
|
||||
uint64_t nextSendMs{0};
|
||||
std::vector<Value> outValues; // outgoing values
|
||||
};
|
||||
|
||||
// data for each entry
|
||||
struct Entry {
|
||||
explicit Entry(std::string_view name_) : name(name_) {}
|
||||
bool IsPersistent() const { return (flags & NT_PERSISTENT) != 0; }
|
||||
wpi::json SetFlags(unsigned int flags_);
|
||||
|
||||
std::string name;
|
||||
|
||||
std::string typeStr;
|
||||
NT_Type type{NT_UNASSIGNED};
|
||||
|
||||
wpi::json properties = wpi::json::object();
|
||||
|
||||
// The current value and flags
|
||||
Value value;
|
||||
unsigned int flags{0};
|
||||
|
||||
// Unique ID used in network messages; this is 0xffff until assigned
|
||||
// by the server.
|
||||
unsigned int id{0xffff};
|
||||
|
||||
// Sequence number for update resolution
|
||||
SequenceNumber seqNum;
|
||||
|
||||
// Local topic id
|
||||
std::optional<int> topic;
|
||||
|
||||
// Local publishers
|
||||
std::vector<PublisherData*> publishers;
|
||||
};
|
||||
|
||||
void DoSendPeriodic(uint64_t curTimeMs, bool initial, bool flush);
|
||||
void SendValue(Writer& out, Entry* entry, const Value& value);
|
||||
bool CheckNetworkReady(uint64_t curTimeMs);
|
||||
|
||||
// Outgoing handlers
|
||||
void Publish(int pubuid, std::string_view name, std::string_view typeStr,
|
||||
const wpi::json& properties, const PubSubOptionsImpl& options);
|
||||
void Unpublish(int pubuid);
|
||||
void SetProperties(std::string_view name, const wpi::json& update);
|
||||
void SetValue(int pubuid, const Value& value);
|
||||
|
||||
// MessageHandler interface
|
||||
void KeepAlive() final;
|
||||
void ServerHelloDone() final;
|
||||
void ClientHelloDone() final;
|
||||
void ClearEntries() final;
|
||||
void ProtoUnsup(unsigned int proto_rev) final;
|
||||
void ClientHello(std::string_view self_id, unsigned int proto_rev) final;
|
||||
void ServerHello(unsigned int flags, std::string_view self_id) final;
|
||||
void EntryAssign(std::string_view name, unsigned int id, unsigned int seq_num,
|
||||
const Value& value, unsigned int flags) final;
|
||||
void EntryUpdate(unsigned int id, unsigned int seq_num,
|
||||
const Value& value) final;
|
||||
void FlagsUpdate(unsigned int id, unsigned int flags) final;
|
||||
void EntryDelete(unsigned int id) final;
|
||||
void ExecuteRpc(unsigned int id, unsigned int uid,
|
||||
std::span<const uint8_t> params) final {}
|
||||
void RpcResponse(unsigned int id, unsigned int uid,
|
||||
std::span<const uint8_t> result) final {}
|
||||
|
||||
enum State {
|
||||
kStateInitial,
|
||||
kStateHelloSent,
|
||||
kStateInitialAssignments,
|
||||
kStateRunning
|
||||
};
|
||||
|
||||
WireConnection3& m_wire;
|
||||
wpi::Logger& m_logger;
|
||||
net::ServerMessageHandler* m_local{nullptr};
|
||||
std::function<void(uint32_t repeatMs)> m_setPeriodic;
|
||||
uint64_t m_initTimeMs;
|
||||
|
||||
// periodic sweep handling
|
||||
static constexpr uint32_t kKeepAliveIntervalMs = 1000;
|
||||
uint32_t m_periodMs{kKeepAliveIntervalMs + 10};
|
||||
uint64_t m_lastSendMs{0};
|
||||
uint64_t m_nextKeepAliveTimeMs;
|
||||
|
||||
// indexed by publisher index
|
||||
std::vector<std::unique_ptr<PublisherData>> m_publishers;
|
||||
|
||||
State m_state{kStateInitial};
|
||||
WireDecoder3 m_decoder;
|
||||
std::string m_remoteId;
|
||||
std::function<void()> m_handshakeSucceeded;
|
||||
|
||||
std::vector<std::pair<unsigned int, unsigned int>> m_outgoingFlags;
|
||||
|
||||
using NameMap = wpi::StringMap<Entry>;
|
||||
using IdMap = std::vector<Entry*>;
|
||||
|
||||
NameMap m_nameMap;
|
||||
IdMap m_idMap;
|
||||
|
||||
Entry* GetOrNewEntry(std::string_view name) {
|
||||
return &m_nameMap.try_emplace(name, name).first->second;
|
||||
}
|
||||
Entry* LookupId(unsigned int id) {
|
||||
return id < m_idMap.size() ? m_idMap[id] : nullptr;
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace nt::net3
|
||||
@@ -1,156 +0,0 @@
|
||||
// Copyright (c) FIRST and other WPILib contributors.
|
||||
// Open Source Software; you can modify and/or share it under the terms of
|
||||
// the WPILib BSD license file in the root directory of this project.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <stdint.h>
|
||||
|
||||
#include <span>
|
||||
#include <string>
|
||||
|
||||
#include "networktables/NetworkTableValue.h"
|
||||
#include "ntcore_c.h"
|
||||
|
||||
namespace nt::net3 {
|
||||
|
||||
class WireDecoder3;
|
||||
|
||||
class Message3 {
|
||||
struct private_init {};
|
||||
friend class WireDecoder3;
|
||||
|
||||
public:
|
||||
enum MsgType {
|
||||
kUnknown = -1,
|
||||
kKeepAlive = 0x00,
|
||||
kClientHello = 0x01,
|
||||
kProtoUnsup = 0x02,
|
||||
kServerHelloDone = 0x03,
|
||||
kServerHello = 0x04,
|
||||
kClientHelloDone = 0x05,
|
||||
kEntryAssign = 0x10,
|
||||
kEntryUpdate = 0x11,
|
||||
kFlagsUpdate = 0x12,
|
||||
kEntryDelete = 0x13,
|
||||
kClearEntries = 0x14,
|
||||
kExecuteRpc = 0x20,
|
||||
kRpcResponse = 0x21
|
||||
};
|
||||
enum DataType {
|
||||
kBoolean = 0x00,
|
||||
kDouble = 0x01,
|
||||
kString = 0x02,
|
||||
kRaw = 0x03,
|
||||
kBooleanArray = 0x10,
|
||||
kDoubleArray = 0x11,
|
||||
kStringArray = 0x12,
|
||||
kRpcDef = 0x20
|
||||
};
|
||||
static constexpr uint32_t kClearAllMagic = 0xD06CB27Aul;
|
||||
|
||||
Message3() = default;
|
||||
Message3(MsgType type, const private_init&) : m_type(type) {}
|
||||
|
||||
MsgType type() const { return m_type; }
|
||||
bool Is(MsgType type) const { return type == m_type; }
|
||||
|
||||
// Message data accessors. Callers are responsible for knowing what data is
|
||||
// actually provided for a particular message.
|
||||
std::string_view str() const { return m_str; }
|
||||
std::span<const uint8_t> bytes() const {
|
||||
return {reinterpret_cast<const uint8_t*>(m_str.data()), m_str.size()};
|
||||
}
|
||||
const Value& value() const { return m_value; }
|
||||
unsigned int id() const { return m_id; }
|
||||
unsigned int flags() const { return m_flags; }
|
||||
unsigned int seq_num_uid() const { return m_seq_num_uid; }
|
||||
|
||||
void SetValue(const Value& value) { m_value = value; }
|
||||
|
||||
// Create messages without data
|
||||
static Message3 KeepAlive() { return {kKeepAlive, {}}; }
|
||||
static Message3 ServerHelloDone() { return {kServerHelloDone, {}}; }
|
||||
static Message3 ClientHelloDone() { return {kClientHelloDone, {}}; }
|
||||
static Message3 ClearEntries() { return {kClearEntries, {}}; }
|
||||
|
||||
// Create messages with data
|
||||
static Message3 ProtoUnsup(unsigned int proto_rev = 0x0300u) {
|
||||
Message3 msg{kProtoUnsup, {}};
|
||||
msg.m_id = proto_rev;
|
||||
return msg;
|
||||
}
|
||||
static Message3 ClientHello(std::string_view self_id,
|
||||
unsigned int proto_rev = 0x0300u) {
|
||||
Message3 msg{kClientHello, {}};
|
||||
msg.m_str = self_id;
|
||||
msg.m_id = proto_rev;
|
||||
return msg;
|
||||
}
|
||||
static Message3 ServerHello(unsigned int flags, std::string_view self_id) {
|
||||
Message3 msg{kServerHello, {}};
|
||||
msg.m_str = self_id;
|
||||
msg.m_flags = flags;
|
||||
return msg;
|
||||
}
|
||||
static Message3 EntryAssign(std::string_view name, unsigned int id,
|
||||
unsigned int seq_num, const Value& value,
|
||||
unsigned int flags) {
|
||||
Message3 msg{kEntryAssign, {}};
|
||||
msg.m_str = name;
|
||||
msg.m_value = value;
|
||||
msg.m_id = id;
|
||||
msg.m_flags = flags;
|
||||
msg.m_seq_num_uid = seq_num;
|
||||
return msg;
|
||||
}
|
||||
static Message3 EntryUpdate(unsigned int id, unsigned int seq_num,
|
||||
const Value& value) {
|
||||
Message3 msg{kEntryUpdate, {}};
|
||||
msg.m_value = value;
|
||||
msg.m_id = id;
|
||||
msg.m_seq_num_uid = seq_num;
|
||||
return msg;
|
||||
}
|
||||
static Message3 FlagsUpdate(unsigned int id, unsigned int flags) {
|
||||
Message3 msg{kFlagsUpdate, {}};
|
||||
msg.m_id = id;
|
||||
msg.m_flags = flags;
|
||||
return msg;
|
||||
}
|
||||
static Message3 EntryDelete(unsigned int id) {
|
||||
Message3 msg{kEntryDelete, {}};
|
||||
msg.m_id = id;
|
||||
return msg;
|
||||
}
|
||||
static Message3 ExecuteRpc(unsigned int id, unsigned int uid,
|
||||
std::span<const uint8_t> params) {
|
||||
Message3 msg{kExecuteRpc, {}};
|
||||
msg.m_str.assign(reinterpret_cast<const char*>(params.data()),
|
||||
params.size());
|
||||
msg.m_id = id;
|
||||
msg.m_seq_num_uid = uid;
|
||||
return msg;
|
||||
}
|
||||
static Message3 RpcResponse(unsigned int id, unsigned int uid,
|
||||
std::span<const uint8_t> result) {
|
||||
Message3 msg{kRpcResponse, {}};
|
||||
msg.m_str.assign(reinterpret_cast<const char*>(result.data()),
|
||||
result.size());
|
||||
msg.m_id = id;
|
||||
msg.m_seq_num_uid = uid;
|
||||
return msg;
|
||||
}
|
||||
|
||||
private:
|
||||
MsgType m_type{kUnknown};
|
||||
|
||||
// Message data. Use varies by message type.
|
||||
std::string m_str;
|
||||
Value m_value;
|
||||
unsigned int m_id{0}; // also used for proto_rev
|
||||
unsigned int m_flags{0};
|
||||
unsigned int m_seq_num_uid{0};
|
||||
};
|
||||
|
||||
} // namespace nt::net3
|
||||
@@ -1,38 +0,0 @@
|
||||
// Copyright (c) FIRST and other WPILib contributors.
|
||||
// Open Source Software; you can modify and/or share it under the terms of
|
||||
// the WPILib BSD license file in the root directory of this project.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <compare>
|
||||
|
||||
namespace nt::net3 {
|
||||
|
||||
/* A sequence number per RFC 1982 */
|
||||
class SequenceNumber {
|
||||
public:
|
||||
SequenceNumber() = default;
|
||||
explicit SequenceNumber(unsigned int value) : m_value(value) {}
|
||||
unsigned int value() const { return m_value; }
|
||||
|
||||
SequenceNumber& operator++() {
|
||||
++m_value;
|
||||
if (m_value > 0xffff) {
|
||||
m_value = 0;
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
SequenceNumber operator++(int) {
|
||||
SequenceNumber tmp(*this);
|
||||
operator++();
|
||||
return tmp;
|
||||
}
|
||||
|
||||
friend auto operator<=>(const SequenceNumber& lhs,
|
||||
const SequenceNumber& rhs) = default;
|
||||
|
||||
private:
|
||||
unsigned int m_value{0};
|
||||
};
|
||||
|
||||
} // namespace nt::net3
|
||||
@@ -1,66 +0,0 @@
|
||||
// Copyright (c) FIRST and other WPILib contributors.
|
||||
// Open Source Software; you can modify and/or share it under the terms of
|
||||
// the WPILib BSD license file in the root directory of this project.
|
||||
|
||||
#include "UvStreamConnection3.h"
|
||||
|
||||
#include <wpi/timestamp.h>
|
||||
#include <wpinet/uv/Stream.h>
|
||||
|
||||
using namespace nt;
|
||||
using namespace nt::net3;
|
||||
|
||||
static constexpr size_t kMaxPoolSize = 16;
|
||||
|
||||
UvStreamConnection3::UvStreamConnection3(wpi::uv::Stream& stream)
|
||||
: m_stream{stream}, m_os{m_buffers, [this] { return AllocBuf(); }} {}
|
||||
|
||||
UvStreamConnection3::~UvStreamConnection3() {
|
||||
for (auto&& buf : m_buf_pool) {
|
||||
buf.Deallocate();
|
||||
}
|
||||
}
|
||||
|
||||
void UvStreamConnection3::Flush() {
|
||||
if (m_buffers.empty()) {
|
||||
return;
|
||||
}
|
||||
++m_sendsActive;
|
||||
m_stream.Write(m_buffers, [selfweak = weak_from_this()](auto bufs, auto) {
|
||||
if (auto self = selfweak.lock()) {
|
||||
#ifdef __SANITIZE_ADDRESS__
|
||||
size_t numToPool = 0;
|
||||
#else
|
||||
size_t numToPool =
|
||||
(std::min)(bufs.size(), kMaxPoolSize - self->m_buf_pool.size());
|
||||
self->m_buf_pool.insert(self->m_buf_pool.end(), bufs.begin(),
|
||||
bufs.begin() + numToPool);
|
||||
#endif
|
||||
for (auto&& buf : bufs.subspan(numToPool)) {
|
||||
buf.Deallocate();
|
||||
}
|
||||
if (self->m_sendsActive > 0) {
|
||||
--self->m_sendsActive;
|
||||
}
|
||||
}
|
||||
});
|
||||
m_buffers.clear();
|
||||
m_os.reset();
|
||||
m_lastFlushTime = wpi::Now();
|
||||
}
|
||||
|
||||
void UvStreamConnection3::Disconnect(std::string_view reason) {
|
||||
m_reason = reason;
|
||||
m_stream.Close();
|
||||
}
|
||||
|
||||
void UvStreamConnection3::FinishSend() {}
|
||||
|
||||
wpi::uv::Buffer UvStreamConnection3::AllocBuf() {
|
||||
if (!m_buf_pool.empty()) {
|
||||
auto buf = m_buf_pool.back();
|
||||
m_buf_pool.pop_back();
|
||||
return buf;
|
||||
}
|
||||
return wpi::uv::Buffer::Allocate(kAllocSize);
|
||||
}
|
||||
@@ -1,78 +0,0 @@
|
||||
// Copyright (c) FIRST and other WPILib contributors.
|
||||
// Open Source Software; you can modify and/or share it under the terms of
|
||||
// the WPILib BSD license file in the root directory of this project.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <memory>
|
||||
#include <string>
|
||||
#include <string_view>
|
||||
#include <vector>
|
||||
|
||||
#include <wpi/SmallVector.h>
|
||||
#include <wpinet/raw_uv_ostream.h>
|
||||
#include <wpinet/uv/Buffer.h>
|
||||
#include <wpinet/uv/Stream.h>
|
||||
|
||||
#include "net3/WireConnection3.h"
|
||||
|
||||
namespace wpi::uv {
|
||||
class Stream;
|
||||
} // namespace wpi::uv
|
||||
|
||||
namespace nt::net3 {
|
||||
|
||||
class UvStreamConnection3 final
|
||||
: public WireConnection3,
|
||||
public std::enable_shared_from_this<UvStreamConnection3> {
|
||||
static constexpr size_t kAllocSize = 4096;
|
||||
|
||||
public:
|
||||
explicit UvStreamConnection3(wpi::uv::Stream& stream);
|
||||
~UvStreamConnection3() override;
|
||||
UvStreamConnection3(const UvStreamConnection3&) = delete;
|
||||
UvStreamConnection3& operator=(const UvStreamConnection3&) = delete;
|
||||
|
||||
bool Ready() const final { return m_sendsActive == 0; }
|
||||
|
||||
Writer Send() final { return {m_os, *this}; }
|
||||
|
||||
void Flush() final;
|
||||
|
||||
uint64_t GetLastFlushTime() const final { return m_lastFlushTime; }
|
||||
|
||||
void StopRead() final {
|
||||
if (m_readActive) {
|
||||
m_stream.StopRead();
|
||||
m_readActive = false;
|
||||
}
|
||||
}
|
||||
void StartRead() final {
|
||||
if (!m_readActive) {
|
||||
m_stream.StartRead();
|
||||
m_readActive = true;
|
||||
}
|
||||
}
|
||||
|
||||
void Disconnect(std::string_view reason) final;
|
||||
|
||||
std::string_view GetDisconnectReason() const { return m_reason; }
|
||||
|
||||
wpi::uv::Stream& GetStream() { return m_stream; }
|
||||
|
||||
private:
|
||||
void FinishSend() final;
|
||||
|
||||
wpi::uv::Buffer AllocBuf();
|
||||
|
||||
wpi::uv::Stream& m_stream;
|
||||
wpi::SmallVector<wpi::uv::Buffer, 4> m_buffers;
|
||||
std::vector<wpi::uv::Buffer> m_buf_pool;
|
||||
wpi::raw_uv_ostream m_os;
|
||||
std::string m_reason;
|
||||
uint64_t m_lastFlushTime = 0;
|
||||
int m_sendsActive = 0;
|
||||
bool m_readActive = true;
|
||||
};
|
||||
|
||||
} // namespace nt::net3
|
||||
@@ -1,72 +0,0 @@
|
||||
// Copyright (c) FIRST and other WPILib contributors.
|
||||
// Open Source Software; you can modify and/or share it under the terms of
|
||||
// the WPILib BSD license file in the root directory of this project.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <stdint.h>
|
||||
|
||||
#include <string_view>
|
||||
|
||||
namespace wpi {
|
||||
class raw_ostream;
|
||||
} // namespace wpi
|
||||
|
||||
namespace nt::net3 {
|
||||
|
||||
class Writer;
|
||||
|
||||
class WireConnection3 {
|
||||
friend class Writer;
|
||||
|
||||
public:
|
||||
virtual ~WireConnection3() = default;
|
||||
|
||||
virtual bool Ready() const = 0;
|
||||
|
||||
virtual Writer Send() = 0;
|
||||
|
||||
virtual void Flush() = 0;
|
||||
|
||||
virtual uint64_t GetLastFlushTime() const = 0; // in microseconds
|
||||
|
||||
virtual void StopRead() = 0;
|
||||
virtual void StartRead() = 0;
|
||||
|
||||
virtual void Disconnect(std::string_view reason) = 0;
|
||||
|
||||
protected:
|
||||
virtual void FinishSend() = 0;
|
||||
};
|
||||
|
||||
class Writer {
|
||||
public:
|
||||
Writer(wpi::raw_ostream& os, WireConnection3& wire)
|
||||
: m_os{&os}, m_wire{&wire} {}
|
||||
Writer(const Writer&) = delete;
|
||||
Writer(Writer&& rhs) : m_os{rhs.m_os}, m_wire{rhs.m_wire} {
|
||||
rhs.m_os = nullptr;
|
||||
rhs.m_wire = nullptr;
|
||||
}
|
||||
~Writer() {
|
||||
if (m_wire) {
|
||||
m_wire->FinishSend();
|
||||
}
|
||||
}
|
||||
Writer& operator=(const Writer&) = delete;
|
||||
Writer& operator=(Writer&& rhs) {
|
||||
m_os = rhs.m_os;
|
||||
m_wire = rhs.m_wire;
|
||||
rhs.m_os = nullptr;
|
||||
rhs.m_wire = nullptr;
|
||||
return *this;
|
||||
}
|
||||
|
||||
wpi::raw_ostream& stream() { return *m_os; }
|
||||
|
||||
private:
|
||||
wpi::raw_ostream* m_os;
|
||||
WireConnection3* m_wire;
|
||||
};
|
||||
|
||||
} // namespace nt::net3
|
||||
@@ -1,458 +0,0 @@
|
||||
// Copyright (c) FIRST and other WPILib contributors.
|
||||
// Open Source Software; you can modify and/or share it under the terms of
|
||||
// the WPILib BSD license file in the root directory of this project.
|
||||
|
||||
#include "WireDecoder3.h"
|
||||
|
||||
#include <algorithm>
|
||||
#include <bit>
|
||||
#include <string>
|
||||
#include <utility>
|
||||
|
||||
#include <fmt/format.h>
|
||||
#include <wpi/SpanExtras.h>
|
||||
|
||||
#include "Message3.h"
|
||||
|
||||
using namespace nt;
|
||||
using namespace nt::net3;
|
||||
|
||||
static uint8_t Read8(std::span<const uint8_t>* in) {
|
||||
uint8_t val = in->front();
|
||||
*in = wpi::drop_front(*in);
|
||||
return val;
|
||||
}
|
||||
|
||||
std::optional<uint16_t> WireDecoder3::SimpleValueReader::Read16(
|
||||
std::span<const uint8_t>* in) {
|
||||
while (!in->empty()) {
|
||||
m_value <<= 8;
|
||||
m_value |= in->front() & 0xff;
|
||||
*in = wpi::drop_front(*in);
|
||||
if (++m_count >= 2) {
|
||||
uint16_t val = static_cast<uint16_t>(m_value);
|
||||
m_count = 0;
|
||||
m_value = 0;
|
||||
return val;
|
||||
}
|
||||
}
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
std::optional<uint32_t> WireDecoder3::SimpleValueReader::Read32(
|
||||
std::span<const uint8_t>* in) {
|
||||
while (!in->empty()) {
|
||||
m_value <<= 8;
|
||||
m_value |= in->front() & 0xff;
|
||||
*in = wpi::drop_front(*in);
|
||||
if (++m_count >= 4) {
|
||||
uint32_t val = static_cast<uint32_t>(m_value);
|
||||
m_count = 0;
|
||||
m_value = 0;
|
||||
return val;
|
||||
}
|
||||
}
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
std::optional<uint64_t> WireDecoder3::SimpleValueReader::Read64(
|
||||
std::span<const uint8_t>* in) {
|
||||
while (!in->empty()) {
|
||||
m_value <<= 8;
|
||||
m_value |= in->front() & 0xff;
|
||||
*in = wpi::drop_front(*in);
|
||||
if (++m_count >= 8) {
|
||||
uint64_t val = m_value;
|
||||
m_count = 0;
|
||||
m_value = 0;
|
||||
return val;
|
||||
}
|
||||
}
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
std::optional<double> WireDecoder3::SimpleValueReader::ReadDouble(
|
||||
std::span<const uint8_t>* in) {
|
||||
if (auto val = Read64(in)) {
|
||||
return std::bit_cast<double>(val.value());
|
||||
} else {
|
||||
return std::nullopt;
|
||||
}
|
||||
}
|
||||
|
||||
void WireDecoder3::DoExecute(std::span<const uint8_t>* in) {
|
||||
while (!in->empty()) {
|
||||
switch (m_state) {
|
||||
case kStart: {
|
||||
uint8_t msgType = Read8(in);
|
||||
switch (msgType) {
|
||||
case Message3::kKeepAlive:
|
||||
m_out.KeepAlive();
|
||||
break;
|
||||
case Message3::kClientHello:
|
||||
m_state = kClientHello_1ProtoRev;
|
||||
break;
|
||||
case Message3::kProtoUnsup:
|
||||
m_state = kProtoUnsup_1ProtoRev;
|
||||
break;
|
||||
case Message3::kServerHello:
|
||||
m_state = kServerHello_1Flags;
|
||||
break;
|
||||
case Message3::kServerHelloDone:
|
||||
m_out.ServerHelloDone();
|
||||
break;
|
||||
case Message3::kClientHelloDone:
|
||||
m_out.ClientHelloDone();
|
||||
break;
|
||||
case Message3::kEntryAssign:
|
||||
m_state = kEntryAssign_1Name;
|
||||
break;
|
||||
case Message3::kEntryUpdate:
|
||||
m_state = kEntryUpdate_1Id;
|
||||
break;
|
||||
case Message3::kFlagsUpdate:
|
||||
m_state = kFlagsUpdate_1Id;
|
||||
break;
|
||||
case Message3::kEntryDelete:
|
||||
m_state = kEntryDelete_1Id;
|
||||
break;
|
||||
case Message3::kClearEntries:
|
||||
m_state = kClearEntries_1Magic;
|
||||
break;
|
||||
case Message3::kExecuteRpc:
|
||||
m_state = kExecuteRpc_1Id;
|
||||
break;
|
||||
case Message3::kRpcResponse:
|
||||
m_state = kRpcResponse_1Id;
|
||||
break;
|
||||
default:
|
||||
EmitError(fmt::format("unrecognized message type: {}",
|
||||
static_cast<uint32_t>(msgType)));
|
||||
return;
|
||||
}
|
||||
break;
|
||||
}
|
||||
case kClientHello_1ProtoRev:
|
||||
if (auto val = m_simpleReader.Read16(in)) {
|
||||
if (val < 0x0300u) {
|
||||
m_state = kStart;
|
||||
m_out.ClientHello("", val.value());
|
||||
} else {
|
||||
m_state = kClientHello_2Id;
|
||||
m_id = val.value();
|
||||
}
|
||||
}
|
||||
break;
|
||||
case kClientHello_2Id:
|
||||
if (auto val = ReadString(in)) {
|
||||
m_state = kStart;
|
||||
m_out.ClientHello(val.value(), m_id);
|
||||
}
|
||||
break;
|
||||
case kProtoUnsup_1ProtoRev:
|
||||
if (auto val = m_simpleReader.Read16(in)) {
|
||||
m_state = kStart;
|
||||
m_out.ProtoUnsup(val.value());
|
||||
}
|
||||
break;
|
||||
case kServerHello_1Flags: {
|
||||
m_state = kServerHello_2Id;
|
||||
m_flags = Read8(in);
|
||||
break;
|
||||
}
|
||||
case kServerHello_2Id:
|
||||
if (auto val = ReadString(in)) {
|
||||
m_state = kStart;
|
||||
m_out.ServerHello(m_flags, val.value());
|
||||
}
|
||||
break;
|
||||
case kEntryAssign_1Name:
|
||||
if (auto val = ReadString(in)) {
|
||||
m_state = kEntryAssign_2Type;
|
||||
m_str = std::move(val.value());
|
||||
}
|
||||
break;
|
||||
case kEntryAssign_2Type:
|
||||
if (auto val = ReadType(in)) {
|
||||
m_state = kEntryAssign_3Id;
|
||||
m_valueReader = ValueReader{val.value()};
|
||||
}
|
||||
break;
|
||||
case kEntryAssign_3Id:
|
||||
if (auto val = m_simpleReader.Read16(in)) {
|
||||
m_state = kEntryAssign_4SeqNum;
|
||||
m_id = val.value();
|
||||
}
|
||||
break;
|
||||
case kEntryAssign_4SeqNum:
|
||||
if (auto val = m_simpleReader.Read16(in)) {
|
||||
m_state = kEntryAssign_5Flags;
|
||||
m_seq_num_uid = val.value();
|
||||
}
|
||||
break;
|
||||
case kEntryAssign_5Flags: {
|
||||
m_state = kEntryAssign_6Value;
|
||||
m_flags = Read8(in);
|
||||
break;
|
||||
}
|
||||
case kEntryAssign_6Value:
|
||||
if (auto val = ReadValue(in)) {
|
||||
m_state = kStart;
|
||||
m_out.EntryAssign(m_str, m_id, m_seq_num_uid, val.value(), m_flags);
|
||||
}
|
||||
break;
|
||||
case kEntryUpdate_1Id:
|
||||
if (auto val = m_simpleReader.Read16(in)) {
|
||||
m_state = kEntryUpdate_2SeqNum;
|
||||
m_id = val.value();
|
||||
}
|
||||
break;
|
||||
case kEntryUpdate_2SeqNum:
|
||||
if (auto val = m_simpleReader.Read16(in)) {
|
||||
m_state = kEntryUpdate_3Type;
|
||||
m_seq_num_uid = val.value();
|
||||
}
|
||||
break;
|
||||
case kEntryUpdate_3Type:
|
||||
if (auto val = ReadType(in)) {
|
||||
m_state = kEntryUpdate_4Value;
|
||||
m_valueReader = ValueReader{val.value()};
|
||||
}
|
||||
break;
|
||||
case kEntryUpdate_4Value:
|
||||
if (auto val = ReadValue(in)) {
|
||||
m_state = kStart;
|
||||
m_out.EntryUpdate(m_id, m_seq_num_uid, val.value());
|
||||
}
|
||||
break;
|
||||
case kFlagsUpdate_1Id:
|
||||
if (auto val = m_simpleReader.Read16(in)) {
|
||||
m_state = kFlagsUpdate_2Flags;
|
||||
m_id = val.value();
|
||||
}
|
||||
break;
|
||||
case kFlagsUpdate_2Flags: {
|
||||
m_state = kStart;
|
||||
m_out.FlagsUpdate(m_id, Read8(in));
|
||||
break;
|
||||
}
|
||||
case kEntryDelete_1Id:
|
||||
if (auto val = m_simpleReader.Read16(in)) {
|
||||
m_state = kStart;
|
||||
m_out.EntryDelete(val.value());
|
||||
}
|
||||
break;
|
||||
case kClearEntries_1Magic:
|
||||
if (auto val = m_simpleReader.Read32(in)) {
|
||||
m_state = kStart;
|
||||
if (val.value() == Message3::kClearAllMagic) {
|
||||
m_out.ClearEntries();
|
||||
} else {
|
||||
EmitError("received incorrect CLEAR_ENTRIES magic value");
|
||||
}
|
||||
break;
|
||||
}
|
||||
break;
|
||||
case kExecuteRpc_1Id:
|
||||
if (auto val = m_simpleReader.Read16(in)) {
|
||||
m_state = kExecuteRpc_2Uid;
|
||||
m_id = val.value();
|
||||
}
|
||||
break;
|
||||
case kExecuteRpc_2Uid:
|
||||
if (auto val = m_simpleReader.Read16(in)) {
|
||||
m_state = kExecuteRpc_3Params;
|
||||
m_seq_num_uid = val.value();
|
||||
}
|
||||
break;
|
||||
case kExecuteRpc_3Params:
|
||||
if (auto val = ReadRaw(in)) {
|
||||
m_state = kStart;
|
||||
m_out.ExecuteRpc(m_id, m_seq_num_uid, val.value());
|
||||
}
|
||||
break;
|
||||
case kRpcResponse_1Id:
|
||||
if (auto val = m_simpleReader.Read16(in)) {
|
||||
m_state = kRpcResponse_2Uid;
|
||||
m_id = val.value();
|
||||
}
|
||||
break;
|
||||
case kRpcResponse_2Uid:
|
||||
if (auto val = m_simpleReader.Read16(in)) {
|
||||
m_state = kRpcResponse_3Result;
|
||||
m_seq_num_uid = val.value();
|
||||
}
|
||||
break;
|
||||
case kRpcResponse_3Result:
|
||||
if (auto val = ReadRaw(in)) {
|
||||
m_state = kStart;
|
||||
m_out.RpcResponse(m_id, m_seq_num_uid, val.value());
|
||||
}
|
||||
break;
|
||||
case kError:
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
std::optional<std::string> WireDecoder3::ReadString(
|
||||
std::span<const uint8_t>* in) {
|
||||
// string length
|
||||
if (!m_stringReader.len) {
|
||||
if (auto val = m_ulebReader.ReadOne(in)) {
|
||||
m_stringReader.SetLen(val.value());
|
||||
m_stringReader.buf.clear();
|
||||
} else {
|
||||
return std::nullopt;
|
||||
}
|
||||
}
|
||||
|
||||
// string data; nolint to avoid clang-tidy false positive
|
||||
size_t toCopy =
|
||||
(std::min)(in->size(),
|
||||
static_cast<size_t>(m_stringReader.len.value() -
|
||||
m_stringReader.buf.size())); // NOLINT
|
||||
m_stringReader.buf.append(reinterpret_cast<const char*>(in->data()), toCopy);
|
||||
*in = wpi::drop_front(*in, toCopy);
|
||||
if (m_stringReader.buf.size() >= m_stringReader.len) {
|
||||
m_stringReader.len.reset();
|
||||
return std::move(m_stringReader.buf);
|
||||
}
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
std::optional<std::vector<uint8_t>> WireDecoder3::ReadRaw(
|
||||
std::span<const uint8_t>* in) {
|
||||
// string length
|
||||
if (!m_rawReader.len) {
|
||||
if (auto val = m_ulebReader.ReadOne(in)) {
|
||||
m_rawReader.SetLen(val.value());
|
||||
m_rawReader.buf.clear();
|
||||
} else {
|
||||
return std::nullopt;
|
||||
}
|
||||
}
|
||||
|
||||
// string data
|
||||
size_t toCopy = (std::min)(
|
||||
static_cast<size_t>(in->size()),
|
||||
static_cast<size_t>(m_rawReader.len.value() - m_rawReader.buf.size()));
|
||||
m_rawReader.buf.insert(m_rawReader.buf.end(), in->begin(),
|
||||
in->begin() + toCopy);
|
||||
*in = wpi::drop_front(*in, toCopy);
|
||||
if (m_rawReader.buf.size() >= m_rawReader.len) {
|
||||
m_rawReader.len.reset();
|
||||
return std::move(m_rawReader.buf);
|
||||
}
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
std::optional<NT_Type> WireDecoder3::ReadType(std::span<const uint8_t>* in) {
|
||||
// Convert from byte value to enum
|
||||
switch (Read8(in)) {
|
||||
case Message3::kBoolean:
|
||||
return NT_BOOLEAN;
|
||||
case Message3::kDouble:
|
||||
return NT_DOUBLE;
|
||||
case Message3::kString:
|
||||
return NT_STRING;
|
||||
case Message3::kRaw:
|
||||
return NT_RAW;
|
||||
case Message3::kBooleanArray:
|
||||
return NT_BOOLEAN_ARRAY;
|
||||
case Message3::kDoubleArray:
|
||||
return NT_DOUBLE_ARRAY;
|
||||
case Message3::kStringArray:
|
||||
return NT_STRING_ARRAY;
|
||||
case Message3::kRpcDef:
|
||||
return NT_RPC;
|
||||
default:
|
||||
return EmitError("unrecognized value type");
|
||||
}
|
||||
}
|
||||
|
||||
std::optional<Value> WireDecoder3::ReadValue(std::span<const uint8_t>* in) {
|
||||
while (!in->empty()) {
|
||||
switch (m_valueReader.type) {
|
||||
case NT_BOOLEAN:
|
||||
return Value::MakeBoolean(Read8(in) != 0);
|
||||
case NT_DOUBLE:
|
||||
if (auto val = m_simpleReader.ReadDouble(in)) {
|
||||
return Value::MakeDouble(val.value());
|
||||
}
|
||||
break;
|
||||
case NT_STRING:
|
||||
if (auto val = ReadString(in)) {
|
||||
return Value::MakeString(std::move(val.value()));
|
||||
}
|
||||
break;
|
||||
case NT_RAW:
|
||||
case NT_RPC:
|
||||
if (auto val = ReadRaw(in)) {
|
||||
return Value::MakeRaw(std::move(val.value()));
|
||||
}
|
||||
break;
|
||||
#if 0
|
||||
case NT_RPC:
|
||||
if (auto val = ReadRaw(in)) {
|
||||
return Value::MakeRpc(std::move(val.value()));
|
||||
}
|
||||
break;
|
||||
#endif
|
||||
case NT_BOOLEAN_ARRAY:
|
||||
// size
|
||||
if (!m_valueReader.haveSize) {
|
||||
m_valueReader.SetSize(Read8(in));
|
||||
break;
|
||||
}
|
||||
|
||||
// array values
|
||||
while (!in->empty() && m_valueReader.ints.size() < m_valueReader.size) {
|
||||
m_valueReader.ints.emplace_back(Read8(in) ? 1 : 0);
|
||||
}
|
||||
if (m_valueReader.ints.size() == m_valueReader.size) {
|
||||
return Value::MakeBooleanArray(std::move(m_valueReader.ints));
|
||||
}
|
||||
break;
|
||||
case NT_DOUBLE_ARRAY:
|
||||
// size
|
||||
if (!m_valueReader.haveSize) {
|
||||
m_valueReader.SetSize(Read8(in));
|
||||
break;
|
||||
}
|
||||
|
||||
// array values
|
||||
while (!in->empty() &&
|
||||
m_valueReader.doubles.size() < m_valueReader.size) {
|
||||
if (auto val = m_simpleReader.ReadDouble(in)) {
|
||||
m_valueReader.doubles.emplace_back(std::move(val.value()));
|
||||
}
|
||||
}
|
||||
if (m_valueReader.doubles.size() == m_valueReader.size) {
|
||||
return Value::MakeDoubleArray(std::move(m_valueReader.doubles));
|
||||
}
|
||||
break;
|
||||
case NT_STRING_ARRAY:
|
||||
// size
|
||||
if (!m_valueReader.haveSize) {
|
||||
m_valueReader.SetSize(Read8(in));
|
||||
break;
|
||||
}
|
||||
|
||||
// array values
|
||||
while (!in->empty() &&
|
||||
m_valueReader.strings.size() < m_valueReader.size) {
|
||||
if (auto val = ReadString(in)) {
|
||||
m_valueReader.strings.emplace_back(std::move(val.value()));
|
||||
}
|
||||
}
|
||||
if (m_valueReader.strings.size() == m_valueReader.size) {
|
||||
return Value::MakeStringArray(std::move(m_valueReader.strings));
|
||||
}
|
||||
break;
|
||||
default:
|
||||
return EmitError("invalid type when trying to read value");
|
||||
}
|
||||
}
|
||||
return std::nullopt;
|
||||
}
|
||||
@@ -1,183 +0,0 @@
|
||||
// Copyright (c) FIRST and other WPILib contributors.
|
||||
// Open Source Software; you can modify and/or share it under the terms of
|
||||
// the WPILib BSD license file in the root directory of this project.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <stdint.h>
|
||||
|
||||
#include <memory>
|
||||
#include <optional>
|
||||
#include <span>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
#include <wpi/leb128.h>
|
||||
|
||||
#include "ntcore_c.h"
|
||||
|
||||
namespace nt {
|
||||
class Value;
|
||||
} // namespace nt
|
||||
|
||||
namespace nt::net3 {
|
||||
|
||||
class MessageHandler3 {
|
||||
public:
|
||||
virtual ~MessageHandler3() = default;
|
||||
|
||||
virtual void KeepAlive() = 0;
|
||||
virtual void ServerHelloDone() = 0;
|
||||
virtual void ClientHelloDone() = 0;
|
||||
virtual void ClearEntries() = 0;
|
||||
virtual void ProtoUnsup(unsigned int proto_rev) = 0;
|
||||
virtual void ClientHello(std::string_view self_id,
|
||||
unsigned int proto_rev) = 0;
|
||||
virtual void ServerHello(unsigned int flags, std::string_view self_id) = 0;
|
||||
virtual void EntryAssign(std::string_view name, unsigned int id,
|
||||
unsigned int seq_num, const Value& value,
|
||||
unsigned int flags) = 0;
|
||||
virtual void EntryUpdate(unsigned int id, unsigned int seq_num,
|
||||
const Value& value) = 0;
|
||||
virtual void FlagsUpdate(unsigned int id, unsigned int flags) = 0;
|
||||
virtual void EntryDelete(unsigned int id) = 0;
|
||||
virtual void ExecuteRpc(unsigned int id, unsigned int uid,
|
||||
std::span<const uint8_t> params) = 0;
|
||||
virtual void RpcResponse(unsigned int id, unsigned int uid,
|
||||
std::span<const uint8_t> result) = 0;
|
||||
};
|
||||
|
||||
/* Decodes NT3 protocol into native representation. */
|
||||
class WireDecoder3 {
|
||||
public:
|
||||
explicit WireDecoder3(MessageHandler3& out) : m_out{out} {}
|
||||
|
||||
/**
|
||||
* Executes the decoder. All input data will be consumed unless an error
|
||||
* occurs.
|
||||
* @param in input data (updated during parse)
|
||||
* @return false if error occurred
|
||||
*/
|
||||
bool Execute(std::span<const uint8_t>* in) {
|
||||
DoExecute(in);
|
||||
return m_state != kError;
|
||||
}
|
||||
|
||||
void SetError(std::string_view message) { EmitError(message); }
|
||||
std::string GetError() const { return m_error; }
|
||||
|
||||
private:
|
||||
class SimpleValueReader {
|
||||
public:
|
||||
std::optional<uint16_t> Read16(std::span<const uint8_t>* in);
|
||||
std::optional<uint32_t> Read32(std::span<const uint8_t>* in);
|
||||
std::optional<uint64_t> Read64(std::span<const uint8_t>* in);
|
||||
std::optional<double> ReadDouble(std::span<const uint8_t>* in);
|
||||
|
||||
private:
|
||||
uint64_t m_value = 0;
|
||||
int m_count = 0;
|
||||
};
|
||||
|
||||
struct StringReader {
|
||||
void SetLen(uint64_t len_) {
|
||||
len = len_;
|
||||
buf.clear();
|
||||
}
|
||||
|
||||
std::optional<uint64_t> len;
|
||||
std::string buf;
|
||||
};
|
||||
|
||||
struct RawReader {
|
||||
void SetLen(uint64_t len_) {
|
||||
len = len_;
|
||||
buf.clear();
|
||||
}
|
||||
|
||||
std::optional<uint64_t> len;
|
||||
std::vector<uint8_t> buf;
|
||||
};
|
||||
|
||||
struct ValueReader {
|
||||
ValueReader() = default;
|
||||
explicit ValueReader(NT_Type type_) : type{type_} {}
|
||||
|
||||
void SetSize(uint32_t size_) {
|
||||
haveSize = true;
|
||||
size = size_;
|
||||
ints.clear();
|
||||
doubles.clear();
|
||||
strings.clear();
|
||||
}
|
||||
|
||||
NT_Type type = NT_UNASSIGNED;
|
||||
bool haveSize = false;
|
||||
uint32_t size = 0;
|
||||
std::vector<int> ints;
|
||||
std::vector<double> doubles;
|
||||
std::vector<std::string> strings;
|
||||
};
|
||||
|
||||
MessageHandler3& m_out;
|
||||
|
||||
// primary (message) decode state
|
||||
enum {
|
||||
kStart,
|
||||
kClientHello_1ProtoRev,
|
||||
kClientHello_2Id,
|
||||
kProtoUnsup_1ProtoRev,
|
||||
kServerHello_1Flags,
|
||||
kServerHello_2Id,
|
||||
kEntryAssign_1Name,
|
||||
kEntryAssign_2Type,
|
||||
kEntryAssign_3Id,
|
||||
kEntryAssign_4SeqNum,
|
||||
kEntryAssign_5Flags,
|
||||
kEntryAssign_6Value,
|
||||
kEntryUpdate_1Id,
|
||||
kEntryUpdate_2SeqNum,
|
||||
kEntryUpdate_3Type,
|
||||
kEntryUpdate_4Value,
|
||||
kFlagsUpdate_1Id,
|
||||
kFlagsUpdate_2Flags,
|
||||
kEntryDelete_1Id,
|
||||
kClearEntries_1Magic,
|
||||
kExecuteRpc_1Id,
|
||||
kExecuteRpc_2Uid,
|
||||
kExecuteRpc_3Params,
|
||||
kRpcResponse_1Id,
|
||||
kRpcResponse_2Uid,
|
||||
kRpcResponse_3Result,
|
||||
kError
|
||||
} m_state = kStart;
|
||||
|
||||
// detail decoders
|
||||
wpi::Uleb128Reader m_ulebReader;
|
||||
SimpleValueReader m_simpleReader;
|
||||
StringReader m_stringReader;
|
||||
RawReader m_rawReader;
|
||||
ValueReader m_valueReader;
|
||||
|
||||
std::string m_error;
|
||||
|
||||
std::string m_str;
|
||||
unsigned int m_id{0}; // also used for proto_rev
|
||||
unsigned int m_flags{0};
|
||||
unsigned int m_seq_num_uid{0};
|
||||
|
||||
void DoExecute(std::span<const uint8_t>* in);
|
||||
|
||||
std::nullopt_t EmitError(std::string_view msg) {
|
||||
m_state = kError;
|
||||
m_error = msg;
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
std::optional<std::string> ReadString(std::span<const uint8_t>* in);
|
||||
std::optional<std::vector<uint8_t>> ReadRaw(std::span<const uint8_t>* in);
|
||||
std::optional<NT_Type> ReadType(std::span<const uint8_t>* in);
|
||||
std::optional<Value> ReadValue(std::span<const uint8_t>* in);
|
||||
};
|
||||
|
||||
} // namespace nt::net3
|
||||
@@ -1,321 +0,0 @@
|
||||
// Copyright (c) FIRST and other WPILib contributors.
|
||||
// Open Source Software; you can modify and/or share it under the terms of
|
||||
// the WPILib BSD license file in the root directory of this project.
|
||||
|
||||
#include "WireEncoder3.h"
|
||||
|
||||
#include <bit>
|
||||
|
||||
#include <wpi/Endian.h>
|
||||
#include <wpi/SmallVector.h>
|
||||
#include <wpi/leb128.h>
|
||||
#include <wpi/raw_ostream.h>
|
||||
|
||||
#include "Message3.h"
|
||||
|
||||
using namespace nt;
|
||||
using namespace nt::net3;
|
||||
|
||||
static void Write8(wpi::raw_ostream& os, uint8_t val) {
|
||||
os << val;
|
||||
}
|
||||
|
||||
static void Write16(wpi::raw_ostream& os, uint16_t val) {
|
||||
uint8_t buf[2];
|
||||
wpi::support::endian::write16be(buf, val);
|
||||
os << buf;
|
||||
}
|
||||
|
||||
static void Write32(wpi::raw_ostream& os, uint32_t val) {
|
||||
uint8_t buf[4];
|
||||
wpi::support::endian::write32be(buf, val);
|
||||
os << buf;
|
||||
}
|
||||
|
||||
static void WriteDouble(wpi::raw_ostream& os, double val) {
|
||||
uint8_t buf[8];
|
||||
wpi::support::endian::write64be(buf, std::bit_cast<uint64_t>(val));
|
||||
os << buf;
|
||||
}
|
||||
|
||||
static void WriteString(wpi::raw_ostream& os, std::string_view str) {
|
||||
wpi::WriteUleb128(os, str.size());
|
||||
os << str;
|
||||
}
|
||||
|
||||
static void WriteRaw(wpi::raw_ostream& os, std::span<const uint8_t> str) {
|
||||
wpi::WriteUleb128(os, str.size());
|
||||
os << str;
|
||||
}
|
||||
|
||||
static bool WriteType(wpi::raw_ostream& os, NT_Type type) {
|
||||
char ch;
|
||||
// Convert from enum to actual byte value.
|
||||
switch (type) {
|
||||
case NT_BOOLEAN:
|
||||
ch = Message3::kBoolean;
|
||||
break;
|
||||
case NT_INTEGER:
|
||||
case NT_FLOAT:
|
||||
case NT_DOUBLE:
|
||||
ch = Message3::kDouble;
|
||||
break;
|
||||
case NT_STRING:
|
||||
ch = Message3::kString;
|
||||
break;
|
||||
case NT_RAW:
|
||||
ch = Message3::kRaw;
|
||||
break;
|
||||
case NT_BOOLEAN_ARRAY:
|
||||
ch = Message3::kBooleanArray;
|
||||
break;
|
||||
case NT_INTEGER_ARRAY:
|
||||
case NT_FLOAT_ARRAY:
|
||||
case NT_DOUBLE_ARRAY:
|
||||
ch = Message3::kDoubleArray;
|
||||
break;
|
||||
case NT_STRING_ARRAY:
|
||||
ch = Message3::kStringArray;
|
||||
break;
|
||||
case NT_RPC:
|
||||
ch = Message3::kRpcDef;
|
||||
break;
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
os << ch;
|
||||
return true;
|
||||
}
|
||||
|
||||
static bool WriteValue(wpi::raw_ostream& os, const Value& value) {
|
||||
switch (value.type()) {
|
||||
case NT_BOOLEAN:
|
||||
Write8(os, value.GetBoolean() ? 1 : 0);
|
||||
break;
|
||||
case NT_INTEGER:
|
||||
WriteDouble(os, value.GetInteger());
|
||||
break;
|
||||
case NT_FLOAT:
|
||||
WriteDouble(os, value.GetFloat());
|
||||
break;
|
||||
case NT_DOUBLE:
|
||||
WriteDouble(os, value.GetDouble());
|
||||
break;
|
||||
case NT_STRING:
|
||||
WriteString(os, value.GetString());
|
||||
break;
|
||||
case NT_RAW:
|
||||
WriteRaw(os, value.GetRaw());
|
||||
break;
|
||||
case NT_RPC:
|
||||
WriteRaw(os, value.GetRaw());
|
||||
break;
|
||||
case NT_BOOLEAN_ARRAY: {
|
||||
auto v = value.GetBooleanArray();
|
||||
size_t size = v.size();
|
||||
if (size > 0xff) {
|
||||
size = 0xff; // size is only 1 byte, truncate
|
||||
}
|
||||
Write8(os, size);
|
||||
|
||||
for (size_t i = 0; i < size; ++i) {
|
||||
Write8(os, v[i] ? 1 : 0);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case NT_INTEGER_ARRAY: {
|
||||
auto v = value.GetIntegerArray();
|
||||
size_t size = v.size();
|
||||
if (size > 0xff) {
|
||||
size = 0xff; // size is only 1 byte, truncate
|
||||
}
|
||||
Write8(os, size);
|
||||
|
||||
for (size_t i = 0; i < size; ++i) {
|
||||
WriteDouble(os, v[i]);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case NT_FLOAT_ARRAY: {
|
||||
auto v = value.GetFloatArray();
|
||||
size_t size = v.size();
|
||||
if (size > 0xff) {
|
||||
size = 0xff; // size is only 1 byte, truncate
|
||||
}
|
||||
Write8(os, size);
|
||||
|
||||
for (size_t i = 0; i < size; ++i) {
|
||||
WriteDouble(os, v[i]);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case NT_DOUBLE_ARRAY: {
|
||||
auto v = value.GetDoubleArray();
|
||||
size_t size = v.size();
|
||||
if (size > 0xff) {
|
||||
size = 0xff; // size is only 1 byte, truncate
|
||||
}
|
||||
Write8(os, size);
|
||||
|
||||
for (size_t i = 0; i < size; ++i) {
|
||||
WriteDouble(os, v[i]);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case NT_STRING_ARRAY: {
|
||||
auto v = value.GetStringArray();
|
||||
size_t size = v.size();
|
||||
if (size > 0xff) {
|
||||
size = 0xff; // size is only 1 byte, truncate
|
||||
}
|
||||
Write8(os, size);
|
||||
|
||||
for (size_t i = 0; i < size; ++i) {
|
||||
WriteString(os, v[i]);
|
||||
}
|
||||
break;
|
||||
}
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
void nt::net3::WireEncodeKeepAlive(wpi::raw_ostream& os) {
|
||||
Write8(os, Message3::kKeepAlive);
|
||||
}
|
||||
|
||||
void nt::net3::WireEncodeServerHelloDone(wpi::raw_ostream& os) {
|
||||
Write8(os, Message3::kServerHelloDone);
|
||||
}
|
||||
|
||||
void nt::net3::WireEncodeClientHelloDone(wpi::raw_ostream& os) {
|
||||
Write8(os, Message3::kClientHelloDone);
|
||||
}
|
||||
|
||||
void nt::net3::WireEncodeClearEntries(wpi::raw_ostream& os) {
|
||||
Write8(os, Message3::kClearEntries);
|
||||
Write32(os, Message3::kClearAllMagic);
|
||||
}
|
||||
|
||||
void nt::net3::WireEncodeProtoUnsup(wpi::raw_ostream& os,
|
||||
unsigned int proto_rev) {
|
||||
Write8(os, Message3::kProtoUnsup);
|
||||
Write16(os, proto_rev);
|
||||
}
|
||||
|
||||
void nt::net3::WireEncodeClientHello(wpi::raw_ostream& os,
|
||||
std::string_view self_id,
|
||||
unsigned int proto_rev) {
|
||||
Write8(os, Message3::kClientHello);
|
||||
Write16(os, proto_rev);
|
||||
WriteString(os, self_id);
|
||||
}
|
||||
|
||||
void nt::net3::WireEncodeServerHello(wpi::raw_ostream& os, unsigned int flags,
|
||||
std::string_view self_id) {
|
||||
Write8(os, Message3::kServerHello);
|
||||
Write8(os, flags);
|
||||
WriteString(os, self_id);
|
||||
}
|
||||
|
||||
bool nt::net3::WireEncodeEntryAssign(wpi::raw_ostream& os,
|
||||
std::string_view name, unsigned int id,
|
||||
unsigned int seq_num, const Value& value,
|
||||
unsigned int flags) {
|
||||
Write8(os, Message3::kEntryAssign);
|
||||
WriteString(os, name);
|
||||
WriteType(os, value.type());
|
||||
Write16(os, id);
|
||||
Write16(os, seq_num);
|
||||
Write8(os, flags);
|
||||
return WriteValue(os, value);
|
||||
}
|
||||
|
||||
bool nt::net3::WireEncodeEntryUpdate(wpi::raw_ostream& os, unsigned int id,
|
||||
unsigned int seq_num, const Value& value) {
|
||||
Write8(os, Message3::kEntryUpdate);
|
||||
Write16(os, id);
|
||||
Write16(os, seq_num);
|
||||
WriteType(os, value.type());
|
||||
return WriteValue(os, value);
|
||||
}
|
||||
|
||||
void nt::net3::WireEncodeFlagsUpdate(wpi::raw_ostream& os, unsigned int id,
|
||||
unsigned int flags) {
|
||||
Write8(os, Message3::kFlagsUpdate);
|
||||
Write16(os, id);
|
||||
Write8(os, flags);
|
||||
}
|
||||
|
||||
void nt::net3::WireEncodeEntryDelete(wpi::raw_ostream& os, unsigned int id) {
|
||||
Write8(os, Message3::kEntryDelete);
|
||||
Write16(os, id);
|
||||
}
|
||||
|
||||
void nt::net3::WireEncodeExecuteRpc(wpi::raw_ostream& os, unsigned int id,
|
||||
unsigned int uid,
|
||||
std::span<const uint8_t> params) {
|
||||
Write8(os, Message3::kExecuteRpc);
|
||||
Write16(os, id);
|
||||
Write16(os, uid);
|
||||
WriteRaw(os, params);
|
||||
}
|
||||
|
||||
void nt::net3::WireEncodeRpcResponse(wpi::raw_ostream& os, unsigned int id,
|
||||
unsigned int uid,
|
||||
std::span<const uint8_t> result) {
|
||||
Write8(os, Message3::kRpcResponse);
|
||||
Write16(os, id);
|
||||
Write16(os, uid);
|
||||
WriteRaw(os, result);
|
||||
}
|
||||
|
||||
bool nt::net3::WireEncode(wpi::raw_ostream& os, const Message3& msg) {
|
||||
switch (msg.type()) {
|
||||
case Message3::kKeepAlive:
|
||||
WireEncodeKeepAlive(os);
|
||||
break;
|
||||
case Message3::kServerHelloDone:
|
||||
WireEncodeServerHelloDone(os);
|
||||
break;
|
||||
case Message3::kClientHelloDone:
|
||||
WireEncodeClientHelloDone(os);
|
||||
break;
|
||||
case Message3::kClientHello:
|
||||
WireEncodeClientHello(os, msg.str(), msg.id());
|
||||
break;
|
||||
case Message3::kProtoUnsup:
|
||||
WireEncodeProtoUnsup(os, msg.id());
|
||||
break;
|
||||
case Message3::kServerHello:
|
||||
WireEncodeServerHello(os, msg.flags(), msg.str());
|
||||
break;
|
||||
case Message3::kEntryAssign:
|
||||
return WireEncodeEntryAssign(os, msg.str(), msg.id(), msg.seq_num_uid(),
|
||||
msg.value(), msg.flags());
|
||||
case Message3::kEntryUpdate:
|
||||
return WireEncodeEntryUpdate(os, msg.id(), msg.seq_num_uid(),
|
||||
msg.value());
|
||||
case Message3::kFlagsUpdate:
|
||||
WireEncodeFlagsUpdate(os, msg.id(), msg.flags());
|
||||
break;
|
||||
case Message3::kEntryDelete:
|
||||
WireEncodeEntryDelete(os, msg.id());
|
||||
break;
|
||||
case Message3::kClearEntries:
|
||||
WireEncodeClearEntries(os);
|
||||
break;
|
||||
case Message3::kExecuteRpc:
|
||||
WireEncodeExecuteRpc(os, msg.id(), msg.seq_num_uid(), msg.bytes());
|
||||
break;
|
||||
case Message3::kRpcResponse:
|
||||
WireEncodeRpcResponse(os, msg.id(), msg.seq_num_uid(), msg.bytes());
|
||||
break;
|
||||
case Message3::kUnknown:
|
||||
return true; // ignore
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
@@ -1,49 +0,0 @@
|
||||
// Copyright (c) FIRST and other WPILib contributors.
|
||||
// Open Source Software; you can modify and/or share it under the terms of
|
||||
// the WPILib BSD license file in the root directory of this project.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <stdint.h>
|
||||
|
||||
#include <span>
|
||||
#include <string_view>
|
||||
|
||||
namespace wpi {
|
||||
class raw_ostream;
|
||||
} // namespace wpi
|
||||
|
||||
namespace nt {
|
||||
class Value;
|
||||
} // namespace nt
|
||||
|
||||
namespace nt::net3 {
|
||||
|
||||
class Message3;
|
||||
|
||||
// encoders for messages (avoids need to construct a Message struct)
|
||||
void WireEncodeKeepAlive(wpi::raw_ostream& os);
|
||||
void WireEncodeServerHelloDone(wpi::raw_ostream& os);
|
||||
void WireEncodeClientHelloDone(wpi::raw_ostream& os);
|
||||
void WireEncodeClearEntries(wpi::raw_ostream& os);
|
||||
void WireEncodeProtoUnsup(wpi::raw_ostream& os, unsigned int proto_rev);
|
||||
void WireEncodeClientHello(wpi::raw_ostream& os, std::string_view self_id,
|
||||
unsigned int proto_rev);
|
||||
void WireEncodeServerHello(wpi::raw_ostream& os, unsigned int flags,
|
||||
std::string_view self_id);
|
||||
bool WireEncodeEntryAssign(wpi::raw_ostream& os, std::string_view name,
|
||||
unsigned int id, unsigned int seq_num,
|
||||
const Value& value, unsigned int flags);
|
||||
bool WireEncodeEntryUpdate(wpi::raw_ostream& os, unsigned int id,
|
||||
unsigned int seq_num, const Value& value);
|
||||
void WireEncodeFlagsUpdate(wpi::raw_ostream& os, unsigned int id,
|
||||
unsigned int flags);
|
||||
void WireEncodeEntryDelete(wpi::raw_ostream& os, unsigned int id);
|
||||
void WireEncodeExecuteRpc(wpi::raw_ostream& os, unsigned int id,
|
||||
unsigned int uid, std::span<const uint8_t> params);
|
||||
void WireEncodeRpcResponse(wpi::raw_ostream& os, unsigned int id,
|
||||
unsigned int uid, std::span<const uint8_t> result);
|
||||
|
||||
bool WireEncode(wpi::raw_ostream& os, const Message3& msg);
|
||||
|
||||
} // namespace nt::net3
|
||||
@@ -527,22 +527,18 @@ void NT_StopLocal(NT_Inst inst) {
|
||||
}
|
||||
|
||||
void NT_StartServer(NT_Inst inst, const struct WPI_String* persist_filename,
|
||||
const struct WPI_String* listen_address, unsigned int port3,
|
||||
unsigned int port4) {
|
||||
const struct WPI_String* listen_address,
|
||||
unsigned int port) {
|
||||
nt::StartServer(inst, wpi::to_string_view(persist_filename),
|
||||
wpi::to_string_view(listen_address), port3, port4);
|
||||
wpi::to_string_view(listen_address), port);
|
||||
}
|
||||
|
||||
void NT_StopServer(NT_Inst inst) {
|
||||
nt::StopServer(inst);
|
||||
}
|
||||
|
||||
void NT_StartClient3(NT_Inst inst, const struct WPI_String* identity) {
|
||||
nt::StartClient3(inst, wpi::to_string_view(identity));
|
||||
}
|
||||
|
||||
void NT_StartClient4(NT_Inst inst, const struct WPI_String* identity) {
|
||||
nt::StartClient4(inst, wpi::to_string_view(identity));
|
||||
void NT_StartClient(NT_Inst inst, const struct WPI_String* identity) {
|
||||
nt::StartClient(inst, wpi::to_string_view(identity));
|
||||
}
|
||||
|
||||
void NT_StopClient(NT_Inst inst) {
|
||||
|
||||
@@ -633,10 +633,9 @@ void StopLocal(NT_Inst inst) {
|
||||
}
|
||||
|
||||
void StartServer(NT_Inst inst, std::string_view persist_filename,
|
||||
std::string_view listen_address, unsigned int port3,
|
||||
unsigned int port4) {
|
||||
std::string_view listen_address, unsigned int port) {
|
||||
if (auto ii = InstanceImpl::GetTyped(inst, Handle::kInstance)) {
|
||||
ii->StartServer(persist_filename, listen_address, port3, port4);
|
||||
ii->StartServer(persist_filename, listen_address, port);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -646,15 +645,9 @@ void StopServer(NT_Inst inst) {
|
||||
}
|
||||
}
|
||||
|
||||
void StartClient3(NT_Inst inst, std::string_view identity) {
|
||||
void StartClient(NT_Inst inst, std::string_view identity) {
|
||||
if (auto ii = InstanceImpl::GetTyped(inst, Handle::kInstance)) {
|
||||
ii->StartClient3(identity);
|
||||
}
|
||||
}
|
||||
|
||||
void StartClient4(NT_Inst inst, std::string_view identity) {
|
||||
if (auto ii = InstanceImpl::GetTyped(inst, Handle::kInstance)) {
|
||||
ii->StartClient4(identity);
|
||||
ii->StartClient(identity);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,482 +0,0 @@
|
||||
// Copyright (c) FIRST and other WPILib contributors.
|
||||
// Open Source Software; you can modify and/or share it under the terms of
|
||||
// the WPILib BSD license file in the root directory of this project.
|
||||
|
||||
#include "ServerClient3.h"
|
||||
|
||||
#include <memory>
|
||||
#include <string>
|
||||
|
||||
#include <wpi/timestamp.h>
|
||||
|
||||
#include "Log.h"
|
||||
#include "Types_internal.h"
|
||||
#include "net3/WireEncoder3.h"
|
||||
#include "server/ServerImpl.h"
|
||||
#include "server/ServerPublisher.h"
|
||||
#include "server/ServerTopic.h"
|
||||
|
||||
using namespace nt::server;
|
||||
|
||||
// maximum amount of time the wire can be not ready to send another
|
||||
// transmission before we close the connection
|
||||
static constexpr uint32_t kWireMaxNotReadyUs = 1000000;
|
||||
|
||||
bool ServerClient3::TopicData3::UpdateFlags(ServerTopic* topic) {
|
||||
unsigned int newFlags = topic->persistent ? NT_PERSISTENT : 0;
|
||||
bool updated = flags != newFlags;
|
||||
flags = newFlags;
|
||||
return updated;
|
||||
}
|
||||
|
||||
bool ServerClient3::ProcessIncomingBinary(std::span<const uint8_t> data) {
|
||||
if (!m_decoder.Execute(&data)) {
|
||||
m_wire.Disconnect(m_decoder.GetError());
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
void ServerClient3::SendValue(ServerTopic* topic, const Value& value,
|
||||
net::ValueSendMode mode) {
|
||||
if (m_state != kStateRunning) {
|
||||
if (mode == net::ValueSendMode::kImm) {
|
||||
mode = net::ValueSendMode::kAll;
|
||||
}
|
||||
} else if (m_local) {
|
||||
mode = net::ValueSendMode::kImm; // always send local immediately
|
||||
}
|
||||
TopicData3* topic3 = GetTopic3(topic);
|
||||
bool added = false;
|
||||
|
||||
switch (mode) {
|
||||
case net::ValueSendMode::kDisabled: // do nothing
|
||||
break;
|
||||
case net::ValueSendMode::kImm: // send immediately
|
||||
++topic3->seqNum;
|
||||
if (topic3->sentAssign) {
|
||||
net3::WireEncodeEntryUpdate(m_wire.Send().stream(), topic->id,
|
||||
topic3->seqNum.value(), value);
|
||||
} else {
|
||||
net3::WireEncodeEntryAssign(m_wire.Send().stream(), topic->name,
|
||||
topic->id, topic3->seqNum.value(), value,
|
||||
topic3->flags);
|
||||
topic3->sentAssign = true;
|
||||
}
|
||||
if (m_local) {
|
||||
Flush();
|
||||
}
|
||||
break;
|
||||
case net::ValueSendMode::kNormal: {
|
||||
// replace, or append if not present
|
||||
wpi::DenseMap<NT_Topic, size_t>::iterator it;
|
||||
std::tie(it, added) =
|
||||
m_outgoingValueMap.try_emplace(topic->id, m_outgoing.size());
|
||||
if (!added && it->second < m_outgoing.size()) {
|
||||
auto& msg = m_outgoing[it->second];
|
||||
if (msg.Is(net3::Message3::kEntryUpdate) ||
|
||||
msg.Is(net3::Message3::kEntryAssign)) {
|
||||
if (msg.id() == topic->id) { // should always be true
|
||||
msg.SetValue(value);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// fallthrough
|
||||
case net::ValueSendMode::kAll: // append to outgoing
|
||||
if (!added) {
|
||||
m_outgoingValueMap[topic->id] = m_outgoing.size();
|
||||
}
|
||||
++topic3->seqNum;
|
||||
if (topic3->sentAssign) {
|
||||
m_outgoing.emplace_back(net3::Message3::EntryUpdate(
|
||||
topic->id, topic3->seqNum.value(), value));
|
||||
} else {
|
||||
m_outgoing.emplace_back(net3::Message3::EntryAssign(
|
||||
topic->name, topic->id, topic3->seqNum.value(), value,
|
||||
topic3->flags));
|
||||
topic3->sentAssign = true;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
void ServerClient3::SendAnnounce(ServerTopic* topic,
|
||||
std::optional<int> pubuid) {
|
||||
// ignore if we've not yet built the subscriber
|
||||
if (m_subscribers.empty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
// subscribe to all non-special topics
|
||||
if (!topic->special) {
|
||||
topic->clients[this].AddSubscriber(m_subscribers[0].get());
|
||||
m_storage.UpdateMetaTopicSub(topic);
|
||||
}
|
||||
|
||||
// NT3 requires a value to send the assign message, so the assign message
|
||||
// will get sent when the first value is sent (by SendValue).
|
||||
}
|
||||
|
||||
void ServerClient3::SendUnannounce(ServerTopic* topic) {
|
||||
auto it = m_topics3.find(topic);
|
||||
if (it == m_topics3.end()) {
|
||||
return; // never sent to client
|
||||
}
|
||||
bool sentAssign = it->second.sentAssign;
|
||||
m_topics3.erase(it);
|
||||
if (!sentAssign) {
|
||||
return; // never sent to client
|
||||
}
|
||||
|
||||
// map to NT3 delete message
|
||||
if (m_local && m_state == kStateRunning) {
|
||||
net3::WireEncodeEntryDelete(m_wire.Send().stream(), topic->id);
|
||||
Flush();
|
||||
} else {
|
||||
m_outgoing.emplace_back(net3::Message3::EntryDelete(topic->id));
|
||||
}
|
||||
}
|
||||
|
||||
void ServerClient3::SendPropertiesUpdate(ServerTopic* topic,
|
||||
const wpi::json& update, bool ack) {
|
||||
if (ack) {
|
||||
return; // we don't ack in NT3
|
||||
}
|
||||
auto it = m_topics3.find(topic);
|
||||
if (it == m_topics3.end()) {
|
||||
return; // never sent to client
|
||||
}
|
||||
TopicData3* topic3 = &it->second;
|
||||
// Don't send flags update unless we've already sent an assign message.
|
||||
// The assign message will contain the updated flags when we eventually
|
||||
// send it.
|
||||
if (topic3->UpdateFlags(topic) && topic3->sentAssign) {
|
||||
if (m_local && m_state == kStateRunning) {
|
||||
net3::WireEncodeFlagsUpdate(m_wire.Send().stream(), topic->id,
|
||||
topic3->flags);
|
||||
Flush();
|
||||
} else {
|
||||
m_outgoing.emplace_back(
|
||||
net3::Message3::FlagsUpdate(topic->id, topic3->flags));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void ServerClient3::SendOutgoing(uint64_t curTimeMs, bool flush) {
|
||||
if (m_outgoing.empty() || m_state != kStateRunning) {
|
||||
return; // nothing to do
|
||||
}
|
||||
|
||||
// rate limit frequency of transmissions
|
||||
if (curTimeMs < (m_lastSendMs + kMinPeriodMs)) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (!m_wire.Ready()) {
|
||||
uint64_t lastFlushTime = m_wire.GetLastFlushTime();
|
||||
uint64_t now = wpi::Now();
|
||||
if (lastFlushTime != 0 && now > (lastFlushTime + kWireMaxNotReadyUs)) {
|
||||
m_wire.Disconnect("transmit stalled");
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
auto out = m_wire.Send();
|
||||
for (auto&& msg : m_outgoing) {
|
||||
net3::WireEncode(out.stream(), msg);
|
||||
}
|
||||
m_wire.Flush();
|
||||
m_outgoing.resize(0);
|
||||
m_outgoingValueMap.clear();
|
||||
m_lastSendMs = curTimeMs;
|
||||
}
|
||||
|
||||
void ServerClient3::KeepAlive() {
|
||||
DEBUG4("KeepAlive({})", m_id);
|
||||
if (m_state != kStateRunning) {
|
||||
m_decoder.SetError("received unexpected KeepAlive message");
|
||||
return;
|
||||
}
|
||||
// ignore
|
||||
}
|
||||
|
||||
void ServerClient3::ServerHelloDone() {
|
||||
DEBUG4("ServerHelloDone({})", m_id);
|
||||
m_decoder.SetError("received unexpected ServerHelloDone message");
|
||||
}
|
||||
|
||||
void ServerClient3::ClientHelloDone() {
|
||||
DEBUG4("ClientHelloDone({})", m_id);
|
||||
if (m_state != kStateServerHelloComplete) {
|
||||
m_decoder.SetError("received unexpected ClientHelloDone message");
|
||||
return;
|
||||
}
|
||||
m_state = kStateRunning;
|
||||
}
|
||||
|
||||
void ServerClient3::ClearEntries() {
|
||||
DEBUG4("ClearEntries({})", m_id);
|
||||
if (m_state != kStateRunning) {
|
||||
m_decoder.SetError("received unexpected ClearEntries message");
|
||||
return;
|
||||
}
|
||||
|
||||
for (auto topic3it : m_topics3) {
|
||||
ServerTopic* topic = topic3it.first;
|
||||
|
||||
// make sure we send assign the next time
|
||||
topic3it.second.sentAssign = false;
|
||||
|
||||
// unpublish from this client (if it was previously published)
|
||||
if (topic3it.second.published) {
|
||||
topic3it.second.published = false;
|
||||
auto publisherIt = m_publishers.find(topic3it.second.pubuid);
|
||||
if (publisherIt != m_publishers.end()) {
|
||||
// remove publisher from topic
|
||||
topic->RemovePublisher(this, publisherIt->second.get());
|
||||
|
||||
// remove publisher from client
|
||||
m_publishers.erase(publisherIt);
|
||||
|
||||
// update meta data
|
||||
m_storage.UpdateMetaTopicPub(topic);
|
||||
UpdateMetaClientPub();
|
||||
}
|
||||
}
|
||||
|
||||
// set retained=false
|
||||
m_storage.SetProperties(this, topic, {{"retained", false}});
|
||||
}
|
||||
}
|
||||
|
||||
void ServerClient3::ProtoUnsup(unsigned int proto_rev) {
|
||||
DEBUG4("ProtoUnsup({})", m_id);
|
||||
m_decoder.SetError("received unexpected ProtoUnsup message");
|
||||
}
|
||||
|
||||
void ServerClient3::ClientHello(std::string_view self_id,
|
||||
unsigned int proto_rev) {
|
||||
DEBUG4("ClientHello({}, '{}', {:04x})", m_id, self_id, proto_rev);
|
||||
if (m_state != kStateInitial) {
|
||||
m_decoder.SetError("received unexpected ClientHello message");
|
||||
return;
|
||||
}
|
||||
if (proto_rev != 0x0300) {
|
||||
net3::WireEncodeProtoUnsup(m_wire.Send().stream(), 0x0300);
|
||||
Flush();
|
||||
m_decoder.SetError(
|
||||
fmt::format("unsupported protocol version {:04x}", proto_rev));
|
||||
return;
|
||||
}
|
||||
// create a unique name including client id
|
||||
m_name = fmt::format("{}-NT3@{}", self_id, m_connInfo);
|
||||
m_connected(m_name, 0x0300);
|
||||
m_connected = nullptr; // no longer required
|
||||
|
||||
// create client meta topics
|
||||
m_metaPub = m_storage.CreateMetaTopic(fmt::format("$clientpub${}", m_name));
|
||||
m_metaSub = m_storage.CreateMetaTopic(fmt::format("$clientsub${}", m_name));
|
||||
|
||||
// subscribe and send initial assignments
|
||||
auto& sub = m_subscribers[0];
|
||||
std::string prefix;
|
||||
PubSubOptions options;
|
||||
options.prefixMatch = true;
|
||||
sub = std::make_unique<ServerSubscriber>(
|
||||
GetName(), std::span<const std::string>{{prefix}}, 0, options);
|
||||
m_periodMs = net::UpdatePeriodCalc(m_periodMs, sub->GetPeriodMs());
|
||||
m_setPeriodic(m_periodMs);
|
||||
|
||||
{
|
||||
auto out = m_wire.Send();
|
||||
net3::WireEncodeServerHello(out.stream(), 0, "server");
|
||||
m_storage.ForEachTopic([&](ServerTopic* topic) {
|
||||
if (topic && !topic->special && topic->IsPublished() &&
|
||||
topic->lastValue) {
|
||||
DEBUG4("client {}: initial announce of '{}' (id {})", m_id, topic->name,
|
||||
topic->id);
|
||||
topic->clients[this].AddSubscriber(sub.get());
|
||||
m_storage.UpdateMetaTopicSub(topic);
|
||||
|
||||
TopicData3* topic3 = GetTopic3(topic);
|
||||
++topic3->seqNum;
|
||||
net3::WireEncodeEntryAssign(out.stream(), topic->name, topic->id,
|
||||
topic3->seqNum.value(), topic->lastValue,
|
||||
topic3->flags);
|
||||
topic3->sentAssign = true;
|
||||
}
|
||||
});
|
||||
net3::WireEncodeServerHelloDone(out.stream());
|
||||
}
|
||||
Flush();
|
||||
m_state = kStateServerHelloComplete;
|
||||
|
||||
// update meta topics
|
||||
UpdateMetaClientPub();
|
||||
UpdateMetaClientSub();
|
||||
}
|
||||
|
||||
void ServerClient3::ServerHello(unsigned int flags, std::string_view self_id) {
|
||||
DEBUG4("ServerHello({}, {}, {})", m_id, flags, self_id);
|
||||
m_decoder.SetError("received unexpected ServerHello message");
|
||||
}
|
||||
|
||||
void ServerClient3::EntryAssign(std::string_view name, unsigned int id,
|
||||
unsigned int seq_num, const Value& value,
|
||||
unsigned int flags) {
|
||||
DEBUG4("EntryAssign({}, {}, {}, {}, {})", m_id, id, seq_num,
|
||||
static_cast<int>(value.type()), flags);
|
||||
if (id != 0xffff) {
|
||||
DEBUG3("ignored EntryAssign from {} with non-0xffff id {}", m_id, id);
|
||||
return;
|
||||
}
|
||||
|
||||
// convert from NT3 info
|
||||
auto typeStr = TypeToString(value.type());
|
||||
wpi::json properties = wpi::json::object();
|
||||
properties["retained"] = true; // treat all NT3 published topics as retained
|
||||
properties["cached"] = true; // treat all NT3 published topics as cached
|
||||
if ((flags & NT_PERSISTENT) != 0) {
|
||||
properties["persistent"] = true;
|
||||
}
|
||||
|
||||
// create topic
|
||||
auto topic = m_storage.CreateTopic(this, name, typeStr, properties);
|
||||
TopicData3* topic3 = GetTopic3(topic);
|
||||
if (topic3->published || topic3->sentAssign) {
|
||||
WARN("ignoring client {} duplicate publish of '{}'", m_id, name);
|
||||
return;
|
||||
}
|
||||
++topic3->seqNum;
|
||||
topic3->published = true;
|
||||
topic3->pubuid = m_nextPubUid++;
|
||||
topic3->sentAssign = true;
|
||||
|
||||
// create publisher
|
||||
auto [publisherIt, isNew] = m_publishers.try_emplace(
|
||||
topic3->pubuid,
|
||||
std::make_unique<ServerPublisher>(GetName(), topic, topic3->pubuid));
|
||||
if (!isNew) {
|
||||
return; // shouldn't happen, but just in case...
|
||||
}
|
||||
|
||||
// add publisher to topic
|
||||
topic->AddPublisher(this, publisherIt->getSecond().get());
|
||||
|
||||
// update meta data
|
||||
m_storage.UpdateMetaTopicPub(topic);
|
||||
UpdateMetaClientPub();
|
||||
|
||||
// acts as an announce + data update
|
||||
SendAnnounce(topic, topic3->pubuid);
|
||||
m_storage.SetValue(this, topic, value);
|
||||
|
||||
// respond with assign message with assigned topic ID
|
||||
if (m_local && m_state == kStateRunning) {
|
||||
net3::WireEncodeEntryAssign(m_wire.Send().stream(), topic->name, topic->id,
|
||||
topic3->seqNum.value(), value, topic3->flags);
|
||||
} else {
|
||||
m_outgoing.emplace_back(net3::Message3::EntryAssign(
|
||||
topic->name, topic->id, topic3->seqNum.value(), value, topic3->flags));
|
||||
}
|
||||
}
|
||||
|
||||
void ServerClient3::EntryUpdate(unsigned int id, unsigned int seq_num,
|
||||
const Value& value) {
|
||||
DEBUG4("EntryUpdate({}, {}, {}, {})", m_id, id, seq_num,
|
||||
static_cast<int>(value.type()));
|
||||
if (m_state != kStateRunning) {
|
||||
m_decoder.SetError("received unexpected EntryUpdate message");
|
||||
return;
|
||||
}
|
||||
|
||||
ServerTopic* topic = m_storage.GetTopic(id);
|
||||
if (!topic || !topic->IsPublished()) {
|
||||
DEBUG3("ignored EntryUpdate from {} on non-existent topic {}", m_id, id);
|
||||
return;
|
||||
}
|
||||
|
||||
TopicData3* topic3 = GetTopic3(topic);
|
||||
if (!topic3->published) {
|
||||
topic3->published = true;
|
||||
topic3->pubuid = m_nextPubUid++;
|
||||
|
||||
// create publisher
|
||||
auto [publisherIt, isNew] = m_publishers.try_emplace(
|
||||
topic3->pubuid,
|
||||
std::make_unique<ServerPublisher>(GetName(), topic, topic3->pubuid));
|
||||
if (isNew) {
|
||||
// add publisher to topic
|
||||
topic->AddPublisher(this, publisherIt->getSecond().get());
|
||||
|
||||
// update meta data
|
||||
m_storage.UpdateMetaTopicPub(topic);
|
||||
UpdateMetaClientPub();
|
||||
}
|
||||
}
|
||||
topic3->seqNum = net3::SequenceNumber{seq_num};
|
||||
|
||||
m_storage.SetValue(this, topic, value);
|
||||
}
|
||||
|
||||
void ServerClient3::FlagsUpdate(unsigned int id, unsigned int flags) {
|
||||
DEBUG4("FlagsUpdate({}, {}, {})", m_id, id, flags);
|
||||
if (m_state != kStateRunning) {
|
||||
m_decoder.SetError("received unexpected FlagsUpdate message");
|
||||
return;
|
||||
}
|
||||
ServerTopic* topic = m_storage.GetTopic(id);
|
||||
if (!topic || !topic->IsPublished()) {
|
||||
DEBUG3("ignored FlagsUpdate from {} on non-existent topic {}", m_id, id);
|
||||
return;
|
||||
}
|
||||
if (topic->special) {
|
||||
DEBUG3("ignored FlagsUpdate from {} on special topic {}", m_id, id);
|
||||
return;
|
||||
}
|
||||
m_storage.SetFlags(this, topic, flags);
|
||||
}
|
||||
|
||||
void ServerClient3::EntryDelete(unsigned int id) {
|
||||
DEBUG4("EntryDelete({}, {})", m_id, id);
|
||||
if (m_state != kStateRunning) {
|
||||
m_decoder.SetError("received unexpected EntryDelete message");
|
||||
return;
|
||||
}
|
||||
ServerTopic* topic = m_storage.GetTopic(id);
|
||||
if (!topic || !topic->IsPublished()) {
|
||||
DEBUG3("ignored EntryDelete from {} on non-existent topic {}", m_id, id);
|
||||
return;
|
||||
}
|
||||
if (topic->special) {
|
||||
DEBUG3("ignored EntryDelete from {} on special topic {}", m_id, id);
|
||||
return;
|
||||
}
|
||||
|
||||
auto topic3it = m_topics3.find(topic);
|
||||
if (topic3it != m_topics3.end()) {
|
||||
// make sure we send assign the next time
|
||||
topic3it->second.sentAssign = false;
|
||||
|
||||
// unpublish from this client (if it was previously published)
|
||||
if (topic3it->second.published) {
|
||||
topic3it->second.published = false;
|
||||
auto publisherIt = m_publishers.find(topic3it->second.pubuid);
|
||||
if (publisherIt != m_publishers.end()) {
|
||||
// remove publisher from topic
|
||||
topic->RemovePublisher(this, publisherIt->second.get());
|
||||
|
||||
// remove publisher from client
|
||||
m_publishers.erase(publisherIt);
|
||||
|
||||
// update meta data
|
||||
m_storage.UpdateMetaTopicPub(topic);
|
||||
UpdateMetaClientPub();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// set retained=false
|
||||
m_storage.SetProperties(this, topic, {{"retained", false}});
|
||||
}
|
||||
@@ -1,97 +0,0 @@
|
||||
// Copyright (c) FIRST and other WPILib contributors.
|
||||
// Open Source Software; you can modify and/or share it under the terms of
|
||||
// the WPILib BSD license file in the root directory of this project.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
#include "ServerClient.h"
|
||||
#include "net/ClientMessageQueue.h"
|
||||
#include "net3/Message3.h"
|
||||
#include "net3/SequenceNumber.h"
|
||||
#include "net3/WireConnection3.h"
|
||||
#include "net3/WireDecoder3.h"
|
||||
#include "server/Functions.h"
|
||||
|
||||
namespace nt::server {
|
||||
|
||||
class ServerClient3 final : public ServerClient, private net3::MessageHandler3 {
|
||||
public:
|
||||
ServerClient3(std::string_view connInfo, bool local,
|
||||
net3::WireConnection3& wire, Connected3Func connected,
|
||||
SetPeriodicFunc setPeriodic, ServerStorage& storage, int id,
|
||||
wpi::Logger& logger)
|
||||
: ServerClient{"", connInfo, local, setPeriodic, storage, id, logger},
|
||||
m_connected{std::move(connected)},
|
||||
m_wire{wire},
|
||||
m_decoder{*this},
|
||||
m_incoming{logger} {}
|
||||
|
||||
bool ProcessIncomingText(std::string_view data) final { return false; }
|
||||
bool ProcessIncomingBinary(std::span<const uint8_t> data) final;
|
||||
|
||||
bool ProcessIncomingMessages(size_t max) final { return false; }
|
||||
|
||||
void SendValue(ServerTopic* topic, const Value& value,
|
||||
net::ValueSendMode mode) final;
|
||||
void SendAnnounce(ServerTopic* topic, std::optional<int> pubuid) final;
|
||||
void SendUnannounce(ServerTopic* topic) final;
|
||||
void SendPropertiesUpdate(ServerTopic* topic, const wpi::json& update,
|
||||
bool ack) final;
|
||||
void SendOutgoing(uint64_t curTimeMs, bool flush) final;
|
||||
|
||||
void Flush() final { m_wire.Flush(); }
|
||||
|
||||
private:
|
||||
// MessageHandler3 interface
|
||||
void KeepAlive() final;
|
||||
void ServerHelloDone() final;
|
||||
void ClientHelloDone() final;
|
||||
void ClearEntries() final;
|
||||
void ProtoUnsup(unsigned int proto_rev) final;
|
||||
void ClientHello(std::string_view self_id, unsigned int proto_rev) final;
|
||||
void ServerHello(unsigned int flags, std::string_view self_id) final;
|
||||
void EntryAssign(std::string_view name, unsigned int id, unsigned int seq_num,
|
||||
const Value& value, unsigned int flags) final;
|
||||
void EntryUpdate(unsigned int id, unsigned int seq_num,
|
||||
const Value& value) final;
|
||||
void FlagsUpdate(unsigned int id, unsigned int flags) final;
|
||||
void EntryDelete(unsigned int id) final;
|
||||
void ExecuteRpc(unsigned int id, unsigned int uid,
|
||||
std::span<const uint8_t> params) final {}
|
||||
void RpcResponse(unsigned int id, unsigned int uid,
|
||||
std::span<const uint8_t> result) final {}
|
||||
|
||||
Connected3Func m_connected;
|
||||
net3::WireConnection3& m_wire;
|
||||
|
||||
enum State { kStateInitial, kStateServerHelloComplete, kStateRunning };
|
||||
State m_state{kStateInitial};
|
||||
net3::WireDecoder3 m_decoder;
|
||||
|
||||
net::NetworkIncomingClientQueue m_incoming;
|
||||
std::vector<net3::Message3> m_outgoing;
|
||||
wpi::DenseMap<NT_Topic, size_t> m_outgoingValueMap;
|
||||
int64_t m_nextPubUid{1};
|
||||
uint64_t m_lastSendMs{0};
|
||||
|
||||
struct TopicData3 {
|
||||
explicit TopicData3(ServerTopic* topic) { UpdateFlags(topic); }
|
||||
|
||||
unsigned int flags{0};
|
||||
net3::SequenceNumber seqNum;
|
||||
bool sentAssign{false};
|
||||
bool published{false};
|
||||
int64_t pubuid{0};
|
||||
|
||||
bool UpdateFlags(ServerTopic* topic);
|
||||
};
|
||||
wpi::DenseMap<ServerTopic*, TopicData3> m_topics3;
|
||||
TopicData3* GetTopic3(ServerTopic* topic) {
|
||||
return &m_topics3.try_emplace(topic, topic).first->second;
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace nt::server
|
||||
@@ -16,7 +16,6 @@
|
||||
|
||||
#include "Log.h"
|
||||
#include "server/MessagePackWriter.h"
|
||||
#include "server/ServerClient3.h"
|
||||
#include "server/ServerClient4.h"
|
||||
#include "server/ServerClientLocal.h"
|
||||
|
||||
@@ -60,20 +59,6 @@ std::pair<std::string, int> ServerImpl::AddClient(std::string_view name,
|
||||
return {std::move(dedupName), index};
|
||||
}
|
||||
|
||||
int ServerImpl::AddClient3(std::string_view connInfo, bool local,
|
||||
net3::WireConnection3& wire,
|
||||
Connected3Func connected,
|
||||
SetPeriodicFunc setPeriodic) {
|
||||
size_t index = GetEmptyClientSlot();
|
||||
|
||||
m_clients[index] = std::make_unique<ServerClient3>(
|
||||
connInfo, local, wire, std::move(connected), std::move(setPeriodic),
|
||||
m_storage, index, m_logger);
|
||||
|
||||
DEBUG3("AddClient3('{}') -> {}", connInfo, index);
|
||||
return index;
|
||||
}
|
||||
|
||||
std::shared_ptr<void> ServerImpl::RemoveClient(int clientId) {
|
||||
DEBUG3("RemoveClient({})", clientId);
|
||||
auto& client = m_clients[clientId];
|
||||
|
||||
@@ -61,9 +61,6 @@ class ServerImpl final {
|
||||
std::string_view connInfo, bool local,
|
||||
net::WireConnection& wire,
|
||||
SetPeriodicFunc setPeriodic);
|
||||
int AddClient3(std::string_view connInfo, bool local,
|
||||
net3::WireConnection3& wire, Connected3Func connected,
|
||||
SetPeriodicFunc setPeriodic);
|
||||
std::shared_ptr<void> RemoveClient(int clientId);
|
||||
|
||||
void ConnectionsChanged(const std::vector<ConnectionInfo>& conns) {
|
||||
|
||||
Reference in New Issue
Block a user