[ntcore] Move ServerImpl to nt::server namespace

This commit is contained in:
Peter Johnson
2024-10-15 17:03:12 -07:00
parent a0f38f83f9
commit f738fc92f0
5 changed files with 91 additions and 86 deletions

View File

@@ -18,8 +18,8 @@
#include "net/ClientMessageQueue.h"
#include "net/Message.h"
#include "net/ServerImpl.h"
#include "ntcore_cpp.h"
#include "server/ServerImpl.h"
namespace wpi {
class Logger;
@@ -78,7 +78,7 @@ class NetworkServer {
using Queue = net::LocalClientMessageQueue;
net::ClientMessage m_localMsgs[Queue::kBlockSize];
net::ServerImpl m_serverImpl;
server::ServerImpl m_serverImpl;
// shared with user (must be atomic or mutex-protected)
std::atomic<wpi::uv::Async<>*> m_flushLocalAtomic{nullptr};

View File

@@ -23,9 +23,7 @@
#include <wpi/raw_ostream.h>
#include <wpi/timestamp.h>
#include "IConnectionList.h"
#include "Log.h"
#include "NetworkInterface.h"
#include "Types_internal.h"
#include "net/Message.h"
#include "net/WireEncoder.h"
@@ -35,7 +33,7 @@
#include "ntcore_c.h"
using namespace nt;
using namespace nt::net;
using namespace nt::server;
using namespace mpack;
// maximum amount of time the wire can be not ready to send another
@@ -293,7 +291,7 @@ void ServerImpl::ClientData4Base::ClientSubscribe(
// update periodic sender (if not local)
if (!m_local) {
m_periodMs = UpdatePeriodCalc(m_periodMs, sub->periodMs);
m_periodMs = net::UpdatePeriodCalc(m_periodMs, sub->periodMs);
m_setPeriodic(m_periodMs);
}
@@ -312,7 +310,7 @@ void ServerImpl::ClientData4Base::ClientSubscribe(
bool wasSubscribed =
tcdIt != topic->clients.end() && !tcdIt->second.subscribers.empty();
bool wasSubscribedValue =
wasSubscribed ? tcdIt->second.sendMode != ValueSendMode::kDisabled
wasSubscribed ? tcdIt->second.sendMode != net::ValueSendMode::kDisabled
: false;
bool added = false;
@@ -344,7 +342,7 @@ void ServerImpl::ClientData4Base::ClientSubscribe(
for (auto topic : dataToSend) {
DEBUG4("send last value for {} to client {}", topic->name, m_id);
SendValue(topic, topic->lastValue, ValueSendMode::kAll);
SendValue(topic, topic->lastValue, net::ValueSendMode::kAll);
}
}
@@ -372,7 +370,7 @@ void ServerImpl::ClientData4Base::ClientUnsubscribe(int subuid) {
// loop over all subscribers to update period
if (!m_local) {
m_periodMs = CalculatePeriod(
m_periodMs = net::CalculatePeriod(
m_subscribers, [](auto& x) { return x.getSecond()->periodMs; });
m_setPeriodic(m_periodMs);
}
@@ -392,7 +390,7 @@ void ServerImpl::ClientData4Base::ClientSetValue(int pubuid,
void ServerImpl::ClientDataLocal::SendValue(TopicData* topic,
const Value& value,
ValueSendMode mode) {
net::ValueSendMode mode) {
if (m_server.m_local) {
m_server.m_local->ServerSetValue(topic->localTopic, value);
}
@@ -435,10 +433,10 @@ void ServerImpl::ClientDataLocal::SendPropertiesUpdate(TopicData* topic,
}
bool ServerImpl::ClientData4Base::DoProcessIncomingMessages(
ClientMessageQueue& queue, size_t max) {
net::ClientMessageQueue& queue, size_t max) {
DEBUG4("ProcessIncomingMessage()");
max = (std::min)(m_msgsBuf.size(), max);
std::span<ClientMessage> msgs =
std::span<net::ClientMessage> msgs =
queue.ReadQueue(wpi::take_front(std::span{m_msgsBuf}, max));
// just map as a normal client into client=0 calls
@@ -446,21 +444,21 @@ bool ServerImpl::ClientData4Base::DoProcessIncomingMessages(
bool updatesub = false;
for (const auto& elem : msgs) { // NOLINT
// common case is value, so check that first
if (auto msg = std::get_if<ClientValueMsg>(&elem.contents)) {
if (auto msg = std::get_if<net::ClientValueMsg>(&elem.contents)) {
ClientSetValue(msg->pubuid, msg->value);
} else if (auto msg = std::get_if<PublishMsg>(&elem.contents)) {
} else if (auto msg = std::get_if<net::PublishMsg>(&elem.contents)) {
ClientPublish(msg->pubuid, msg->name, msg->typeStr, msg->properties,
msg->options);
updatepub = true;
} else if (auto msg = std::get_if<UnpublishMsg>(&elem.contents)) {
} else if (auto msg = std::get_if<net::UnpublishMsg>(&elem.contents)) {
ClientUnpublish(msg->pubuid);
updatepub = true;
} else if (auto msg = std::get_if<SetPropertiesMsg>(&elem.contents)) {
} else if (auto msg = std::get_if<net::SetPropertiesMsg>(&elem.contents)) {
ClientSetProperties(msg->name, msg->update);
} else if (auto msg = std::get_if<SubscribeMsg>(&elem.contents)) {
} else if (auto msg = std::get_if<net::SubscribeMsg>(&elem.contents)) {
ClientSubscribe(msg->subuid, msg->topicNames, msg->options);
updatesub = true;
} else if (auto msg = std::get_if<UnsubscribeMsg>(&elem.contents)) {
} else if (auto msg = std::get_if<net::UnsubscribeMsg>(&elem.contents)) {
ClientUnsubscribe(msg->subuid);
updatesub = true;
}
@@ -502,7 +500,7 @@ bool ServerImpl::ClientData4::ProcessIncomingBinary(
int pubuid;
Value value;
std::string error;
if (!WireDecodeBinary(&data, &pubuid, &value, &error, 0)) {
if (!net::WireDecodeBinary(&data, &pubuid, &value, &error, 0)) {
m_wire.Disconnect(fmt::format("binary decode error: {}", error));
break;
}
@@ -512,7 +510,7 @@ bool ServerImpl::ClientData4::ProcessIncomingBinary(
auto now = wpi::Now();
DEBUG4("RTT ping from {}, responding with time={}", m_id, now);
m_wire.SendBinary(
[&](auto& os) { WireEncodeBinary(os, -1, now, value); });
[&](auto& os) { net::WireEncodeBinary(os, -1, now, value); });
continue;
}
@@ -531,7 +529,7 @@ bool ServerImpl::ClientData4::ProcessIncomingBinary(
}
void ServerImpl::ClientData4::SendValue(TopicData* topic, const Value& value,
ValueSendMode mode) {
net::ValueSendMode mode) {
m_outgoing.SendValue(topic->id, value, mode);
}
@@ -545,8 +543,8 @@ void ServerImpl::ClientData4::SendAnnounce(TopicData* topic,
if (m_local) {
int unsent = m_wire.WriteText([&](auto& os) {
WireEncodeAnnounce(os, topic->name, topic->id, topic->typeStr,
topic->properties, pubuid);
net::WireEncodeAnnounce(os, topic->name, topic->id, topic->typeStr,
topic->properties, pubuid);
});
if (unsent < 0) {
return; // error
@@ -556,8 +554,8 @@ void ServerImpl::ClientData4::SendAnnounce(TopicData* topic,
}
}
m_outgoing.SendMessage(
topic->id, AnnounceMsg{topic->name, static_cast<int>(topic->id),
topic->typeStr, pubuid, topic->properties});
topic->id, net::AnnounceMsg{topic->name, static_cast<int>(topic->id),
topic->typeStr, pubuid, topic->properties});
m_server.m_controlReady = true;
}
@@ -569,8 +567,9 @@ void ServerImpl::ClientData4::SendUnannounce(TopicData* topic) {
sent = false;
if (m_local) {
int unsent = m_wire.WriteText(
[&](auto& os) { WireEncodeUnannounce(os, topic->name, topic->id); });
int unsent = m_wire.WriteText([&](auto& os) {
net::WireEncodeUnannounce(os, topic->name, topic->id);
});
if (unsent < 0) {
return; // error
}
@@ -579,7 +578,7 @@ void ServerImpl::ClientData4::SendUnannounce(TopicData* topic) {
}
}
m_outgoing.SendMessage(
topic->id, UnannounceMsg{topic->name, static_cast<int>(topic->id)});
topic->id, net::UnannounceMsg{topic->name, static_cast<int>(topic->id)});
m_outgoing.EraseId(topic->id);
m_server.m_controlReady = true;
}
@@ -593,7 +592,7 @@ void ServerImpl::ClientData4::SendPropertiesUpdate(TopicData* topic,
if (m_local) {
int unsent = m_wire.WriteText([&](auto& os) {
WireEncodePropertiesUpdate(os, topic->name, update, ack);
net::WireEncodePropertiesUpdate(os, topic->name, update, ack);
});
if (unsent < 0) {
return; // error
@@ -603,7 +602,7 @@ void ServerImpl::ClientData4::SendPropertiesUpdate(TopicData* topic,
}
}
m_outgoing.SendMessage(topic->id,
PropertiesUpdateMsg{topic->name, update, ack});
net::PropertiesUpdateMsg{topic->name, update, ack});
m_server.m_controlReady = true;
}
@@ -618,8 +617,8 @@ void ServerImpl::ClientData4::SendOutgoing(uint64_t curTimeMs, bool flush) {
void ServerImpl::ClientData4::UpdatePeriod(TopicData::TopicClientData& tcd,
TopicData* topic) {
uint32_t period =
CalculatePeriod(tcd.subscribers, [](auto& x) { return x->periodMs; });
uint32_t period = net::CalculatePeriod(tcd.subscribers,
[](auto& x) { return x->periodMs; });
DEBUG4("updating {} period to {} ms", topic->name, period);
m_outgoing.SetPeriod(topic->id, period);
}
@@ -640,21 +639,21 @@ bool ServerImpl::ClientData3::ProcessIncomingBinary(
}
void ServerImpl::ClientData3::SendValue(TopicData* topic, const Value& value,
ValueSendMode mode) {
net::ValueSendMode mode) {
if (m_state != kStateRunning) {
if (mode == ValueSendMode::kImm) {
mode = ValueSendMode::kAll;
if (mode == net::ValueSendMode::kImm) {
mode = net::ValueSendMode::kAll;
}
} else if (m_local) {
mode = ValueSendMode::kImm; // always send local immediately
mode = net::ValueSendMode::kImm; // always send local immediately
}
TopicData3* topic3 = GetTopic3(topic);
bool added = false;
switch (mode) {
case ValueSendMode::kDisabled: // do nothing
case net::ValueSendMode::kDisabled: // do nothing
break;
case ValueSendMode::kImm: // send immediately
case net::ValueSendMode::kImm: // send immediately
++topic3->seqNum;
if (topic3->sentAssign) {
net3::WireEncodeEntryUpdate(m_wire.Send().stream(), topic->id,
@@ -669,7 +668,7 @@ void ServerImpl::ClientData3::SendValue(TopicData* topic, const Value& value,
Flush();
}
break;
case ValueSendMode::kNormal: {
case net::ValueSendMode::kNormal: {
// replace, or append if not present
wpi::DenseMap<NT_Topic, size_t>::iterator it;
std::tie(it, added) =
@@ -686,7 +685,7 @@ void ServerImpl::ClientData3::SendValue(TopicData* topic, const Value& value,
}
}
// fallthrough
case ValueSendMode::kAll: // append to outgoing
case net::ValueSendMode::kAll: // append to outgoing
if (!added) {
m_outgoingValueMap[topic->id] = m_outgoing.size();
}
@@ -889,7 +888,7 @@ void ServerImpl::ClientData3::ClientHello(std::string_view self_id,
options.prefixMatch = true;
sub = std::make_unique<SubscriberData>(
this, std::span<const std::string>{{prefix}}, 0, options);
m_periodMs = UpdatePeriodCalc(m_periodMs, sub->periodMs);
m_periodMs = net::UpdatePeriodCalc(m_periodMs, sub->periodMs);
m_setPeriodic(m_periodMs);
{
@@ -1212,7 +1211,7 @@ ServerImpl::ServerImpl(wpi::Logger& logger) : m_logger{logger} {
std::pair<std::string, int> ServerImpl::AddClient(
std::string_view name, std::string_view connInfo, bool local,
WireConnection& wire, ServerImpl::SetPeriodicFunc setPeriodic) {
net::WireConnection& wire, ServerImpl::SetPeriodicFunc setPeriodic) {
if (name.empty()) {
name = "NT4";
}
@@ -1842,7 +1841,7 @@ void ServerImpl::SetValue(ClientData* client, TopicData* topic,
for (auto&& tcd : topic->clients) {
if (tcd.first != client &&
tcd.second.sendMode != ValueSendMode::kDisabled) {
tcd.second.sendMode != net::ValueSendMode::kDisabled) {
tcd.first->SendValue(topic, value, tcd.second.sendMode);
}
}
@@ -1942,8 +1941,8 @@ void ServerImpl::SendOutgoing(int clientId, uint64_t curTimeMs) {
}
}
void ServerImpl::SetLocal(ServerMessageHandler* local,
ClientMessageQueue* queue) {
void ServerImpl::SetLocal(net::ServerMessageHandler* local,
net::ClientMessageQueue* queue) {
DEBUG4("SetLocal()");
m_local = local;
m_localClient->SetQueue(queue);

View File

@@ -21,14 +21,14 @@
#include <wpi/UidVector.h>
#include <wpi/json.h>
#include "ClientMessageQueue.h"
#include "Message.h"
#include "NetworkOutgoingQueue.h"
#include "NetworkPing.h"
#include "PubSubOptions.h"
#include "WireConnection.h"
#include "WireDecoder.h"
#include "WireEncoder.h"
#include "net/ClientMessageQueue.h"
#include "net/Message.h"
#include "net/NetworkOutgoingQueue.h"
#include "net/NetworkPing.h"
#include "net/WireConnection.h"
#include "net/WireDecoder.h"
#include "net/WireEncoder.h"
#include "net3/Message3.h"
#include "net3/SequenceNumber.h"
#include "net3/WireConnection3.h"
@@ -41,15 +41,17 @@ class SmallVectorImpl;
class raw_ostream;
} // namespace wpi
namespace nt::net {
struct ClientMessage;
class LocalInterface;
class WireConnection;
} // namespace nt::net
namespace nt::net3 {
class WireConnection3;
} // namespace nt::net3
namespace nt::net {
struct ClientMessage;
class LocalInterface;
class WireConnection;
namespace nt::server {
class ServerImpl final {
public:
@@ -62,7 +64,8 @@ class ServerImpl final {
void SendAllOutgoing(uint64_t curTimeMs, bool flush);
void SendOutgoing(int clientId, uint64_t curTimeMs);
void SetLocal(ServerMessageHandler* local, ClientMessageQueue* queue);
void SetLocal(net::ServerMessageHandler* local,
net::ClientMessageQueue* queue);
// these return true if any messages have been queued for later processing
bool ProcessIncomingText(int clientId, std::string_view data);
@@ -76,7 +79,7 @@ class ServerImpl final {
// Caller must ensure WireConnection lifetime lasts until RemoveClient() call.
std::pair<std::string, int> AddClient(std::string_view name,
std::string_view connInfo, bool local,
WireConnection& wire,
net::WireConnection& wire,
SetPeriodicFunc setPeriodic);
int AddClient3(std::string_view connInfo, bool local,
net3::WireConnection3& wire, Connected3Func connected,
@@ -149,15 +152,15 @@ class ServerImpl final {
struct TopicClientData {
wpi::SmallPtrSet<PublisherData*, 2> publishers;
wpi::SmallPtrSet<SubscriberData*, 2> subscribers;
ValueSendMode sendMode = ValueSendMode::kDisabled;
net::ValueSendMode sendMode = net::ValueSendMode::kDisabled;
bool AddSubscriber(SubscriberData* sub) {
bool added = subscribers.insert(sub).second;
if (!sub->options.topicsOnly) {
if (sub->options.sendAll) {
sendMode = ValueSendMode::kAll;
} else if (sendMode == ValueSendMode::kDisabled) {
sendMode = ValueSendMode::kNormal;
sendMode = net::ValueSendMode::kAll;
} else if (sendMode == net::ValueSendMode::kDisabled) {
sendMode = net::ValueSendMode::kNormal;
}
}
return added;
@@ -189,7 +192,7 @@ class ServerImpl final {
virtual bool ProcessIncomingBinary(std::span<const uint8_t> data) = 0;
virtual void SendValue(TopicData* topic, const Value& value,
ValueSendMode mode) = 0;
net::ValueSendMode mode) = 0;
virtual void SendAnnounce(TopicData* topic, std::optional<int> pubuid) = 0;
virtual void SendUnannounce(TopicData* topic) = 0;
virtual void SendPropertiesUpdate(TopicData* topic, const wpi::json& update,
@@ -234,7 +237,8 @@ class ServerImpl final {
TopicData* m_metaSub = nullptr;
};
class ClientData4Base : public ClientData, protected ClientMessageHandler {
class ClientData4Base : public ClientData,
protected net::ClientMessageHandler {
public:
ClientData4Base(std::string_view name, std::string_view connInfo,
bool local, ServerImpl::SetPeriodicFunc setPeriodic,
@@ -255,12 +259,12 @@ class ServerImpl final {
void ClientSetValue(int pubuid, const Value& value) final;
bool DoProcessIncomingMessages(ClientMessageQueue& queue, size_t max);
bool DoProcessIncomingMessages(net::ClientMessageQueue& queue, size_t max);
wpi::DenseMap<TopicData*, bool> m_announceSent;
private:
std::array<ClientMessage, 16> m_msgsBuf;
std::array<net::ClientMessage, 16> m_msgsBuf;
};
class ClientDataLocal final : public ClientData4Base {
@@ -281,7 +285,7 @@ class ServerImpl final {
}
void SendValue(TopicData* topic, const Value& value,
ValueSendMode mode) final;
net::ValueSendMode mode) final;
void SendAnnounce(TopicData* topic, std::optional<int> pubuid) final;
void SendUnannounce(TopicData* topic) final;
void SendPropertiesUpdate(TopicData* topic, const wpi::json& update,
@@ -289,17 +293,18 @@ class ServerImpl final {
void SendOutgoing(uint64_t curTimeMs, bool flush) final {}
void Flush() final {}
void SetQueue(ClientMessageQueue* queue) { m_queue = queue; }
void SetQueue(net::ClientMessageQueue* queue) { m_queue = queue; }
private:
ClientMessageQueue* m_queue = nullptr;
net::ClientMessageQueue* m_queue = nullptr;
};
class ClientData4 final : public ClientData4Base {
public:
ClientData4(std::string_view name, std::string_view connInfo, bool local,
WireConnection& wire, ServerImpl::SetPeriodicFunc setPeriodic,
ServerImpl& server, int id, wpi::Logger& logger)
net::WireConnection& wire,
ServerImpl::SetPeriodicFunc setPeriodic, ServerImpl& server,
int id, wpi::Logger& logger)
: ClientData4Base{name, connInfo, local, setPeriodic,
server, id, logger},
m_wire{wire},
@@ -319,7 +324,7 @@ class ServerImpl final {
}
void SendValue(TopicData* topic, const Value& value,
ValueSendMode mode) final;
net::ValueSendMode mode) final;
void SendAnnounce(TopicData* topic, std::optional<int> pubuid) final;
void SendUnannounce(TopicData* topic) final;
void SendPropertiesUpdate(TopicData* topic, const wpi::json& update,
@@ -331,12 +336,12 @@ class ServerImpl final {
void UpdatePeriod(TopicData::TopicClientData& tcd, TopicData* topic) final;
public:
WireConnection& m_wire;
net::WireConnection& m_wire;
private:
NetworkPing m_ping;
NetworkIncomingClientQueue m_incoming;
NetworkOutgoingQueue<ServerMessage> m_outgoing;
net::NetworkPing m_ping;
net::NetworkIncomingClientQueue m_incoming;
net::NetworkOutgoingQueue<net::ServerMessage> m_outgoing;
};
class ClientData3 final : public ClientData, private net3::MessageHandler3 {
@@ -358,7 +363,7 @@ class ServerImpl final {
bool ProcessIncomingMessages(size_t max) final { return false; }
void SendValue(TopicData* topic, const Value& value,
ValueSendMode mode) final;
net::ValueSendMode mode) final;
void SendAnnounce(TopicData* topic, std::optional<int> pubuid) final;
void SendUnannounce(TopicData* topic) final;
void SendPropertiesUpdate(TopicData* topic, const wpi::json& update,
@@ -395,7 +400,7 @@ class ServerImpl final {
State m_state{kStateInitial};
net3::WireDecoder3 m_decoder;
NetworkIncomingClientQueue m_incoming;
net::NetworkIncomingClientQueue m_incoming;
std::vector<net3::Message3> m_outgoing;
wpi::DenseMap<NT_Topic, size_t> m_outgoingValueMap;
int64_t m_nextPubUid{1};
@@ -475,7 +480,7 @@ class ServerImpl final {
};
wpi::Logger& m_logger;
ServerMessageHandler* m_local{nullptr};
net::ServerMessageHandler* m_local{nullptr};
bool m_controlReady{false};
ClientDataLocal* m_localClient;
@@ -510,4 +515,4 @@ class ServerImpl final {
const wpi::json& update);
};
} // namespace nt::net
} // namespace nt::server