[ntcore] Add sub option for local vs remote changes (#4731)

This is the subscriber readQueue version of the local value listener flag.
This commit is contained in:
Peter Johnson
2022-11-29 21:57:20 -08:00
committed by GitHub
parent 18d28ec5e3
commit 9b319fd56b
7 changed files with 127 additions and 5 deletions

View File

@@ -11,6 +11,7 @@ public class PubSubOption {
private static final int kTopicsOnly = 3;
private static final int kPollStorage = 4;
private static final int kKeepDuplicates = 5;
private static final int kLocalRemote = 6;
PubSubOption(int type, double value) {
m_type = type;
@@ -64,8 +65,8 @@ public class PubSubOption {
/**
* Polling storage for subscription. Specifies the maximum number of updates NetworkTables should
* store between calls to the subscriber's poll() function. Defaults to 1 if sendAll is false, 20
* if sendAll is true.
* store between calls to the subscriber's readQueue() function. Defaults to 1 if sendAll is
* false, 20 if sendAll is true.
*
* @param depth number of entries to save for polling.
* @return option
@@ -74,6 +75,36 @@ public class PubSubOption {
return new PubSubOption(kPollStorage, depth);
}
/**
* If only local value updates should be queued for readQueue(). See also remoteOnly() and
* allUpdates(). Default is allUpdates. Only has an effect on subscriptions.
*
* @return option
*/
public static PubSubOption localOnly() {
return new PubSubOption(kLocalRemote, 1.0);
}
/**
* If only remote value updates should be queued for readQueue(). See also localOnly() and
* allUpdates(). Default is allUpdates. Only has an effect on subscriptions.
*
* @return option
*/
public static PubSubOption remoteOnly() {
return new PubSubOption(kLocalRemote, 2.0);
}
/**
* If both local and remote value updates should be queued for readQueue(). See also localOnly()
* and remoteOnly(). Default is allUpdates. Only has an effect on subscriptions.
*
* @return option
*/
public static PubSubOption allUpdates() {
return new PubSubOption(kLocalRemote, 0.0);
}
final int m_type;
final double m_value;
}

View File

@@ -492,9 +492,12 @@ bool LSImpl::SetValue(TopicData* topic, const Value& value,
void LSImpl::NotifyValue(TopicData* topic, unsigned int eventFlags,
bool isDuplicate) {
bool isNetwork = (eventFlags & NT_EVENT_VALUE_REMOTE) != 0;
for (auto&& subscriber : topic->localSubscribers) {
if (subscriber->active &&
(subscriber->config.keepDuplicates || !isDuplicate)) {
(subscriber->config.keepDuplicates || !isDuplicate) &&
((isNetwork && subscriber->config.fromRemote) ||
(!isNetwork && subscriber->config.fromLocal))) {
subscriber->pollStorage.emplace_back(topic->lastValue);
subscriber->handle.Set();
if (!subscriber->valueListeners.empty()) {

View File

@@ -29,6 +29,25 @@ nt::PubSubOptions::PubSubOptions(std::span<const PubSubOption> options) {
case NT_PUBSUB_POLLSTORAGE:
pollStorageSize = static_cast<size_t>(option.value);
break;
case NT_PUBSUB_LOCALREMOTE:
switch (static_cast<int>(option.value)) {
case 0:
case 3:
fromLocal = true;
fromRemote = true;
break;
case 1:
fromLocal = true;
fromRemote = false;
break;
case 2:
fromLocal = false;
fromRemote = true;
break;
default:
break;
}
break;
default:
break;
}

View File

@@ -22,6 +22,8 @@ class PubSubOptions {
bool topicsOnly = false;
bool prefixMatch = false;
bool keepDuplicates = false;
bool fromRemote = true;
bool fromLocal = true;
};
} // namespace nt

View File

@@ -95,6 +95,7 @@ enum NT_PubSubOptionType {
NT_PUBSUB_TOPICSONLY, /* only send topic changes, no value changes */
NT_PUBSUB_POLLSTORAGE, /* polling storage for subscription */
NT_PUBSUB_KEEPDUPLICATES, /* preserve duplicate values */
NT_PUBSUB_LOCALREMOTE, /* local, remote, or any value changes */
};
/** Event notification flags. */
@@ -287,7 +288,8 @@ struct NT_PubSubOption {
/**
* Option value. 1 (true) or 0 (false) for immediate and logging options,
* time between updates, in seconds, for periodic option.
* time between updates, in seconds, for periodic option. For local/remote
* option, 1=local only, 2=remote only, 0 or 3=both local and remote.
*/
double value;
};

View File

@@ -314,7 +314,7 @@ class PubSubOption {
/**
* Polling storage for subscription. Specifies the maximum number of updates
* NetworkTables should store between calls to the subscriber's poll()
* NetworkTables should store between calls to the subscriber's ReadQueue()
* function. Defaults to 1 if SendAll is false, 20 if SendAll is true.
*
* @param depth number of entries to save for polling.
@@ -324,6 +324,39 @@ class PubSubOption {
return PubSubOption{NT_PUBSUB_POLLSTORAGE, static_cast<double>(depth)};
}
/**
* If only local value updates should be queued for ReadQueue(). See also
* RemoteOnly() and AllUpdates(). Default is AllUpdates. Only has an effect on
* subscriptions.
*
* @return option
*/
static constexpr PubSubOption LocalOnly() {
return PubSubOption{NT_PUBSUB_LOCALREMOTE, 1.0};
}
/**
* If only remote value updates should be queued for ReadQueue(). See also
* LocalOnly() and AllUpdates(). Default is AllUpdates. Only has an effect on
* subscriptions.
*
* @return option
*/
static constexpr PubSubOption RemoteOnly() {
return PubSubOption{NT_PUBSUB_LOCALREMOTE, 2.0};
}
/**
* If both local and remote value updates should be queued for ReadQueue().
* See also LocalOnly() and RemoteOnly(). Default is AllUpdates. Only has an
* effect on subscriptions.
*
* @return option
*/
static constexpr PubSubOption AllUpdates() {
return PubSubOption{NT_PUBSUB_LOCALREMOTE, 0.0};
}
NT_PubSubOptionType type;
double value;
};

View File

@@ -807,4 +807,36 @@ TEST_F(LocalStorageTest, NetworkDuplicateDetect) {
storage.SetEntryValue(pub, Value::MakeDouble(1.0, 80));
}
TEST_F(LocalStorageTest, ReadQueueLocalRemote) {
EXPECT_CALL(network, Subscribe(_, _, _)).Times(3);
EXPECT_CALL(network, Publish(_, _, _, _, _, _)).Times(1);
auto subBoth = storage.Subscribe(fooTopic, NT_DOUBLE, "double",
{{PubSubOption::AllUpdates()}});
auto subLocal = storage.Subscribe(fooTopic, NT_DOUBLE, "double",
{{PubSubOption::LocalOnly()}});
auto subRemote = storage.Subscribe(fooTopic, NT_DOUBLE, "double",
{{PubSubOption::RemoteOnly()}});
auto pub = storage.Publish(fooTopic, NT_DOUBLE, "double", {}, {});
auto remoteTopic =
storage.NetworkAnnounce("foo", "double", wpi::json::object(), 0);
// local set
EXPECT_CALL(network, SetValue(_, _));
storage.SetEntryValue(pub, Value::MakeDouble(1.0, 50));
EXPECT_THAT(storage.ReadQueueDouble(subBoth),
ElementsAre(TSEq<TimestampedDouble>(1.0, 50)));
EXPECT_THAT(storage.ReadQueueDouble(subLocal),
ElementsAre(TSEq<TimestampedDouble>(1.0, 50)));
EXPECT_THAT(storage.ReadQueueDouble(subRemote), IsEmpty());
// network set
storage.NetworkSetValue(remoteTopic, Value::MakeDouble(2.0, 60));
EXPECT_THAT(storage.ReadQueueDouble(subBoth),
ElementsAre(TSEq<TimestampedDouble>(2.0, 60)));
EXPECT_THAT(storage.ReadQueueDouble(subRemote),
ElementsAre(TSEq<TimestampedDouble>(2.0, 60)));
EXPECT_THAT(storage.ReadQueueDouble(subLocal), IsEmpty());
}
} // namespace nt