[ntcore] Add cached topic property (#5494)

This commit is contained in:
Joseph Eng
2023-12-10 23:23:36 -08:00
committed by GitHub
parent 192a28af47
commit 8723ee5c39
16 changed files with 316 additions and 34 deletions

View File

@@ -175,22 +175,26 @@ void LocalStorage::Impl::CheckReset(TopicData* topic) {
}
bool LocalStorage::Impl::SetValue(TopicData* topic, const Value& value,
unsigned int eventFlags, bool isDuplicate,
unsigned int eventFlags,
bool suppressIfDuplicate,
const PublisherData* publisher) {
const bool isDuplicate = topic->IsCached() && topic->lastValue == value;
DEBUG4("SetValue({}, {}, {}, {})", topic->name, value.time(), eventFlags,
isDuplicate);
if (topic->type != NT_UNASSIGNED && topic->type != value.type()) {
return false;
}
// Make sure value isn't older than last value
if (!topic->lastValue || topic->lastValue.time() == 0 ||
value.time() >= topic->lastValue.time()) {
// TODO: notify option even if older value
if (!(suppressIfDuplicate && isDuplicate)) {
topic->type = value.type();
topic->lastValue = value;
topic->lastValueFromNetwork = false;
NotifyValue(topic, eventFlags, isDuplicate, publisher);
if (topic->IsCached()) {
topic->lastValue = value;
topic->lastValueFromNetwork = false;
}
NotifyValue(topic, value, eventFlags, isDuplicate, publisher);
if (topic->datalogType == value.type()) {
for (auto&& datalog : topic->datalogs) {
datalog.Append(value);
@@ -202,8 +206,8 @@ bool LocalStorage::Impl::SetValue(TopicData* topic, const Value& value,
return true;
}
void LocalStorage::Impl::NotifyValue(TopicData* topic, unsigned int eventFlags,
bool isDuplicate,
void LocalStorage::Impl::NotifyValue(TopicData* topic, const Value& value,
unsigned int eventFlags, bool isDuplicate,
const PublisherData* publisher) {
bool isNetwork = (eventFlags & NT_EVENT_VALUE_REMOTE) != 0;
for (auto&& subscriber : topic->localSubscribers) {
@@ -213,11 +217,11 @@ void LocalStorage::Impl::NotifyValue(TopicData* topic, unsigned int eventFlags,
(!isNetwork && !subscriber->config.disableLocal)) &&
(!publisher || (publisher && (subscriber->config.excludePublisher !=
publisher->handle)))) {
subscriber->pollStorage.emplace_back(topic->lastValue);
subscriber->pollStorage.emplace_back(value);
subscriber->handle.Set();
if (!subscriber->valueListeners.empty()) {
m_listenerStorage.Notify(subscriber->valueListeners, eventFlags,
topic->handle, 0, topic->lastValue);
topic->handle, 0, value);
}
}
}
@@ -227,7 +231,7 @@ void LocalStorage::Impl::NotifyValue(TopicData* topic, unsigned int eventFlags,
subscriber->handle.Set();
if (!subscriber->valueListeners.empty()) {
m_listenerStorage.Notify(subscriber->valueListeners, eventFlags,
topic->handle, 0, topic->lastValue);
topic->handle, 0, value);
}
}
}
@@ -249,6 +253,22 @@ void LocalStorage::Impl::SetFlags(TopicData* topic, unsigned int flags) {
topic->properties.erase("retained");
update["retained"] = wpi::json();
}
if ((flags & NT_UNCACHED) != 0) {
topic->properties["cached"] = false;
update["cached"] = false;
} else {
topic->properties.erase("cached");
update["cached"] = wpi::json();
}
if ((flags & NT_UNCACHED) != 0) {
topic->lastValue = {};
topic->lastValueNetwork = {};
topic->lastValueFromNetwork = false;
}
if ((flags & NT_UNCACHED) != 0 && (flags & NT_PERSISTENT) != 0) {
WARN("topic {}: disabling cached property disables persistent storage",
topic->name);
}
topic->flags = flags;
if (!update.empty()) {
PropertiesUpdated(topic, update, NT_EVENT_NONE, true, false);
@@ -283,6 +303,20 @@ void LocalStorage::Impl::SetRetained(TopicData* topic, bool value) {
PropertiesUpdated(topic, update, NT_EVENT_NONE, true, false);
}
void LocalStorage::Impl::SetCached(TopicData* topic, bool value) {
wpi::json update = wpi::json::object();
if (value) {
topic->flags &= ~NT_UNCACHED;
topic->properties.erase("cached");
update["cached"] = wpi::json();
} else {
topic->flags |= NT_UNCACHED;
topic->properties["cached"] = false;
update["cached"] = false;
}
PropertiesUpdated(topic, update, NT_EVENT_NONE, true, false);
}
void LocalStorage::Impl::SetProperties(TopicData* topic,
const wpi::json& update,
bool sendNetwork) {
@@ -328,6 +362,28 @@ void LocalStorage::Impl::PropertiesUpdated(TopicData* topic,
}
}
}
it = topic->properties.find("cached");
if (it != topic->properties.end()) {
if (auto val = it->get_ptr<bool*>()) {
if (*val) {
topic->flags &= ~NT_UNCACHED;
} else {
topic->flags |= NT_UNCACHED;
}
}
}
if ((topic->flags & NT_UNCACHED) != 0) {
topic->lastValue = {};
topic->lastValueNetwork = {};
topic->lastValueFromNetwork = false;
}
if ((topic->flags & NT_UNCACHED) != 0 &&
(topic->flags & NT_PERSISTENT) != 0) {
WARN("topic {}: disabling cached property disables persistent storage",
topic->name);
}
}
topic->propertiesStr = topic->properties.dump();
@@ -895,20 +951,22 @@ bool LocalStorage::Impl::PublishLocalValue(PublisherData* publisher,
return false;
}
if (publisher->active) {
bool isDuplicate, isNetworkDuplicate, suppressDuplicates;
bool isNetworkDuplicate, suppressDuplicates;
if (force || publisher->config.keepDuplicates) {
suppressDuplicates = false;
isNetworkDuplicate = false;
} else {
suppressDuplicates = true;
isNetworkDuplicate = (publisher->topic->lastValueNetwork == value);
isNetworkDuplicate = publisher->topic->IsCached() &&
(publisher->topic->lastValueNetwork == value);
}
isDuplicate = (publisher->topic->lastValue == value);
if (!isNetworkDuplicate && m_network) {
publisher->topic->lastValueNetwork = value;
if (publisher->topic->IsCached()) {
publisher->topic->lastValueNetwork = value;
}
m_network->SetValue(publisher->handle, value);
}
return SetValue(publisher->topic, value, NT_EVENT_VALUE_LOCAL, isDuplicate,
return SetValue(publisher->topic, value, NT_EVENT_VALUE_LOCAL,
suppressDuplicates, publisher);
} else {
return false;
@@ -940,6 +998,10 @@ bool LocalStorage::Impl::SetDefaultEntryValue(NT_Handle pubsubentryHandle,
return false;
}
if (auto topic = GetTopic(pubsubentryHandle)) {
if (!topic->IsCached()) {
WARN("ignoring default value on non-cached topic '{}'", topic->name);
return false;
}
if (!topic->lastValue &&
(topic->type == NT_UNASSIGNED || topic->type == value.type() ||
IsNumericCompatible(topic->type, value.type()))) {
@@ -1026,10 +1088,11 @@ void LocalStorage::NetworkPropertiesUpdate(std::string_view name,
void LocalStorage::NetworkSetValue(NT_Topic topicHandle, const Value& value) {
std::scoped_lock lock{m_mutex};
if (auto topic = m_impl.m_topics.Get(topicHandle)) {
if (m_impl.SetValue(topic, value, NT_EVENT_VALUE_REMOTE,
value == topic->lastValue, false, nullptr)) {
topic->lastValueNetwork = value;
topic->lastValueFromNetwork = true;
if (m_impl.SetValue(topic, value, NT_EVENT_VALUE_REMOTE, false, nullptr)) {
if (topic->IsCached()) {
topic->lastValueNetwork = value;
topic->lastValueFromNetwork = true;
}
}
}
}

View File

@@ -136,6 +136,22 @@ class LocalStorage final : public net::ILocalStorage {
}
}
void SetTopicCached(NT_Topic topicHandle, bool value) {
std::scoped_lock lock{m_mutex};
if (auto topic = m_impl.m_topics.Get(topicHandle)) {
m_impl.SetCached(topic, value);
}
}
bool GetTopicCached(NT_Topic topicHandle) {
std::scoped_lock lock{m_mutex};
if (auto topic = m_impl.m_topics.Get(topicHandle)) {
return (topic->flags & NT_UNCACHED) == 0;
} else {
return false;
}
}
bool GetTopicExists(NT_Handle handle) {
std::scoped_lock lock{m_mutex};
TopicData* topic = m_impl.GetTopic(handle);
@@ -361,6 +377,8 @@ class LocalStorage final : public net::ILocalStorage {
bool Exists() const { return onNetwork || !localPublishers.empty(); }
bool IsCached() const { return (flags & NT_UNCACHED) == 0; }
TopicInfo GetTopicInfo() const;
// invariants
@@ -565,14 +583,15 @@ class LocalStorage final : public net::ILocalStorage {
void CheckReset(TopicData* topic);
bool SetValue(TopicData* topic, const Value& value, unsigned int eventFlags,
bool isDuplicate, bool suppressIfDuplicate,
const PublisherData* publisher);
void NotifyValue(TopicData* topic, unsigned int eventFlags,
bool isDuplicate, const PublisherData* publisher);
bool suppressIfDuplicate, const PublisherData* publisher);
void NotifyValue(TopicData* topic, const Value& value,
unsigned int eventFlags, bool isDuplicate,
const PublisherData* publisher);
void SetFlags(TopicData* topic, unsigned int flags);
void SetPersistent(TopicData* topic, bool value);
void SetRetained(TopicData* topic, bool value);
void SetCached(TopicData* topic, bool value);
void SetProperties(TopicData* topic, const wpi::json& update,
bool sendNetwork);
void PropertiesUpdated(TopicData* topic, const wpi::json& update,

View File

@@ -671,6 +671,30 @@ Java_edu_wpi_first_networktables_NetworkTablesJNI_getTopicRetained
return nt::GetTopicRetained(topic);
}
/*
* Class: edu_wpi_first_networktables_NetworkTablesJNI
* Method: setTopicCached
* Signature: (IZ)V
*/
JNIEXPORT void JNICALL
Java_edu_wpi_first_networktables_NetworkTablesJNI_setTopicCached
(JNIEnv*, jclass, jint topic, jboolean value)
{
nt::SetTopicCached(topic, value);
}
/*
* Class: edu_wpi_first_networktables_NetworkTablesJNI
* Method: getTopicCached
* Signature: (I)Z
*/
JNIEXPORT jboolean JNICALL
Java_edu_wpi_first_networktables_NetworkTablesJNI_getTopicCached
(JNIEnv*, jclass, jint topic)
{
return nt::GetTopicCached(topic);
}
/*
* Class: edu_wpi_first_networktables_NetworkTablesJNI
* Method: getTopicTypeString

View File

@@ -917,6 +917,7 @@ void ServerImpl::ClientData3::EntryAssign(std::string_view name,
auto typeStr = TypeToString(value.type());
wpi::json properties = wpi::json::object();
properties["retained"] = true; // treat all NT3 published topics as retained
properties["cached"] = true; // treat all NT3 published topics as cached
if ((flags & NT_PERSISTENT) != 0) {
properties["persistent"] = true;
}
@@ -1095,6 +1096,7 @@ bool ServerImpl::TopicData::SetProperties(const wpi::json& update) {
void ServerImpl::TopicData::RefreshProperties() {
persistent = false;
retained = false;
cached = true;
auto persistentIt = properties.find("persistent");
if (persistentIt != properties.end()) {
@@ -1109,6 +1111,23 @@ void ServerImpl::TopicData::RefreshProperties() {
retained = *val;
}
}
auto cachedIt = properties.find("cached");
if (cachedIt != properties.end()) {
if (auto val = cachedIt->get_ptr<bool*>()) {
cached = *val;
}
}
if (!cached) {
lastValue = {};
lastValueClient = nullptr;
}
if (!cached && persistent) {
WARN("topic {}: disabling cached property disables persistent storage",
name);
}
}
bool ServerImpl::TopicData::SetFlags(unsigned int flags_) {
@@ -1122,6 +1141,30 @@ bool ServerImpl::TopicData::SetFlags(unsigned int flags_) {
persistent = false;
properties.erase("persistent");
}
if ((flags_ & NT_RETAINED) != 0) {
updated |= !retained;
retained = true;
properties["retained"] = true;
} else {
updated |= retained;
retained = false;
properties.erase("retained");
}
if ((flags_ & NT_UNCACHED) != 0) {
updated |= cached;
cached = false;
properties["cached"] = false;
lastValue = {};
lastValueClient = nullptr;
} else {
updated |= !cached;
cached = true;
properties.erase("cached");
}
if (!cached && persistent) {
WARN("topic {}: disabling cached property disables persistent storage",
name);
}
return updated;
}
@@ -1642,7 +1685,7 @@ ServerImpl::TopicData* ServerImpl::CreateTopic(ClientData* client,
} else {
// new topic
unsigned int id = m_topics.emplace_back(
std::make_unique<TopicData>(name, typeStr, properties));
std::make_unique<TopicData>(m_logger, name, typeStr, properties));
topic = m_topics[id].get();
topic->id = id;
topic->special = special;
@@ -1756,8 +1799,9 @@ void ServerImpl::SetFlags(ClientData* client, TopicData* topic,
void ServerImpl::SetValue(ClientData* client, TopicData* topic,
const Value& value) {
// update retained value if from same client or timestamp newer
if (!topic->lastValue || topic->lastValueClient == client ||
topic->lastValue.time() == 0 || value.time() >= topic->lastValue.time()) {
if (topic->cached && (!topic->lastValue || topic->lastValueClient == client ||
topic->lastValue.time() == 0 ||
value.time() >= topic->lastValue.time())) {
DEBUG4("updating '{}' last value (time was {} is {})", topic->name,
topic->lastValue.time(), value.time());
topic->lastValue = value;

View File

@@ -98,11 +98,15 @@ class ServerImpl final {
struct SubscriberData;
struct TopicData {
TopicData(std::string_view name, std::string_view typeStr)
: name{name}, typeStr{typeStr} {}
TopicData(std::string_view name, std::string_view typeStr,
wpi::json properties)
: name{name}, typeStr{typeStr}, properties(std::move(properties)) {
TopicData(wpi::Logger& logger, std::string_view name,
std::string_view typeStr)
: m_logger{logger}, name{name}, typeStr{typeStr} {}
TopicData(wpi::Logger& logger, std::string_view name,
std::string_view typeStr, wpi::json properties)
: m_logger{logger},
name{name},
typeStr{typeStr},
properties(std::move(properties)) {
RefreshProperties();
}
@@ -117,6 +121,7 @@ class ServerImpl final {
NT_Handle GetIdHandle() const { return Handle(0, id, Handle::kTopic); }
wpi::Logger& m_logger; // Must be m_logger for WARN macro to work
std::string name;
unsigned int id;
Value lastValue;
@@ -126,6 +131,7 @@ class ServerImpl final {
unsigned int publisherCount{0};
bool persistent{false};
bool retained{false};
bool cached{true};
bool special{false};
NT_Topic localHandle{0};

View File

@@ -295,6 +295,14 @@ NT_Bool NT_GetTopicRetained(NT_Topic topic) {
return nt::GetTopicRetained(topic);
}
void NT_SetTopicCached(NT_Topic topic, NT_Bool value) {
nt::SetTopicCached(topic, value);
}
NT_Bool NT_GetTopicCached(NT_Topic topic) {
return nt::GetTopicCached(topic);
}
NT_Bool NT_GetTopicExists(NT_Handle handle) {
return nt::GetTopicExists(handle);
}

View File

@@ -263,6 +263,22 @@ bool GetTopicRetained(NT_Topic topic) {
}
}
void SetTopicCached(NT_Topic topic, bool value) {
if (auto ii = InstanceImpl::GetTyped(topic, Handle::kTopic)) {
ii->localStorage.SetTopicCached(topic, value);
} else {
return;
}
}
bool GetTopicCached(NT_Topic topic) {
if (auto ii = InstanceImpl::GetTyped(topic, Handle::kTopic)) {
return ii->localStorage.GetTopicCached(topic);
} else {
return {};
}
}
bool GetTopicExists(NT_Handle handle) {
if (auto ii = InstanceImpl::GetHandle(handle)) {
return ii->localStorage.GetTopicExists(handle);