mirror of
https://github.com/wpilibsuite/allwpilib
synced 2026-06-21 01:01:43 +00:00
[ntcore] Various NT4 fixes (#4474)
* TopicListener: Fix Add() return values * Update PubSubOption poll storage documentation * Update NetworkTableEntry::GetValue() doc * Add documentation regarding asynchronous callbacks * Unpublish entry: set publisher to nullptr * Implement ValueListenerPoller default constructor * Remove SetNetworkIdentity, make parameter to StartClient * URI-escape client ID, improve error message * Add connected message with client id; also improve disconnected message a bit * Handle SetServers either before or after StartClient * Fix client use-after-free; also delay reconnect after disconnect to rate limit * Don't re-announce to already subscribed client; we especially don't want to send the last value again * Always accept in-order sets, only use timestamp for tiebreak * Fix LocalStorage::StartNetwork race * Remove unused/unimplemented function Also: * [glass] Remove debug print * [glass] Fix mpack string decoding * [cameraserver] Fix up startclient
This commit is contained in:
@@ -54,7 +54,6 @@ class CImpl : public ServerMessageHandler {
|
||||
|
||||
void ProcessIncomingBinary(std::span<const uint8_t> data);
|
||||
void HandleLocal(std::vector<ClientMessage>&& msgs);
|
||||
void SendOutgoing(std::span<const ClientMessage> msgs);
|
||||
bool SendControl(uint64_t curTimeMs);
|
||||
void SendValues(uint64_t curTimeMs);
|
||||
bool CheckNetworkReady();
|
||||
|
||||
@@ -59,8 +59,8 @@ class NetworkInterface : public NetworkStartupInterface {
|
||||
|
||||
class ILocalStorage : public LocalInterface {
|
||||
public:
|
||||
virtual void StartNetwork(NetworkStartupInterface& startup) = 0;
|
||||
virtual void SetNetwork(NetworkInterface* network) = 0;
|
||||
virtual void StartNetwork(NetworkStartupInterface& startup,
|
||||
NetworkInterface* network) = 0;
|
||||
virtual void ClearNetwork() = 0;
|
||||
};
|
||||
|
||||
|
||||
@@ -325,6 +325,7 @@ struct TopicData {
|
||||
std::string name;
|
||||
unsigned int id;
|
||||
Value lastValue;
|
||||
ClientData* lastValueClient = nullptr;
|
||||
std::string typeStr;
|
||||
wpi::json properties = wpi::json::object();
|
||||
bool persistent{false};
|
||||
@@ -627,6 +628,15 @@ void ClientData4Base::ClientSubscribe(int64_t subuid,
|
||||
removed = topic->subscribers.Remove(sub.get());
|
||||
}
|
||||
|
||||
// is client already subscribed?
|
||||
bool wasSubscribed = false;
|
||||
for (auto subscriber : topic->subscribers) {
|
||||
if (subscriber->client == this) {
|
||||
wasSubscribed = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
bool added = false;
|
||||
if (sub->Matches(topic->name, topic->special)) {
|
||||
topic->subscribers.Add(sub.get());
|
||||
@@ -645,7 +655,7 @@ void ClientData4Base::ClientSubscribe(int64_t subuid,
|
||||
}
|
||||
}
|
||||
|
||||
if (added && !removed) {
|
||||
if (!wasSubscribed && added && !removed) {
|
||||
// announce topic to client
|
||||
DEBUG4("client {}: announce {}", m_id, topic->name);
|
||||
SendAnnounce(topic.get(), std::nullopt);
|
||||
@@ -2086,11 +2096,13 @@ void SImpl::SetFlags(ClientData* client, TopicData* topic, unsigned int flags) {
|
||||
}
|
||||
|
||||
void SImpl::SetValue(ClientData* client, TopicData* topic, const Value& value) {
|
||||
// update retained value if timestamp newer
|
||||
if (!topic->lastValue || value.time() > topic->lastValue.time()) {
|
||||
// update retained value if from same client or timestamp newer
|
||||
if (!topic->lastValue || topic->lastValueClient == client ||
|
||||
value.time() >= topic->lastValue.time()) {
|
||||
DEBUG4("updating '{}' last value (time was {} is {})", topic->name,
|
||||
topic->lastValue.time(), value.time());
|
||||
topic->lastValue = value;
|
||||
topic->lastValueClient = client;
|
||||
|
||||
// if persistent, update flag
|
||||
if (topic->persistent) {
|
||||
|
||||
@@ -44,10 +44,12 @@ void WebSocketConnection::Flush() {
|
||||
}
|
||||
|
||||
++m_sendsActive;
|
||||
m_ws.SendFrames(m_ws_frames, [this](auto bufs, auto) {
|
||||
m_buf_pool.insert(m_buf_pool.end(), bufs.begin(), bufs.end());
|
||||
if (m_sendsActive > 0) {
|
||||
--m_sendsActive;
|
||||
m_ws.SendFrames(m_ws_frames, [selfweak = weak_from_this()](auto bufs, auto) {
|
||||
if (auto self = selfweak.lock()) {
|
||||
self->m_buf_pool.insert(self->m_buf_pool.end(), bufs.begin(), bufs.end());
|
||||
if (self->m_sendsActive > 0) {
|
||||
--self->m_sendsActive;
|
||||
}
|
||||
}
|
||||
});
|
||||
m_frames.clear();
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <memory>
|
||||
#include <vector>
|
||||
|
||||
#include <wpi/SmallVector.h>
|
||||
@@ -15,7 +16,9 @@
|
||||
|
||||
namespace nt::net {
|
||||
|
||||
class WebSocketConnection final : public WireConnection {
|
||||
class WebSocketConnection final
|
||||
: public WireConnection,
|
||||
public std::enable_shared_from_this<WebSocketConnection> {
|
||||
public:
|
||||
explicit WebSocketConnection(wpi::WebSocket& ws);
|
||||
~WebSocketConnection() override;
|
||||
|
||||
Reference in New Issue
Block a user