diff --git a/ntcore/CMakeLists.txt b/ntcore/CMakeLists.txt index 94bfbe95b8..c8fdca2262 100644 --- a/ntcore/CMakeLists.txt +++ b/ntcore/CMakeLists.txt @@ -10,6 +10,7 @@ file( src/main/native/cpp/net/*.cpp src/main/native/cpp/net3/*.cpp src/main/native/cpp/networktables/*.cpp + src/main/native/cpp/server/*.cpp src/main/native/cpp/tables/*.cpp ) add_library(ntcore ${ntcore_native_src}) diff --git a/ntcore/src/main/native/cpp/NetworkServer.h b/ntcore/src/main/native/cpp/NetworkServer.h index db4ecfac05..14e7dc95ba 100644 --- a/ntcore/src/main/native/cpp/NetworkServer.h +++ b/ntcore/src/main/native/cpp/NetworkServer.h @@ -18,8 +18,8 @@ #include "net/ClientMessageQueue.h" #include "net/Message.h" -#include "net/ServerImpl.h" #include "ntcore_cpp.h" +#include "server/ServerImpl.h" namespace wpi { class Logger; @@ -78,7 +78,7 @@ class NetworkServer { using Queue = net::LocalClientMessageQueue; net::ClientMessage m_localMsgs[Queue::kBlockSize]; - net::ServerImpl m_serverImpl; + server::ServerImpl m_serverImpl; // shared with user (must be atomic or mutex-protected) std::atomic*> m_flushLocalAtomic{nullptr}; diff --git a/ntcore/src/main/native/cpp/net/ServerImpl.cpp b/ntcore/src/main/native/cpp/server/ServerImpl.cpp similarity index 95% rename from ntcore/src/main/native/cpp/net/ServerImpl.cpp rename to ntcore/src/main/native/cpp/server/ServerImpl.cpp index a33c58019b..38f9fbeffb 100644 --- a/ntcore/src/main/native/cpp/net/ServerImpl.cpp +++ b/ntcore/src/main/native/cpp/server/ServerImpl.cpp @@ -23,9 +23,7 @@ #include #include -#include "IConnectionList.h" #include "Log.h" -#include "NetworkInterface.h" #include "Types_internal.h" #include "net/Message.h" #include "net/WireEncoder.h" @@ -35,7 +33,7 @@ #include "ntcore_c.h" using namespace nt; -using namespace nt::net; +using namespace nt::server; using namespace mpack; // maximum amount of time the wire can be not ready to send another @@ -293,7 +291,7 @@ void ServerImpl::ClientData4Base::ClientSubscribe( // update periodic sender (if not local) if (!m_local) { - m_periodMs = UpdatePeriodCalc(m_periodMs, sub->periodMs); + m_periodMs = net::UpdatePeriodCalc(m_periodMs, sub->periodMs); m_setPeriodic(m_periodMs); } @@ -312,7 +310,7 @@ void ServerImpl::ClientData4Base::ClientSubscribe( bool wasSubscribed = tcdIt != topic->clients.end() && !tcdIt->second.subscribers.empty(); bool wasSubscribedValue = - wasSubscribed ? tcdIt->second.sendMode != ValueSendMode::kDisabled + wasSubscribed ? tcdIt->second.sendMode != net::ValueSendMode::kDisabled : false; bool added = false; @@ -344,7 +342,7 @@ void ServerImpl::ClientData4Base::ClientSubscribe( for (auto topic : dataToSend) { DEBUG4("send last value for {} to client {}", topic->name, m_id); - SendValue(topic, topic->lastValue, ValueSendMode::kAll); + SendValue(topic, topic->lastValue, net::ValueSendMode::kAll); } } @@ -372,7 +370,7 @@ void ServerImpl::ClientData4Base::ClientUnsubscribe(int subuid) { // loop over all subscribers to update period if (!m_local) { - m_periodMs = CalculatePeriod( + m_periodMs = net::CalculatePeriod( m_subscribers, [](auto& x) { return x.getSecond()->periodMs; }); m_setPeriodic(m_periodMs); } @@ -392,7 +390,7 @@ void ServerImpl::ClientData4Base::ClientSetValue(int pubuid, void ServerImpl::ClientDataLocal::SendValue(TopicData* topic, const Value& value, - ValueSendMode mode) { + net::ValueSendMode mode) { if (m_server.m_local) { m_server.m_local->ServerSetValue(topic->localTopic, value); } @@ -435,10 +433,10 @@ void ServerImpl::ClientDataLocal::SendPropertiesUpdate(TopicData* topic, } bool ServerImpl::ClientData4Base::DoProcessIncomingMessages( - ClientMessageQueue& queue, size_t max) { + net::ClientMessageQueue& queue, size_t max) { DEBUG4("ProcessIncomingMessage()"); max = (std::min)(m_msgsBuf.size(), max); - std::span msgs = + std::span msgs = queue.ReadQueue(wpi::take_front(std::span{m_msgsBuf}, max)); // just map as a normal client into client=0 calls @@ -446,21 +444,21 @@ bool ServerImpl::ClientData4Base::DoProcessIncomingMessages( bool updatesub = false; for (const auto& elem : msgs) { // NOLINT // common case is value, so check that first - if (auto msg = std::get_if(&elem.contents)) { + if (auto msg = std::get_if(&elem.contents)) { ClientSetValue(msg->pubuid, msg->value); - } else if (auto msg = std::get_if(&elem.contents)) { + } else if (auto msg = std::get_if(&elem.contents)) { ClientPublish(msg->pubuid, msg->name, msg->typeStr, msg->properties, msg->options); updatepub = true; - } else if (auto msg = std::get_if(&elem.contents)) { + } else if (auto msg = std::get_if(&elem.contents)) { ClientUnpublish(msg->pubuid); updatepub = true; - } else if (auto msg = std::get_if(&elem.contents)) { + } else if (auto msg = std::get_if(&elem.contents)) { ClientSetProperties(msg->name, msg->update); - } else if (auto msg = std::get_if(&elem.contents)) { + } else if (auto msg = std::get_if(&elem.contents)) { ClientSubscribe(msg->subuid, msg->topicNames, msg->options); updatesub = true; - } else if (auto msg = std::get_if(&elem.contents)) { + } else if (auto msg = std::get_if(&elem.contents)) { ClientUnsubscribe(msg->subuid); updatesub = true; } @@ -502,7 +500,7 @@ bool ServerImpl::ClientData4::ProcessIncomingBinary( int pubuid; Value value; std::string error; - if (!WireDecodeBinary(&data, &pubuid, &value, &error, 0)) { + if (!net::WireDecodeBinary(&data, &pubuid, &value, &error, 0)) { m_wire.Disconnect(fmt::format("binary decode error: {}", error)); break; } @@ -512,7 +510,7 @@ bool ServerImpl::ClientData4::ProcessIncomingBinary( auto now = wpi::Now(); DEBUG4("RTT ping from {}, responding with time={}", m_id, now); m_wire.SendBinary( - [&](auto& os) { WireEncodeBinary(os, -1, now, value); }); + [&](auto& os) { net::WireEncodeBinary(os, -1, now, value); }); continue; } @@ -531,7 +529,7 @@ bool ServerImpl::ClientData4::ProcessIncomingBinary( } void ServerImpl::ClientData4::SendValue(TopicData* topic, const Value& value, - ValueSendMode mode) { + net::ValueSendMode mode) { m_outgoing.SendValue(topic->id, value, mode); } @@ -545,8 +543,8 @@ void ServerImpl::ClientData4::SendAnnounce(TopicData* topic, if (m_local) { int unsent = m_wire.WriteText([&](auto& os) { - WireEncodeAnnounce(os, topic->name, topic->id, topic->typeStr, - topic->properties, pubuid); + net::WireEncodeAnnounce(os, topic->name, topic->id, topic->typeStr, + topic->properties, pubuid); }); if (unsent < 0) { return; // error @@ -556,8 +554,8 @@ void ServerImpl::ClientData4::SendAnnounce(TopicData* topic, } } m_outgoing.SendMessage( - topic->id, AnnounceMsg{topic->name, static_cast(topic->id), - topic->typeStr, pubuid, topic->properties}); + topic->id, net::AnnounceMsg{topic->name, static_cast(topic->id), + topic->typeStr, pubuid, topic->properties}); m_server.m_controlReady = true; } @@ -569,8 +567,9 @@ void ServerImpl::ClientData4::SendUnannounce(TopicData* topic) { sent = false; if (m_local) { - int unsent = m_wire.WriteText( - [&](auto& os) { WireEncodeUnannounce(os, topic->name, topic->id); }); + int unsent = m_wire.WriteText([&](auto& os) { + net::WireEncodeUnannounce(os, topic->name, topic->id); + }); if (unsent < 0) { return; // error } @@ -579,7 +578,7 @@ void ServerImpl::ClientData4::SendUnannounce(TopicData* topic) { } } m_outgoing.SendMessage( - topic->id, UnannounceMsg{topic->name, static_cast(topic->id)}); + topic->id, net::UnannounceMsg{topic->name, static_cast(topic->id)}); m_outgoing.EraseId(topic->id); m_server.m_controlReady = true; } @@ -593,7 +592,7 @@ void ServerImpl::ClientData4::SendPropertiesUpdate(TopicData* topic, if (m_local) { int unsent = m_wire.WriteText([&](auto& os) { - WireEncodePropertiesUpdate(os, topic->name, update, ack); + net::WireEncodePropertiesUpdate(os, topic->name, update, ack); }); if (unsent < 0) { return; // error @@ -603,7 +602,7 @@ void ServerImpl::ClientData4::SendPropertiesUpdate(TopicData* topic, } } m_outgoing.SendMessage(topic->id, - PropertiesUpdateMsg{topic->name, update, ack}); + net::PropertiesUpdateMsg{topic->name, update, ack}); m_server.m_controlReady = true; } @@ -618,8 +617,8 @@ void ServerImpl::ClientData4::SendOutgoing(uint64_t curTimeMs, bool flush) { void ServerImpl::ClientData4::UpdatePeriod(TopicData::TopicClientData& tcd, TopicData* topic) { - uint32_t period = - CalculatePeriod(tcd.subscribers, [](auto& x) { return x->periodMs; }); + uint32_t period = net::CalculatePeriod(tcd.subscribers, + [](auto& x) { return x->periodMs; }); DEBUG4("updating {} period to {} ms", topic->name, period); m_outgoing.SetPeriod(topic->id, period); } @@ -640,21 +639,21 @@ bool ServerImpl::ClientData3::ProcessIncomingBinary( } void ServerImpl::ClientData3::SendValue(TopicData* topic, const Value& value, - ValueSendMode mode) { + net::ValueSendMode mode) { if (m_state != kStateRunning) { - if (mode == ValueSendMode::kImm) { - mode = ValueSendMode::kAll; + if (mode == net::ValueSendMode::kImm) { + mode = net::ValueSendMode::kAll; } } else if (m_local) { - mode = ValueSendMode::kImm; // always send local immediately + mode = net::ValueSendMode::kImm; // always send local immediately } TopicData3* topic3 = GetTopic3(topic); bool added = false; switch (mode) { - case ValueSendMode::kDisabled: // do nothing + case net::ValueSendMode::kDisabled: // do nothing break; - case ValueSendMode::kImm: // send immediately + case net::ValueSendMode::kImm: // send immediately ++topic3->seqNum; if (topic3->sentAssign) { net3::WireEncodeEntryUpdate(m_wire.Send().stream(), topic->id, @@ -669,7 +668,7 @@ void ServerImpl::ClientData3::SendValue(TopicData* topic, const Value& value, Flush(); } break; - case ValueSendMode::kNormal: { + case net::ValueSendMode::kNormal: { // replace, or append if not present wpi::DenseMap::iterator it; std::tie(it, added) = @@ -686,7 +685,7 @@ void ServerImpl::ClientData3::SendValue(TopicData* topic, const Value& value, } } // fallthrough - case ValueSendMode::kAll: // append to outgoing + case net::ValueSendMode::kAll: // append to outgoing if (!added) { m_outgoingValueMap[topic->id] = m_outgoing.size(); } @@ -889,7 +888,7 @@ void ServerImpl::ClientData3::ClientHello(std::string_view self_id, options.prefixMatch = true; sub = std::make_unique( this, std::span{{prefix}}, 0, options); - m_periodMs = UpdatePeriodCalc(m_periodMs, sub->periodMs); + m_periodMs = net::UpdatePeriodCalc(m_periodMs, sub->periodMs); m_setPeriodic(m_periodMs); { @@ -1212,7 +1211,7 @@ ServerImpl::ServerImpl(wpi::Logger& logger) : m_logger{logger} { std::pair ServerImpl::AddClient( std::string_view name, std::string_view connInfo, bool local, - WireConnection& wire, ServerImpl::SetPeriodicFunc setPeriodic) { + net::WireConnection& wire, ServerImpl::SetPeriodicFunc setPeriodic) { if (name.empty()) { name = "NT4"; } @@ -1842,7 +1841,7 @@ void ServerImpl::SetValue(ClientData* client, TopicData* topic, for (auto&& tcd : topic->clients) { if (tcd.first != client && - tcd.second.sendMode != ValueSendMode::kDisabled) { + tcd.second.sendMode != net::ValueSendMode::kDisabled) { tcd.first->SendValue(topic, value, tcd.second.sendMode); } } @@ -1942,8 +1941,8 @@ void ServerImpl::SendOutgoing(int clientId, uint64_t curTimeMs) { } } -void ServerImpl::SetLocal(ServerMessageHandler* local, - ClientMessageQueue* queue) { +void ServerImpl::SetLocal(net::ServerMessageHandler* local, + net::ClientMessageQueue* queue) { DEBUG4("SetLocal()"); m_local = local; m_localClient->SetQueue(queue); diff --git a/ntcore/src/main/native/cpp/net/ServerImpl.h b/ntcore/src/main/native/cpp/server/ServerImpl.h similarity index 90% rename from ntcore/src/main/native/cpp/net/ServerImpl.h rename to ntcore/src/main/native/cpp/server/ServerImpl.h index f4fae4113c..55c3982617 100644 --- a/ntcore/src/main/native/cpp/net/ServerImpl.h +++ b/ntcore/src/main/native/cpp/server/ServerImpl.h @@ -21,14 +21,14 @@ #include #include -#include "ClientMessageQueue.h" -#include "Message.h" -#include "NetworkOutgoingQueue.h" -#include "NetworkPing.h" #include "PubSubOptions.h" -#include "WireConnection.h" -#include "WireDecoder.h" -#include "WireEncoder.h" +#include "net/ClientMessageQueue.h" +#include "net/Message.h" +#include "net/NetworkOutgoingQueue.h" +#include "net/NetworkPing.h" +#include "net/WireConnection.h" +#include "net/WireDecoder.h" +#include "net/WireEncoder.h" #include "net3/Message3.h" #include "net3/SequenceNumber.h" #include "net3/WireConnection3.h" @@ -41,15 +41,17 @@ class SmallVectorImpl; class raw_ostream; } // namespace wpi +namespace nt::net { +struct ClientMessage; +class LocalInterface; +class WireConnection; +} // namespace nt::net + namespace nt::net3 { class WireConnection3; } // namespace nt::net3 -namespace nt::net { - -struct ClientMessage; -class LocalInterface; -class WireConnection; +namespace nt::server { class ServerImpl final { public: @@ -62,7 +64,8 @@ class ServerImpl final { void SendAllOutgoing(uint64_t curTimeMs, bool flush); void SendOutgoing(int clientId, uint64_t curTimeMs); - void SetLocal(ServerMessageHandler* local, ClientMessageQueue* queue); + void SetLocal(net::ServerMessageHandler* local, + net::ClientMessageQueue* queue); // these return true if any messages have been queued for later processing bool ProcessIncomingText(int clientId, std::string_view data); @@ -76,7 +79,7 @@ class ServerImpl final { // Caller must ensure WireConnection lifetime lasts until RemoveClient() call. std::pair AddClient(std::string_view name, std::string_view connInfo, bool local, - WireConnection& wire, + net::WireConnection& wire, SetPeriodicFunc setPeriodic); int AddClient3(std::string_view connInfo, bool local, net3::WireConnection3& wire, Connected3Func connected, @@ -149,15 +152,15 @@ class ServerImpl final { struct TopicClientData { wpi::SmallPtrSet publishers; wpi::SmallPtrSet subscribers; - ValueSendMode sendMode = ValueSendMode::kDisabled; + net::ValueSendMode sendMode = net::ValueSendMode::kDisabled; bool AddSubscriber(SubscriberData* sub) { bool added = subscribers.insert(sub).second; if (!sub->options.topicsOnly) { if (sub->options.sendAll) { - sendMode = ValueSendMode::kAll; - } else if (sendMode == ValueSendMode::kDisabled) { - sendMode = ValueSendMode::kNormal; + sendMode = net::ValueSendMode::kAll; + } else if (sendMode == net::ValueSendMode::kDisabled) { + sendMode = net::ValueSendMode::kNormal; } } return added; @@ -189,7 +192,7 @@ class ServerImpl final { virtual bool ProcessIncomingBinary(std::span data) = 0; virtual void SendValue(TopicData* topic, const Value& value, - ValueSendMode mode) = 0; + net::ValueSendMode mode) = 0; virtual void SendAnnounce(TopicData* topic, std::optional pubuid) = 0; virtual void SendUnannounce(TopicData* topic) = 0; virtual void SendPropertiesUpdate(TopicData* topic, const wpi::json& update, @@ -234,7 +237,8 @@ class ServerImpl final { TopicData* m_metaSub = nullptr; }; - class ClientData4Base : public ClientData, protected ClientMessageHandler { + class ClientData4Base : public ClientData, + protected net::ClientMessageHandler { public: ClientData4Base(std::string_view name, std::string_view connInfo, bool local, ServerImpl::SetPeriodicFunc setPeriodic, @@ -255,12 +259,12 @@ class ServerImpl final { void ClientSetValue(int pubuid, const Value& value) final; - bool DoProcessIncomingMessages(ClientMessageQueue& queue, size_t max); + bool DoProcessIncomingMessages(net::ClientMessageQueue& queue, size_t max); wpi::DenseMap m_announceSent; private: - std::array m_msgsBuf; + std::array m_msgsBuf; }; class ClientDataLocal final : public ClientData4Base { @@ -281,7 +285,7 @@ class ServerImpl final { } void SendValue(TopicData* topic, const Value& value, - ValueSendMode mode) final; + net::ValueSendMode mode) final; void SendAnnounce(TopicData* topic, std::optional pubuid) final; void SendUnannounce(TopicData* topic) final; void SendPropertiesUpdate(TopicData* topic, const wpi::json& update, @@ -289,17 +293,18 @@ class ServerImpl final { void SendOutgoing(uint64_t curTimeMs, bool flush) final {} void Flush() final {} - void SetQueue(ClientMessageQueue* queue) { m_queue = queue; } + void SetQueue(net::ClientMessageQueue* queue) { m_queue = queue; } private: - ClientMessageQueue* m_queue = nullptr; + net::ClientMessageQueue* m_queue = nullptr; }; class ClientData4 final : public ClientData4Base { public: ClientData4(std::string_view name, std::string_view connInfo, bool local, - WireConnection& wire, ServerImpl::SetPeriodicFunc setPeriodic, - ServerImpl& server, int id, wpi::Logger& logger) + net::WireConnection& wire, + ServerImpl::SetPeriodicFunc setPeriodic, ServerImpl& server, + int id, wpi::Logger& logger) : ClientData4Base{name, connInfo, local, setPeriodic, server, id, logger}, m_wire{wire}, @@ -319,7 +324,7 @@ class ServerImpl final { } void SendValue(TopicData* topic, const Value& value, - ValueSendMode mode) final; + net::ValueSendMode mode) final; void SendAnnounce(TopicData* topic, std::optional pubuid) final; void SendUnannounce(TopicData* topic) final; void SendPropertiesUpdate(TopicData* topic, const wpi::json& update, @@ -331,12 +336,12 @@ class ServerImpl final { void UpdatePeriod(TopicData::TopicClientData& tcd, TopicData* topic) final; public: - WireConnection& m_wire; + net::WireConnection& m_wire; private: - NetworkPing m_ping; - NetworkIncomingClientQueue m_incoming; - NetworkOutgoingQueue m_outgoing; + net::NetworkPing m_ping; + net::NetworkIncomingClientQueue m_incoming; + net::NetworkOutgoingQueue m_outgoing; }; class ClientData3 final : public ClientData, private net3::MessageHandler3 { @@ -358,7 +363,7 @@ class ServerImpl final { bool ProcessIncomingMessages(size_t max) final { return false; } void SendValue(TopicData* topic, const Value& value, - ValueSendMode mode) final; + net::ValueSendMode mode) final; void SendAnnounce(TopicData* topic, std::optional pubuid) final; void SendUnannounce(TopicData* topic) final; void SendPropertiesUpdate(TopicData* topic, const wpi::json& update, @@ -395,7 +400,7 @@ class ServerImpl final { State m_state{kStateInitial}; net3::WireDecoder3 m_decoder; - NetworkIncomingClientQueue m_incoming; + net::NetworkIncomingClientQueue m_incoming; std::vector m_outgoing; wpi::DenseMap m_outgoingValueMap; int64_t m_nextPubUid{1}; @@ -475,7 +480,7 @@ class ServerImpl final { }; wpi::Logger& m_logger; - ServerMessageHandler* m_local{nullptr}; + net::ServerMessageHandler* m_local{nullptr}; bool m_controlReady{false}; ClientDataLocal* m_localClient; @@ -510,4 +515,4 @@ class ServerImpl final { const wpi::json& update); }; -} // namespace nt::net +} // namespace nt::server diff --git a/ntcore/src/test/native/cpp/net/ServerImplTest.cpp b/ntcore/src/test/native/cpp/server/ServerImplTest.cpp similarity index 98% rename from ntcore/src/test/native/cpp/net/ServerImplTest.cpp rename to ntcore/src/test/native/cpp/server/ServerImplTest.cpp index dce0fb4a38..0db7f15d5b 100644 --- a/ntcore/src/test/native/cpp/net/ServerImplTest.cpp +++ b/ntcore/src/test/native/cpp/server/ServerImplTest.cpp @@ -17,16 +17,16 @@ #include "../PubSubOptionsMatcher.h" #include "../TestPrinters.h" #include "../ValueMatcher.h" +#include "../net/MockClientMessageQueue.h" +#include "../net/MockMessageHandler.h" +#include "../net/MockWireConnection.h" #include "Handle.h" -#include "MockClientMessageQueue.h" -#include "MockMessageHandler.h" -#include "MockWireConnection.h" #include "gmock/gmock.h" #include "net/Message.h" -#include "net/ServerImpl.h" #include "net/WireEncoder.h" #include "ntcore_c.h" #include "ntcore_cpp.h" +#include "server/ServerImpl.h" using ::testing::_; using ::testing::AllOf; @@ -48,7 +48,7 @@ class ServerImplTest : public ::testing::Test { ::testing::StrictMock local; ::testing::StrictMock queue; wpi::MockLogger logger; - net::ServerImpl server{logger}; + server::ServerImpl server{logger}; }; TEST_F(ServerImplTest, AddClient) {