[ntcore] Don't update timestamp when value is unchanged (#5356)

This fixes an issue with commands run/cancel.
This commit is contained in:
Carl Hauser
2023-07-23 17:36:26 -07:00
committed by GitHub
parent 5ec7f18bdc
commit c3fab7f1f2
4 changed files with 121 additions and 30 deletions

View File

@@ -285,7 +285,8 @@ struct LSImpl {
void CheckReset(TopicData* topic);
bool SetValue(TopicData* topic, const Value& value, unsigned int eventFlags,
bool isDuplicate, const PublisherData* publisher);
bool suppressIfDuplicate, bool isDuplicate,
const PublisherData* publisher);
void NotifyValue(TopicData* topic, unsigned int eventFlags, bool isDuplicate,
const PublisherData* publisher);
@@ -495,6 +496,7 @@ void LSImpl::CheckReset(TopicData* topic) {
bool LSImpl::SetValue(TopicData* topic, const Value& value,
unsigned int eventFlags, bool isDuplicate,
bool suppressIfDuplicate,
const PublisherData* publisher) {
DEBUG4("SetValue({}, {}, {}, {})", topic->name, value.time(), eventFlags,
isDuplicate);
@@ -504,16 +506,19 @@ bool LSImpl::SetValue(TopicData* topic, const Value& value,
if (!topic->lastValue || topic->lastValue.time() == 0 ||
value.time() >= topic->lastValue.time()) {
// TODO: notify option even if older value
topic->type = value.type();
topic->lastValue = value;
topic->lastValueFromNetwork = false;
NotifyValue(topic, eventFlags, isDuplicate, publisher);
}
if (!isDuplicate && topic->datalogType == value.type()) {
for (auto&& datalog : topic->datalogs) {
datalog.Append(value);
if (!(suppressIfDuplicate && isDuplicate)) {
topic->type = value.type();
topic->lastValue = value;
topic->lastValueFromNetwork = false;
NotifyValue(topic, eventFlags, isDuplicate, publisher);
if (topic->datalogType == value.type()) {
for (auto&& datalog : topic->datalogs) {
datalog.Append(value);
}
}
}
}
return true;
}
@@ -1259,20 +1264,21 @@ bool LSImpl::PublishLocalValue(PublisherData* publisher, const Value& value,
return false;
}
if (publisher->active) {
bool isDuplicate, isNetworkDuplicate;
bool isDuplicate, isNetworkDuplicate, suppressDuplicates;
if (force || publisher->config.keepDuplicates) {
isDuplicate = false;
suppressDuplicates = false;
isNetworkDuplicate = false;
} else {
isDuplicate = (publisher->topic->lastValue == value);
suppressDuplicates = true;
isNetworkDuplicate = (publisher->topic->lastValueNetwork == value);
}
isDuplicate = (publisher->topic->lastValue == value);
if (!isNetworkDuplicate && m_network) {
publisher->topic->lastValueNetwork = value;
m_network->SetValue(publisher->handle, value);
}
return SetValue(publisher->topic, value, NT_EVENT_VALUE_LOCAL, isDuplicate,
publisher);
suppressDuplicates, publisher);
} else {
return false;
}
@@ -1392,7 +1398,7 @@ void LocalStorage::NetworkSetValue(NT_Topic topicHandle, const Value& value) {
std::scoped_lock lock{m_mutex};
if (auto topic = m_impl->m_topics.Get(topicHandle)) {
if (m_impl->SetValue(topic, value, NT_EVENT_VALUE_REMOTE,
value == topic->lastValue, nullptr)) {
value == topic->lastValue, false, nullptr)) {
topic->lastValueNetwork = value;
topic->lastValueFromNetwork = true;
}

View File

@@ -491,7 +491,7 @@ TEST_F(LocalStorageTest, SetValueInvalidHandle) {
class LocalStorageDuplicatesTest : public LocalStorageTest {
public:
void SetupPubSub(bool keepPub, bool keepSub);
void SetValues();
void SetValues(bool expectDuplicates);
NT_Publisher pub;
NT_Subscriber sub;
@@ -518,11 +518,12 @@ void LocalStorageDuplicatesTest::SetupPubSub(bool keepPub, bool keepSub) {
{.pollStorage = 10, .keepDuplicates = keepSub});
}
void LocalStorageDuplicatesTest::SetValues() {
void LocalStorageDuplicatesTest::SetValues(bool expectDuplicates) {
storage.SetEntryValue(pub, val1);
storage.SetEntryValue(pub, val2);
// verify the timestamp was updated
EXPECT_EQ(storage.GetEntryLastChange(sub), val2.time());
// verify the timestamp was updated (or not)
EXPECT_EQ(storage.GetEntryLastChange(sub),
expectDuplicates ? val2.time() : val1.time());
storage.SetEntryValue(pub, val3);
}
@@ -531,7 +532,7 @@ TEST_F(LocalStorageDuplicatesTest, Defaults) {
EXPECT_CALL(network, SetValue(pub, val1));
EXPECT_CALL(network, SetValue(pub, val3));
SetValues();
SetValues(false);
// verify 2nd update was dropped locally
auto values = storage.ReadQueueDouble(sub);
@@ -548,11 +549,11 @@ TEST_F(LocalStorageDuplicatesTest, KeepPub) {
EXPECT_CALL(network, SetValue(pub, val1)).Times(2);
// EXPECT_CALL(network, SetValue(pub, val2));
EXPECT_CALL(network, SetValue(pub, val3));
SetValues();
SetValues(true);
// verify all 3 updates were received locally
// verify only 2 updates were received locally
auto values = storage.ReadQueueDouble(sub);
ASSERT_EQ(values.size(), 3u);
ASSERT_EQ(values.size(), 2u);
}
TEST_F(LocalStorageDuplicatesTest, KeepSub) {
@@ -561,14 +562,28 @@ TEST_F(LocalStorageDuplicatesTest, KeepSub) {
// second update should NOT go to the network
EXPECT_CALL(network, SetValue(pub, val1));
EXPECT_CALL(network, SetValue(pub, val3));
SetValues();
SetValues(false);
// verify 2 updates were received locally
auto values = storage.ReadQueueDouble(sub);
ASSERT_EQ(values.size(), 2u);
}
TEST_F(LocalStorageDuplicatesTest, KeepPubSub) {
SetupPubSub(true, true);
// second update SHOULD go to the network
EXPECT_CALL(network, SetValue(pub, val1)).Times(2);
// EXPECT_CALL(network, SetValue(pub, val2));
EXPECT_CALL(network, SetValue(pub, val3));
SetValues(true);
// verify all 3 updates were received locally
auto values = storage.ReadQueueDouble(sub);
ASSERT_EQ(values.size(), 3u);
}
TEST_F(LocalStorageDuplicatesTest, FromNetwork) {
TEST_F(LocalStorageDuplicatesTest, FromNetworkDefault) {
SetupPubSub(false, false);
// incoming from the network are treated like a normal local publish
@@ -579,7 +594,7 @@ TEST_F(LocalStorageDuplicatesTest, FromNetwork) {
EXPECT_EQ(storage.GetEntryLastChange(sub), val2.time());
storage.NetworkSetValue(topic, val3);
// verify 2nd update was dropped locally
// verify 2nd update was dropped for local subscriber
auto values = storage.ReadQueueDouble(sub);
ASSERT_EQ(values.size(), 2u);
ASSERT_EQ(values[0].value, val1.GetDouble());
@@ -588,6 +603,69 @@ TEST_F(LocalStorageDuplicatesTest, FromNetwork) {
ASSERT_EQ(values[1].time, val3.time());
}
TEST_F(LocalStorageDuplicatesTest, FromNetworkKeepPub) {
SetupPubSub(true, false);
// incoming from the network are treated like a normal local publish
auto topic = storage.NetworkAnnounce("foo", "double", {{}}, 0);
storage.NetworkSetValue(topic, val1);
storage.NetworkSetValue(topic, val2);
// verify the timestamp was updated
EXPECT_EQ(storage.GetEntryLastChange(sub), val2.time());
storage.NetworkSetValue(topic, val3);
// verify 2nd update was dropped for local subscriber
auto values = storage.ReadQueueDouble(sub);
ASSERT_EQ(values.size(), 2u);
ASSERT_EQ(values[0].value, val1.GetDouble());
ASSERT_EQ(values[0].time, val1.time());
ASSERT_EQ(values[1].value, val3.GetDouble());
ASSERT_EQ(values[1].time, val3.time());
}
TEST_F(LocalStorageDuplicatesTest, FromNetworkKeepSub) {
SetupPubSub(false, true);
// incoming from the network are treated like a normal local publish
auto topic = storage.NetworkAnnounce("foo", "double", {{}}, 0);
storage.NetworkSetValue(topic, val1);
storage.NetworkSetValue(topic, val2);
// verify the timestamp was updated
EXPECT_EQ(storage.GetEntryLastChange(sub), val2.time());
storage.NetworkSetValue(topic, val3);
// verify 2nd update was received by local subscriber
auto values = storage.ReadQueueDouble(sub);
ASSERT_EQ(values.size(), 3u);
ASSERT_EQ(values[0].value, val1.GetDouble());
ASSERT_EQ(values[0].time, val1.time());
ASSERT_EQ(values[1].value, val2.GetDouble());
ASSERT_EQ(values[1].time, val2.time());
ASSERT_EQ(values[2].value, val3.GetDouble());
ASSERT_EQ(values[2].time, val3.time());
}
TEST_F(LocalStorageDuplicatesTest, FromNetworkKeepPubSub) {
SetupPubSub(true, true);
// incoming from the network are treated like a normal local publish
auto topic = storage.NetworkAnnounce("foo", "double", {{}}, 0);
storage.NetworkSetValue(topic, val1);
storage.NetworkSetValue(topic, val2);
// verify the timestamp was updated
EXPECT_EQ(storage.GetEntryLastChange(sub), val2.time());
storage.NetworkSetValue(topic, val3);
// verify 2nd update was received by local subscriber
auto values = storage.ReadQueueDouble(sub);
ASSERT_EQ(values.size(), 3u);
ASSERT_EQ(values[0].value, val1.GetDouble());
ASSERT_EQ(values[0].time, val1.time());
ASSERT_EQ(values[1].value, val2.GetDouble());
ASSERT_EQ(values[1].time, val2.time());
ASSERT_EQ(values[2].value, val3.GetDouble());
ASSERT_EQ(values[2].time, val3.time());
}
class LocalStorageNumberVariantsTest : public LocalStorageTest {
public:
void CreateSubscriber(NT_Handle* handle, std::string_view name, NT_Type type,

View File

@@ -113,9 +113,11 @@ TEST(ShuffleboardInstanceTest, DuplicateSelectTabs) {
NTWrapper ntInst;
frc::detail::ShuffleboardInstance shuffleboardInst{ntInst.inst};
std::atomic_int counter = 0;
auto listener = nt::NetworkTableListener::CreateListener(
ntInst.inst.GetStringTopic("/Shuffleboard/.metadata/Selected"),
nt::EventFlags::kValueAll | nt::EventFlags::kImmediate,
auto subscriber =
ntInst.inst.GetStringTopic("/Shuffleboard/.metadata/Selected")
.Subscribe("", {.keepDuplicates = true});
ntInst.inst.AddListener(
subscriber, nt::EventFlags::kValueAll | nt::EventFlags::kImmediate,
[&counter](auto& event) { counter++; });
// There shouldn't be anything there

View File

@@ -13,6 +13,8 @@ import edu.wpi.first.networktables.GenericEntry;
import edu.wpi.first.networktables.NetworkTableEntry;
import edu.wpi.first.networktables.NetworkTableEvent.Kind;
import edu.wpi.first.networktables.NetworkTableInstance;
import edu.wpi.first.networktables.PubSubOption;
import edu.wpi.first.networktables.StringSubscriber;
import java.util.EnumSet;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.AfterEach;
@@ -129,10 +131,13 @@ class ShuffleboardInstanceTest {
void testDuplicateSelectTabs() {
int listener = 0;
AtomicInteger counter = new AtomicInteger();
try {
try (StringSubscriber subscriber =
m_ntInstance
.getStringTopic("/Shuffleboard/.metadata/Selected")
.subscribe("", PubSubOption.keepDuplicates(true)); ) {
listener =
m_ntInstance.addListener(
m_ntInstance.getStringTopic("/Shuffleboard/.metadata/Selected"),
subscriber,
EnumSet.of(Kind.kValueAll, Kind.kImmediate),
event -> counter.incrementAndGet());