[ntcore] Optimize scan of outgoing messages (#5227)

The algorithm being used for scanning outgoing messages was O(n^2)
because it did a full linear search and then appended. This scan is
performed for each client. If there is a burst of outgoing changes, the
outgoing queue can get quite deep all at once and this scan can be very
slow. Replacing with a map fixes this.
This commit is contained in:
Peter Johnson
2023-03-25 15:20:22 -07:00
committed by GitHub
parent b510c17ef6
commit 0a66479693
2 changed files with 103 additions and 18 deletions

View File

@@ -205,6 +205,7 @@ class ClientData4 final : public ClientData4Base {
private:
std::vector<ServerMessage> m_outgoing;
wpi::DenseMap<NT_Topic, size_t> m_outgoingValueMap;
bool WriteBinary(int64_t id, int64_t time, const Value& value) {
return WireEncodeBinary(SendBinary().Add(), id, time, value);
@@ -282,6 +283,7 @@ class ClientData3 final : public ClientData, private net3::MessageHandler3 {
net3::WireDecoder3 m_decoder;
std::vector<net3::Message3> m_outgoing;
wpi::DenseMap<NT_Topic, size_t> m_outgoingValueMap;
int64_t m_nextPubUid{1};
struct TopicData3 {
@@ -857,24 +859,23 @@ void ClientData4::SendValue(TopicData* topic, const Value& value,
}
break;
case ClientData::kSendAll: // append to outgoing
m_outgoingValueMap[topic->id] = m_outgoing.size();
m_outgoing.emplace_back(ServerMessage{ServerValueMsg{topic->id, value}});
break;
case ClientData::kSendNormal: {
// scan outgoing and replace, or append if not present
bool found = false;
for (auto&& msg : m_outgoing) {
if (auto m = std::get_if<ServerValueMsg>(&msg.contents)) {
if (m->topic == topic->id) {
// replace, or append if not present
auto [it, added] =
m_outgoingValueMap.try_emplace(topic->id, m_outgoing.size());
if (!added && it->second < m_outgoing.size()) {
if (auto m =
std::get_if<ServerValueMsg>(&m_outgoing[it->second].contents)) {
if (m->topic == topic->id) { // should always be true
m->value = value;
found = true;
break;
}
}
}
if (!found) {
m_outgoing.emplace_back(
ServerMessage{ServerValueMsg{topic->id, value}});
}
m_outgoing.emplace_back(ServerMessage{ServerValueMsg{topic->id, value}});
break;
}
}
@@ -959,6 +960,7 @@ void ClientData4::SendOutgoing(uint64_t curTimeMs) {
}
}
m_outgoing.resize(0);
m_outgoingValueMap.clear();
m_lastSendMs = curTimeMs;
}
@@ -991,6 +993,7 @@ void ClientData3::SendValue(TopicData* topic, const Value& value,
mode = ClientData::kSendImmNoFlush; // always send local immediately
}
TopicData3* topic3 = GetTopic3(topic);
bool added = false;
switch (mode) {
case ClientData::kSendDisabled: // do nothing
@@ -1011,24 +1014,26 @@ void ClientData3::SendValue(TopicData* topic, const Value& value,
}
break;
case ClientData::kSendNormal: {
// scan outgoing and replace, or append if not present
bool found = false;
for (auto&& msg : m_outgoing) {
// replace, or append if not present
wpi::DenseMap<NT_Topic, size_t>::iterator it;
std::tie(it, added) =
m_outgoingValueMap.try_emplace(topic->id, m_outgoing.size());
if (!added && it->second < m_outgoing.size()) {
auto& msg = m_outgoing[it->second];
if (msg.Is(net3::Message3::kEntryUpdate) ||
msg.Is(net3::Message3::kEntryAssign)) {
if (msg.id() == topic->id) {
if (msg.id() == topic->id) { // should always be true
msg.SetValue(value);
found = true;
break;
}
}
}
if (found) {
break;
}
}
// fallthrough
case ClientData::kSendAll: // append to outgoing
if (!added) {
m_outgoingValueMap[topic->id] = m_outgoing.size();
}
++topic3->seqNum;
if (topic3->sentAssign) {
m_outgoing.emplace_back(net3::Message3::EntryUpdate(
@@ -1129,6 +1134,7 @@ void ClientData3::SendOutgoing(uint64_t curTimeMs) {
net3::WireEncode(out.stream(), msg);
}
m_outgoing.resize(0);
m_outgoingValueMap.clear();
m_lastSendMs = curTimeMs;
}