[ntcore] Simplify local startup (#4803)

The current approach was slightly more efficient but didn't correctly
handle meta-topic publishing for topics published before the server was
started.
This commit is contained in:
Peter Johnson
2022-12-13 22:31:44 -08:00
committed by GitHub
parent 7ba8a9ee1f
commit fb2c170b6e
13 changed files with 66 additions and 221 deletions

View File

@@ -427,7 +427,7 @@ void SubscriberData::UpdateActive() {
}
void LSImpl::NotifyTopic(TopicData* topic, unsigned int eventFlags) {
DEBUG4("NotifyTopic({}, {})\n", topic->name, eventFlags);
DEBUG4("NotifyTopic({}, {})", topic->name, eventFlags);
auto topicInfo = topic->GetTopicInfo();
if (!topic->listeners.empty()) {
m_listenerStorage.Notify(topic->listeners, eventFlags, topicInfo);
@@ -1380,36 +1380,37 @@ void LocalStorage::NetworkSetValue(NT_Topic topicHandle, const Value& value) {
}
}
void LocalStorage::StartNetwork(net::NetworkStartupInterface& startup,
net::NetworkInterface* network) {
void LocalStorage::StartNetwork(net::NetworkInterface* network) {
WPI_DEBUG4(m_impl->m_logger, "StartNetwork()");
std::scoped_lock lock{m_mutex};
m_impl->m_network = network;
// publish all active publishers to the network and send last values
// only send value once per topic
for (auto&& topic : m_impl->m_topics) {
PublisherData* anyPublisher = nullptr;
for (auto&& publisher : topic->localPublishers) {
if (publisher->active) {
startup.Publish(publisher->handle, topic->handle, topic->name,
topic->typeStr, topic->properties, publisher->config);
network->Publish(publisher->handle, topic->handle, topic->name,
topic->typeStr, topic->properties, publisher->config);
anyPublisher = publisher;
}
}
if (anyPublisher && topic->lastValue) {
startup.SetValue(anyPublisher->handle, topic->lastValue);
network->SetValue(anyPublisher->handle, topic->lastValue);
}
}
for (auto&& subscriber : m_impl->m_subscribers) {
startup.Subscribe(subscriber->handle, {{subscriber->topic->name}},
subscriber->config);
network->Subscribe(subscriber->handle, {{subscriber->topic->name}},
subscriber->config);
}
for (auto&& subscriber : m_impl->m_multiSubscribers) {
startup.Subscribe(subscriber->handle, subscriber->prefixes,
subscriber->options);
network->Subscribe(subscriber->handle, subscriber->prefixes,
subscriber->options);
}
m_impl->m_network = network;
}
void LocalStorage::ClearNetwork() {
WPI_DEBUG4(m_impl->m_logger, "ClearNetwork()");
std::scoped_lock lock{m_mutex};
m_impl->m_network = nullptr;
// treat as an unannounce all from the network side

View File

@@ -44,8 +44,7 @@ class LocalStorage final : public net::ILocalStorage {
bool ack) final;
void NetworkSetValue(NT_Topic topicHandle, const Value& value) final;
void StartNetwork(net::NetworkStartupInterface& startup,
net::NetworkInterface* network) final;
void StartNetwork(net::NetworkInterface* network) final;
void ClearNetwork() final;
// User functions. These are the actual implementations of the corresponding

View File

@@ -304,11 +304,9 @@ void NCImpl3::TcpConnected(uv::Tcp& tcp) {
}
});
{
net3::ClientStartup3 startup{*m_clientImpl};
m_localStorage.StartNetwork(startup, &m_localQueue);
}
m_clientImpl->SetLocal(&m_localStorage);
m_localStorage.StartNetwork(&m_localQueue);
HandleLocal();
});
tcp.SetData(clientImpl);
@@ -429,11 +427,10 @@ void NCImpl4::WsConnected(wpi::WebSocket& ws, uv::Tcp& tcp) {
m_sendValuesTimer->Start(uv::Timer::Time{repeatMs},
uv::Timer::Time{repeatMs});
});
{
net::ClientStartup startup{*m_clientImpl};
m_localStorage.StartNetwork(startup, &m_localQueue);
}
m_clientImpl->SetLocal(&m_localStorage);
m_localStorage.StartNetwork(&m_localQueue);
HandleLocal();
m_clientImpl->SendInitial();
ws.closed.connect([this, &ws](uint16_t, std::string_view reason) {
if (!ws.GetStream().IsLoopClosing()) {
Disconnect(reason);

View File

@@ -334,11 +334,9 @@ NSImpl::NSImpl(std::string_view persistentFilename,
m_localMsgs.reserve(net::NetworkLoopQueue::kInitialQueueSize);
m_loopRunner.ExecAsync([=, this](uv::Loop& loop) {
// connect local storage to server
{
net::ServerStartup startup{m_serverImpl};
m_localStorage.StartNetwork(startup, &m_localQueue);
}
m_serverImpl.SetLocal(&m_localStorage);
m_localStorage.StartNetwork(&m_localQueue);
HandleLocal();
// load persistent file first, then initialize
uv::QueueWork(

View File

@@ -55,6 +55,7 @@ class CImpl : public ServerMessageHandler {
void HandleLocal(std::vector<ClientMessage>&& msgs);
bool SendControl(uint64_t curTimeMs);
void SendValues(uint64_t curTimeMs);
void SendInitialValues();
bool CheckNetworkReady();
// ServerMessageHandler interface
@@ -229,7 +230,7 @@ bool CImpl::SendControl(uint64_t curTimeMs) {
}
void CImpl::SendValues(uint64_t curTimeMs) {
DEBUG4("SendPeriodic({})", curTimeMs);
DEBUG4("SendValues({})", curTimeMs);
// can't send value updates until we have a RTT
if (!m_haveTimeOffset) {
@@ -268,6 +269,36 @@ void CImpl::SendValues(uint64_t curTimeMs) {
}
}
void CImpl::SendInitialValues() {
DEBUG4("SendInitialValues()");
// ensure all control messages are sent ahead of value updates
if (!SendControl(0)) {
return;
}
// only send time=0 values (as we don't have a RTT yet)
auto writer = m_wire.SendBinary();
for (auto&& pub : m_publishers) {
if (pub && !pub->outValues.empty()) {
bool sent = false;
for (auto&& val : pub->outValues) {
if (val.server_time() == 0) {
DEBUG4("Sending {} value time={} server_time={}", pub->handle,
val.time(), val.server_time());
WireEncodeBinary(writer.Add(), Handle{pub->handle}.GetIndex(), 0,
val);
sent = true;
}
}
if (sent) {
std::erase_if(pub->outValues,
[](const auto& v) { return v.server_time() == 0; });
}
}
}
}
bool CImpl::CheckNetworkReady() {
if (!m_wire.Ready()) {
++m_notReadyCount;
@@ -434,46 +465,7 @@ void ClientImpl::SetLocal(LocalInterface* local) {
m_impl->m_local = local;
}
ClientStartup::ClientStartup(ClientImpl& client)
: m_client{client},
m_binaryWriter{client.m_impl->m_wire.SendBinary()},
m_textWriter{client.m_impl->m_wire.SendText()} {}
ClientStartup::~ClientStartup() {
m_client.m_impl->m_wire.Flush();
}
void ClientStartup::Publish(NT_Publisher pubHandle, NT_Topic topicHandle,
std::string_view name, std::string_view typeStr,
const wpi::json& properties,
const PubSubOptionsImpl& options) {
WPI_DEBUG4(m_client.m_impl->m_logger, "StartupPublish({}, {}, {}, {})",
pubHandle, topicHandle, name, typeStr);
m_client.m_impl->Publish(pubHandle, topicHandle, name, typeStr, properties,
options);
WireEncodePublish(m_textWriter.Add(), Handle{pubHandle}.GetIndex(), name,
typeStr, properties);
}
void ClientStartup::Subscribe(NT_Subscriber subHandle,
std::span<const std::string> prefixes,
const PubSubOptionsImpl& options) {
WPI_DEBUG4(m_client.m_impl->m_logger, "StartupSubscribe({})", subHandle);
WireEncodeSubscribe(m_textWriter.Add(), Handle{subHandle}.GetIndex(),
prefixes, options);
}
void ClientStartup::SetValue(NT_Publisher pubHandle, const Value& value) {
WPI_DEBUG4(m_client.m_impl->m_logger, "StartupSetValue({})", pubHandle);
// Similar to Client::SetValue(), except always set lastValue and send
unsigned int index = Handle{pubHandle}.GetIndex();
assert(index < m_client.m_impl->m_publishers.size() &&
m_client.m_impl->m_publishers[index]);
auto& publisher = *m_client.m_impl->m_publishers[index];
// only send time 0 values until we have a RTT
if (value.server_time() == 0) {
WireEncodeBinary(m_binaryWriter.Add(), index, 0, value);
} else {
publisher.outValues.emplace_back(value);
}
void ClientImpl::SendInitial() {
m_impl->SendInitialValues();
m_impl->m_wire.Flush();
}

View File

@@ -28,12 +28,9 @@ class Value;
namespace nt::net {
struct ClientMessage;
class ClientStartup;
class WireConnection;
class ClientImpl {
friend class ClientStartup;
public:
ClientImpl(uint64_t curTimeMs, int inst, WireConnection& wire,
wpi::Logger& logger,
@@ -48,32 +45,11 @@ class ClientImpl {
void SendValues(uint64_t curTimeMs);
void SetLocal(LocalInterface* local);
void SendInitial();
private:
class Impl;
std::unique_ptr<Impl> m_impl;
};
class ClientStartup final : public NetworkStartupInterface {
public:
explicit ClientStartup(ClientImpl& client);
~ClientStartup() final;
ClientStartup(const ClientStartup&) = delete;
ClientStartup& operator=(const ClientStartup&) = delete;
// NetworkStartupInterface interface
void Publish(NT_Publisher pubHandle, NT_Topic topicHandle,
std::string_view name, std::string_view typeStr,
const wpi::json& properties,
const PubSubOptionsImpl& options) final;
void Subscribe(NT_Subscriber subHandle, std::span<const std::string> prefixes,
const PubSubOptionsImpl& options) final;
void SetValue(NT_Publisher pubHandle, const Value& value) final;
private:
ClientImpl& m_client;
BinaryWriter m_binaryWriter;
TextWriter m_textWriter;
};
} // namespace nt::net

View File

@@ -35,32 +35,27 @@ class LocalInterface {
virtual void NetworkSetValue(NT_Topic topicHandle, const Value& value) = 0;
};
class NetworkStartupInterface {
class NetworkInterface {
public:
virtual ~NetworkStartupInterface() = default;
virtual ~NetworkInterface() = default;
virtual void Publish(NT_Publisher pubHandle, NT_Topic topicHandle,
std::string_view name, std::string_view typeStr,
const wpi::json& properties,
const PubSubOptionsImpl& options) = 0;
virtual void Subscribe(NT_Subscriber subHandle,
std::span<const std::string> topicNames,
const PubSubOptionsImpl& options) = 0;
virtual void SetValue(NT_Publisher pubHandle, const Value& value) = 0;
};
class NetworkInterface : public NetworkStartupInterface {
public:
virtual void Unpublish(NT_Publisher pubHandle, NT_Topic topicHandle) = 0;
virtual void SetProperties(NT_Topic topicHandle, std::string_view name,
const wpi::json& update) = 0;
virtual void Subscribe(NT_Subscriber subHandle,
std::span<const std::string> topicNames,
const PubSubOptionsImpl& options) = 0;
virtual void Unsubscribe(NT_Subscriber subHandle) = 0;
virtual void SetValue(NT_Publisher pubHandle, const Value& value) = 0;
};
class ILocalStorage : public LocalInterface {
public:
virtual void StartNetwork(NetworkStartupInterface& startup,
NetworkInterface* network) = 0;
virtual void StartNetwork(NetworkInterface* network) = 0;
virtual void ClearNetwork() = 0;
};

View File

@@ -168,8 +168,6 @@ class ClientData4Base : public ClientData, protected ClientMessageHandler {
};
class ClientDataLocal final : public ClientData4Base {
friend class net::ServerStartup;
public:
ClientDataLocal(SImpl& server, int id, wpi::Logger& logger)
: ClientData4Base{"", "", "", true, [](uint32_t) {}, server, id, logger} {
@@ -2300,6 +2298,7 @@ void ServerImpl::HandleLocal(std::span<const ClientMessage> msgs) {
}
void ServerImpl::SetLocal(LocalInterface* local) {
WPI_DEBUG4(m_impl->m_logger, "SetLocal()");
m_impl->m_local = local;
// create server meta topics
@@ -2362,22 +2361,3 @@ std::string ServerImpl::DumpPersistent() {
std::string ServerImpl::LoadPersistent(std::string_view in) {
return m_impl->LoadPersistent(in);
}
void ServerStartup::Publish(NT_Publisher pubHandle, NT_Topic topicHandle,
std::string_view name, std::string_view typeStr,
const wpi::json& properties,
const PubSubOptionsImpl& options) {
m_server.m_impl->m_localClient->ClientPublish(pubHandle, name, typeStr,
properties);
}
void ServerStartup::Subscribe(NT_Subscriber subHandle,
std::span<const std::string> topicNames,
const PubSubOptionsImpl& options) {
m_server.m_impl->m_localClient->ClientSubscribe(subHandle, topicNames,
options);
}
void ServerStartup::SetValue(NT_Publisher pubHandle, const Value& value) {
m_server.m_impl->m_localClient->ClientSetValue(pubHandle, value);
}

View File

@@ -29,12 +29,9 @@ namespace nt::net {
struct ClientMessage;
class LocalInterface;
class ServerStartup;
class WireConnection;
class ServerImpl final {
friend class ServerStartup;
public:
using SetPeriodicFunc = std::function<void(uint32_t repeatMs)>;
using Connected3Func =
@@ -76,22 +73,4 @@ class ServerImpl final {
std::unique_ptr<Impl> m_impl;
};
class ServerStartup final : public NetworkStartupInterface {
public:
explicit ServerStartup(ServerImpl& server) : m_server{server} {}
// NetworkStartupInterface interface
void Publish(NT_Publisher pubHandle, NT_Topic topicHandle,
std::string_view name, std::string_view typeStr,
const wpi::json& properties,
const PubSubOptionsImpl& options) final;
void Subscribe(NT_Subscriber subHandle,
std::span<const std::string> topicNames,
const PubSubOptionsImpl& options) final;
void SetValue(NT_Publisher pubHandle, const Value& value) final;
private:
ServerImpl& m_server;
};
} // namespace nt::net

View File

@@ -640,34 +640,3 @@ void ClientImpl3::SendPeriodic(uint64_t curTimeMs) {
void ClientImpl3::SetLocal(net::LocalInterface* local) {
m_impl->m_local = local;
}
ClientStartup3::ClientStartup3(ClientImpl3& client) : m_client{client} {}
ClientStartup3::~ClientStartup3() = default;
void ClientStartup3::Publish(NT_Publisher pubHandle, NT_Topic topicHandle,
std::string_view name, std::string_view typeStr,
const wpi::json& properties,
const PubSubOptionsImpl& options) {
WPI_DEBUG4(m_client.m_impl->m_logger, "StartupPublish({}, {}, {}, {})",
pubHandle, topicHandle, name, typeStr);
m_client.m_impl->Publish(pubHandle, topicHandle, name, typeStr, properties,
options);
}
void ClientStartup3::Subscribe(NT_Subscriber subHandle,
std::span<const std::string> prefixes,
const PubSubOptionsImpl& options) {
// NT3 ignores subscribes, so no action required
}
void ClientStartup3::SetValue(NT_Publisher pubHandle, const Value& value) {
WPI_DEBUG4(m_client.m_impl->m_logger, "StartupSetValue({})", pubHandle);
// Similar to Client::SetValue(), except always set value and queue
unsigned int index = Handle{pubHandle}.GetIndex();
assert(index < m_client.m_impl->m_publishers.size() &&
m_client.m_impl->m_publishers[index]);
auto& publisher = *m_client.m_impl->m_publishers[index];
publisher.entry->value = value;
publisher.outValues.emplace_back(value);
}

View File

@@ -25,12 +25,9 @@ class LocalInterface;
namespace nt::net3 {
class ClientStartup3;
class WireConnection3;
class ClientImpl3 {
friend class ClientStartup3;
public:
explicit ClientImpl3(uint64_t curTimeMs, int inst, WireConnection3& wire,
wpi::Logger& logger,
@@ -50,24 +47,4 @@ class ClientImpl3 {
std::unique_ptr<Impl> m_impl;
};
class ClientStartup3 final : public net::NetworkStartupInterface {
public:
explicit ClientStartup3(ClientImpl3& client);
~ClientStartup3() final;
ClientStartup3(const ClientStartup3&) = delete;
ClientStartup3& operator=(const ClientStartup3&) = delete;
// NetworkStartupInterface interface
void Publish(NT_Publisher pubHandle, NT_Topic topicHandle,
std::string_view name, std::string_view typeStr,
const wpi::json& properties,
const PubSubOptionsImpl& options) final;
void Subscribe(NT_Subscriber subHandle, std::span<const std::string> prefixes,
const PubSubOptionsImpl& options) final;
void SetValue(NT_Publisher pubHandle, const Value& value) final;
private:
ClientImpl3& m_client;
};
} // namespace nt::net3