[ntcore] Fix client unpublish outgoing queue (#7051)

The unpublish message must be sent on the outgoing queue before the handle
indicating which queue to use is erased.
This commit is contained in:
Peter Johnson
2024-09-08 10:35:41 -07:00
committed by GitHub
parent 7bdecab729
commit 306e190f78
3 changed files with 67 additions and 9 deletions

View File

@@ -18,12 +18,16 @@
#include <wpi/print.h>
#include <wpi/timestamp.h>
#include "networktables/DoubleArrayTopic.h"
#include "networktables/NetworkTableInstance.h"
#include "ntcore.h"
#include "ntcore_c.h"
#include "ntcore_cpp.h"
void bench();
void bench2();
void stress();
void stress2();
int main(int argc, char* argv[]) {
wpi::impl::SetupNowDefaultOnRio();
@@ -40,6 +44,10 @@ int main(int argc, char* argv[]) {
stress();
return EXIT_SUCCESS;
}
if (argc == 2 && std::string_view{argv[1]} == "stress2") {
stress2();
return EXIT_SUCCESS;
}
auto myValue = nt::GetEntry(nt::GetDefaultInstance(), "MyValue");
@@ -266,3 +274,54 @@ void stress() {
std::this_thread::sleep_for(100s);
}
void stress2() {
using namespace std::chrono_literals;
auto testTopicName = "testTopic";
auto count = 1000;
std::atomic_bool isDone{false};
nt::PubSubOptions pubSubOptions{
.periodic = std::numeric_limits<double>::min(),
.sendAll = true,
.keepDuplicates = true};
auto server = nt::NetworkTableInstance::Create();
server.StartServer();
auto serverTopic = server.GetDoubleArrayTopic(testTopicName);
auto subscriber = serverTopic.Subscribe({}, pubSubOptions);
std::atomic_int receivedCount{0};
server.AddListener(subscriber, NT_EVENT_VALUE_REMOTE, [&](auto event) {
if (receivedCount.fetch_add(1) == count) {
isDone = true;
}
// Warnings about duplicate pubs occur if I either introduce this short
// delay...
std::this_thread::sleep_for(1ms);
// ...or a little IO
// System.out.println("Got %d: %s"
// .formatted(receivedCount.get(), Arrays.toString(
// event.valueData.value.getDoubleArray())));
});
auto client = nt::NetworkTableInstance::Create();
client.SetServer("localhost");
auto clientName = "test client";
client.StartClient4(clientName);
std::this_thread::sleep_for(2s); // Startup time.
int sentCount = 0;
while (sentCount < count) {
auto clientTopic = client.GetDoubleArrayTopic(testTopicName);
{
auto publisher = clientTopic.Publish(pubSubOptions);
publisher.Set(
{{static_cast<double>(sentCount), static_cast<double>(sentCount),
static_cast<double>(sentCount)}});
// client.Flush();
sentCount++;
}
std::this_thread::yield();
}
std::this_thread::sleep_for(10s);
fmt::print("isDone: {}", isDone.load());
}

View File

@@ -118,9 +118,7 @@ void ClientImpl::HandleLocal(std::vector<ClientMessage>&& msgs) {
msg->properties, msg->options);
m_outgoing.SendMessage(msg->pubHandle, std::move(elem));
} else if (auto msg = std::get_if<UnpublishMsg>(&elem.contents)) {
if (Unpublish(msg->pubHandle, msg->topicHandle)) {
m_outgoing.SendMessage(msg->pubHandle, std::move(elem));
}
Unpublish(msg->pubHandle, msg->topicHandle, std::move(elem));
} else {
m_outgoing.SendMessage(0, std::move(elem));
}
@@ -199,12 +197,12 @@ void ClientImpl::Publish(NT_Publisher pubHandle, NT_Topic topicHandle,
UpdatePeriodic();
}
bool ClientImpl::Unpublish(NT_Publisher pubHandle, NT_Topic topicHandle) {
void ClientImpl::Unpublish(NT_Publisher pubHandle, NT_Topic topicHandle,
ClientMessage&& msg) {
unsigned int index = Handle{pubHandle}.GetIndex();
if (index >= m_publishers.size()) {
return false;
return;
}
bool doSend = true;
m_publishers[index].reset();
// loop over all publishers to update period
@@ -216,10 +214,10 @@ bool ClientImpl::Unpublish(NT_Publisher pubHandle, NT_Topic topicHandle) {
}
UpdatePeriodic();
m_outgoing.SendMessage(pubHandle, std::move(msg));
// remove from outgoing handle map
m_outgoing.EraseHandle(pubHandle);
return doSend;
}
void ClientImpl::SetValue(NT_Publisher pubHandle, const Value& value) {

View File

@@ -75,7 +75,8 @@ class ClientImpl final : private ServerMessageHandler {
void Publish(NT_Publisher pubHandle, NT_Topic topicHandle,
std::string_view name, std::string_view typeStr,
const wpi::json& properties, const PubSubOptionsImpl& options);
bool Unpublish(NT_Publisher pubHandle, NT_Topic topicHandle);
void Unpublish(NT_Publisher pubHandle, NT_Topic topicHandle,
ClientMessage&& msg);
void SetValue(NT_Publisher pubHandle, const Value& value);
int m_inst;