diff --git a/ntcore/src/dev/native/cpp/main.cpp b/ntcore/src/dev/native/cpp/main.cpp index 1318b2975e..49cae5be4a 100644 --- a/ntcore/src/dev/native/cpp/main.cpp +++ b/ntcore/src/dev/native/cpp/main.cpp @@ -18,12 +18,16 @@ #include #include +#include "networktables/DoubleArrayTopic.h" +#include "networktables/NetworkTableInstance.h" #include "ntcore.h" +#include "ntcore_c.h" #include "ntcore_cpp.h" void bench(); void bench2(); void stress(); +void stress2(); int main(int argc, char* argv[]) { wpi::impl::SetupNowDefaultOnRio(); @@ -40,6 +44,10 @@ int main(int argc, char* argv[]) { stress(); return EXIT_SUCCESS; } + if (argc == 2 && std::string_view{argv[1]} == "stress2") { + stress2(); + return EXIT_SUCCESS; + } auto myValue = nt::GetEntry(nt::GetDefaultInstance(), "MyValue"); @@ -266,3 +274,54 @@ void stress() { std::this_thread::sleep_for(100s); } + +void stress2() { + using namespace std::chrono_literals; + + auto testTopicName = "testTopic"; + auto count = 1000; + std::atomic_bool isDone{false}; + nt::PubSubOptions pubSubOptions{ + .periodic = std::numeric_limits::min(), + .sendAll = true, + .keepDuplicates = true}; + auto server = nt::NetworkTableInstance::Create(); + server.StartServer(); + auto serverTopic = server.GetDoubleArrayTopic(testTopicName); + auto subscriber = serverTopic.Subscribe({}, pubSubOptions); + std::atomic_int receivedCount{0}; + server.AddListener(subscriber, NT_EVENT_VALUE_REMOTE, [&](auto event) { + if (receivedCount.fetch_add(1) == count) { + isDone = true; + } + // Warnings about duplicate pubs occur if I either introduce this short + // delay... + std::this_thread::sleep_for(1ms); + // ...or a little IO + // System.out.println("Got %d: %s" + // .formatted(receivedCount.get(), Arrays.toString( + // event.valueData.value.getDoubleArray()))); + }); + + auto client = nt::NetworkTableInstance::Create(); + client.SetServer("localhost"); + auto clientName = "test client"; + client.StartClient4(clientName); + std::this_thread::sleep_for(2s); // Startup time. + int sentCount = 0; + while (sentCount < count) { + auto clientTopic = client.GetDoubleArrayTopic(testTopicName); + { + auto publisher = clientTopic.Publish(pubSubOptions); + publisher.Set( + {{static_cast(sentCount), static_cast(sentCount), + static_cast(sentCount)}}); + // client.Flush(); + sentCount++; + } + std::this_thread::yield(); + } + + std::this_thread::sleep_for(10s); + fmt::print("isDone: {}", isDone.load()); +} diff --git a/ntcore/src/main/native/cpp/net/ClientImpl.cpp b/ntcore/src/main/native/cpp/net/ClientImpl.cpp index 96a6be7641..0352ef91e4 100644 --- a/ntcore/src/main/native/cpp/net/ClientImpl.cpp +++ b/ntcore/src/main/native/cpp/net/ClientImpl.cpp @@ -118,9 +118,7 @@ void ClientImpl::HandleLocal(std::vector&& msgs) { msg->properties, msg->options); m_outgoing.SendMessage(msg->pubHandle, std::move(elem)); } else if (auto msg = std::get_if(&elem.contents)) { - if (Unpublish(msg->pubHandle, msg->topicHandle)) { - m_outgoing.SendMessage(msg->pubHandle, std::move(elem)); - } + Unpublish(msg->pubHandle, msg->topicHandle, std::move(elem)); } else { m_outgoing.SendMessage(0, std::move(elem)); } @@ -199,12 +197,12 @@ void ClientImpl::Publish(NT_Publisher pubHandle, NT_Topic topicHandle, UpdatePeriodic(); } -bool ClientImpl::Unpublish(NT_Publisher pubHandle, NT_Topic topicHandle) { +void ClientImpl::Unpublish(NT_Publisher pubHandle, NT_Topic topicHandle, + ClientMessage&& msg) { unsigned int index = Handle{pubHandle}.GetIndex(); if (index >= m_publishers.size()) { - return false; + return; } - bool doSend = true; m_publishers[index].reset(); // loop over all publishers to update period @@ -216,10 +214,10 @@ bool ClientImpl::Unpublish(NT_Publisher pubHandle, NT_Topic topicHandle) { } UpdatePeriodic(); + m_outgoing.SendMessage(pubHandle, std::move(msg)); + // remove from outgoing handle map m_outgoing.EraseHandle(pubHandle); - - return doSend; } void ClientImpl::SetValue(NT_Publisher pubHandle, const Value& value) { diff --git a/ntcore/src/main/native/cpp/net/ClientImpl.h b/ntcore/src/main/native/cpp/net/ClientImpl.h index b72ce38c56..ae6e66636a 100644 --- a/ntcore/src/main/native/cpp/net/ClientImpl.h +++ b/ntcore/src/main/native/cpp/net/ClientImpl.h @@ -75,7 +75,8 @@ class ClientImpl final : private ServerMessageHandler { void Publish(NT_Publisher pubHandle, NT_Topic topicHandle, std::string_view name, std::string_view typeStr, const wpi::json& properties, const PubSubOptionsImpl& options); - bool Unpublish(NT_Publisher pubHandle, NT_Topic topicHandle); + void Unpublish(NT_Publisher pubHandle, NT_Topic topicHandle, + ClientMessage&& msg); void SetValue(NT_Publisher pubHandle, const Value& value); int m_inst;