diff --git a/ntcore/src/dev/native/cpp/main.cpp b/ntcore/src/dev/native/cpp/main.cpp index 87fce7fb63..66418cc20a 100644 --- a/ntcore/src/dev/native/cpp/main.cpp +++ b/ntcore/src/dev/native/cpp/main.cpp @@ -3,6 +3,7 @@ // the WPILib BSD license file in the root directory of this project. #include +#include #include #include #include @@ -18,6 +19,7 @@ #include "ntcore_cpp.h" void bench(); +void bench2(); void stress(); int main(int argc, char* argv[]) { @@ -25,6 +27,10 @@ int main(int argc, char* argv[]) { bench(); return EXIT_SUCCESS; } + if (argc == 2 && std::string_view{argv[1]} == "bench2") { + bench2(); + return EXIT_SUCCESS; + } if (argc == 2 && std::string_view{argv[1]} == "stress") { stress(); return EXIT_SUCCESS; @@ -114,6 +120,79 @@ void bench() { PrintTimes(flushTimes); } +void bench2() { + // set up instances + auto client1 = nt::CreateInstance(); + auto client2 = nt::CreateInstance(); + auto server = nt::CreateInstance(); + + // connect client and server + nt::StartServer(server, "bench2.json", "127.0.0.1", 10001, 10000); + nt::StartClient4(client1, "client1"); + nt::StartClient3(client2, "client2"); + nt::SetServer(client1, "127.0.0.1", 10000); + nt::SetServer(client2, "127.0.0.1", 10001); + + using namespace std::chrono_literals; + std::this_thread::sleep_for(1s); + + // add "typical" set of subscribers on client and server + nt::SubscribeMultiple(client1, {{std::string_view{}}}); + nt::SubscribeMultiple(client2, {{std::string_view{}}}); + nt::SubscribeMultiple(server, {{std::string_view{}}}); + + // create 1000 entries + std::array pubs; + for (int i = 0; i < 1000; ++i) { + pubs[i] = nt::GetEntry( + nt::GetTopic(server, + fmt::format("/some/long/name/with/lots/of/slashes/{}", i)), + NT_DOUBLE_ARRAY, "double[]"); + } + + // warm up + for (int i = 1; i <= 100; ++i) { + for (auto pub : pubs) { + double vals[3] = {i * 0.01, i * 0.02, i * 0.03}; + nt::SetDoubleArray(pub, vals); + } + nt::FlushLocal(server); + std::this_thread::sleep_for(0.02s); + } + + std::vector flushTimes; + flushTimes.reserve(1001); + + std::vector times; + times.reserve(1001); + + // benchmark + auto start = std::chrono::high_resolution_clock::now(); + int64_t now = nt::Now(); + for (int i = 1; i <= 1000; ++i) { + for (auto pub : pubs) { + double vals[3] = {i * 0.01, i * 0.02, i * 0.03}; + nt::SetDoubleArray(pub, vals); + } + int64_t prev = now; + now = nt::Now(); + times.emplace_back(now - prev); + nt::FlushLocal(server); + nt::Flush(server); + flushTimes.emplace_back(nt::Now() - now); + std::this_thread::sleep_for(0.02s); + now = nt::Now(); + } + auto stop = std::chrono::high_resolution_clock::now(); + + fmt::print("total time: {}us\n", + std::chrono::duration_cast(stop - start) + .count()); + PrintTimes(times); + fmt::print("-- Flush --\n"); + PrintTimes(flushTimes); +} + static std::random_device r; static std::mt19937 gen(r()); static std::uniform_real_distribution dist; diff --git a/ntcore/src/main/native/cpp/net/ServerImpl.cpp b/ntcore/src/main/native/cpp/net/ServerImpl.cpp index 81749083eb..3bc9165171 100644 --- a/ntcore/src/main/native/cpp/net/ServerImpl.cpp +++ b/ntcore/src/main/native/cpp/net/ServerImpl.cpp @@ -205,6 +205,7 @@ class ClientData4 final : public ClientData4Base { private: std::vector m_outgoing; + wpi::DenseMap m_outgoingValueMap; bool WriteBinary(int64_t id, int64_t time, const Value& value) { return WireEncodeBinary(SendBinary().Add(), id, time, value); @@ -282,6 +283,7 @@ class ClientData3 final : public ClientData, private net3::MessageHandler3 { net3::WireDecoder3 m_decoder; std::vector m_outgoing; + wpi::DenseMap m_outgoingValueMap; int64_t m_nextPubUid{1}; struct TopicData3 { @@ -857,24 +859,23 @@ void ClientData4::SendValue(TopicData* topic, const Value& value, } break; case ClientData::kSendAll: // append to outgoing + m_outgoingValueMap[topic->id] = m_outgoing.size(); m_outgoing.emplace_back(ServerMessage{ServerValueMsg{topic->id, value}}); break; case ClientData::kSendNormal: { - // scan outgoing and replace, or append if not present - bool found = false; - for (auto&& msg : m_outgoing) { - if (auto m = std::get_if(&msg.contents)) { - if (m->topic == topic->id) { + // replace, or append if not present + auto [it, added] = + m_outgoingValueMap.try_emplace(topic->id, m_outgoing.size()); + if (!added && it->second < m_outgoing.size()) { + if (auto m = + std::get_if(&m_outgoing[it->second].contents)) { + if (m->topic == topic->id) { // should always be true m->value = value; - found = true; break; } } } - if (!found) { - m_outgoing.emplace_back( - ServerMessage{ServerValueMsg{topic->id, value}}); - } + m_outgoing.emplace_back(ServerMessage{ServerValueMsg{topic->id, value}}); break; } } @@ -959,6 +960,7 @@ void ClientData4::SendOutgoing(uint64_t curTimeMs) { } } m_outgoing.resize(0); + m_outgoingValueMap.clear(); m_lastSendMs = curTimeMs; } @@ -991,6 +993,7 @@ void ClientData3::SendValue(TopicData* topic, const Value& value, mode = ClientData::kSendImmNoFlush; // always send local immediately } TopicData3* topic3 = GetTopic3(topic); + bool added = false; switch (mode) { case ClientData::kSendDisabled: // do nothing @@ -1011,24 +1014,26 @@ void ClientData3::SendValue(TopicData* topic, const Value& value, } break; case ClientData::kSendNormal: { - // scan outgoing and replace, or append if not present - bool found = false; - for (auto&& msg : m_outgoing) { + // replace, or append if not present + wpi::DenseMap::iterator it; + std::tie(it, added) = + m_outgoingValueMap.try_emplace(topic->id, m_outgoing.size()); + if (!added && it->second < m_outgoing.size()) { + auto& msg = m_outgoing[it->second]; if (msg.Is(net3::Message3::kEntryUpdate) || msg.Is(net3::Message3::kEntryAssign)) { - if (msg.id() == topic->id) { + if (msg.id() == topic->id) { // should always be true msg.SetValue(value); - found = true; break; } } } - if (found) { - break; - } } // fallthrough case ClientData::kSendAll: // append to outgoing + if (!added) { + m_outgoingValueMap[topic->id] = m_outgoing.size(); + } ++topic3->seqNum; if (topic3->sentAssign) { m_outgoing.emplace_back(net3::Message3::EntryUpdate( @@ -1129,6 +1134,7 @@ void ClientData3::SendOutgoing(uint64_t curTimeMs) { net3::WireEncode(out.stream(), msg); } m_outgoing.resize(0); + m_outgoingValueMap.clear(); m_lastSendMs = curTimeMs; }