diff --git a/glass/src/libnt/native/cpp/NetworkTables.cpp b/glass/src/libnt/native/cpp/NetworkTables.cpp index 7f94b4a836..246662f7b5 100644 --- a/glass/src/libnt/native/cpp/NetworkTables.cpp +++ b/glass/src/libnt/native/cpp/NetworkTables.cpp @@ -111,10 +111,10 @@ NetworkTablesModel::NetworkTablesModel() NetworkTablesModel::NetworkTablesModel(nt::NetworkTableInstance inst) : m_inst{inst}, - m_subscriber{nt::SubscribeMultiple(inst.GetHandle(), {{"", "$"}})}, + m_subscriber{inst, {{"", "$"}}}, m_topicPoller{inst}, m_valuePoller{inst} { - m_topicPoller.Add({{""}}, + m_topicPoller.Add(m_subscriber, NT_TOPIC_NOTIFY_IMMEDIATE | NT_TOPIC_NOTIFY_PROPERTIES | NT_TOPIC_NOTIFY_PUBLISH | NT_TOPIC_NOTIFY_UNPUBLISH); m_valuePoller.Add(m_subscriber, diff --git a/glass/src/libnt/native/include/glass/networktables/NetworkTables.h b/glass/src/libnt/native/include/glass/networktables/NetworkTables.h index 2caa244ed4..b34c034397 100644 --- a/glass/src/libnt/native/include/glass/networktables/NetworkTables.h +++ b/glass/src/libnt/native/include/glass/networktables/NetworkTables.h @@ -13,6 +13,7 @@ #include #include +#include #include #include #include @@ -179,7 +180,7 @@ class NetworkTablesModel : public Model { void UpdateClients(std::span data); nt::NetworkTableInstance m_inst; - NT_MultiSubscriber m_subscriber; + nt::MultiSubscriber m_subscriber; nt::TopicListenerPoller m_topicPoller; nt::ValueListenerPoller m_valuePoller; wpi::DenseMap> m_entries; diff --git a/ntcore/src/generate/java/NetworkTableInstance.java.jinja b/ntcore/src/generate/java/NetworkTableInstance.java.jinja index 0a56493f4c..60223480a5 100644 --- a/ntcore/src/generate/java/NetworkTableInstance.java.jinja +++ b/ntcore/src/generate/java/NetworkTableInstance.java.jinja @@ -5,11 +5,14 @@ package edu.wpi.first.networktables; import edu.wpi.first.util.WPIUtilJNI; +import edu.wpi.first.util.concurrent.Event; import edu.wpi.first.util.datalog.DataLog; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; @@ -30,6 +33,7 @@ import java.util.function.Consumer; * kept to the NetworkTableInstance returned by this function to keep it from being garbage * collected. */ +@SuppressWarnings("PMD.CouplingBetweenObjects") public final class NetworkTableInstance implements AutoCloseable { /** * Client/server mode flag values (as returned by {@link #getNetworkMode()}). This is a bitmask. @@ -62,6 +66,9 @@ public final class NetworkTableInstance implements AutoCloseable { @Override public synchronized void close() { if (m_owned && m_handle != 0) { + m_connectionListener.close(); + m_topicListener.close(); + m_valueListener.close(); NetworkTablesJNI.destroyInstance(m_handle); } } @@ -350,85 +357,181 @@ public final class NetworkTableInstance implements AutoCloseable { * Callback Creation Functions */ - private final ReentrantLock m_connectionListenerLock = new ReentrantLock(); - private final Map> m_connectionListeners = - new HashMap<>(); - private int m_connectionListenerPoller; + private abstract static class ListenerBase implements AutoCloseable { + protected final ReentrantLock m_lock = new ReentrantLock(); + protected final Map> m_listeners = new HashMap<>(); + private Thread m_thread; + protected int m_poller; + private boolean m_waitQueue; + private final Event m_waitQueueEvent = new Event(); + private final Condition m_waitQueueCond = m_lock.newCondition(); + protected final NetworkTableInstance m_inst; - @SuppressWarnings("PMD.AvoidCatchingThrowable") - private void startConnectionListenerThread() { - var connectionListenerThread = - new Thread( - () -> { - boolean wasInterrupted = false; - while (!Thread.interrupted()) { - try { - WPIUtilJNI.waitForObject(m_connectionListenerPoller); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - // don't try to destroy poller, as its handle is likely no longer valid - wasInterrupted = true; - break; - } - ConnectionNotification[] events = - NetworkTablesJNI.readConnectionListenerQueue(this, m_connectionListenerPoller); - for (ConnectionNotification event : events) { - Consumer listener; - m_connectionListenerLock.lock(); + ListenerBase(NetworkTableInstance inst) { + m_inst = inst; + } + + @Override + public void close() { + if (m_poller != 0) { + destroyPoller(); + } + m_poller = 0; + } + + protected abstract T[] readQueue(); + + protected abstract void destroyPoller(); + + protected void startThread(String name) { + m_thread = + new Thread( + () -> { + boolean wasInterrupted = false; + int[] handles = new int[] { m_poller, m_waitQueueEvent.getHandle() }; + while (!Thread.interrupted()) { try { - listener = m_connectionListeners.get(event.listener); - } finally { - m_connectionListenerLock.unlock(); - } - if (listener != null) { + WPIUtilJNI.waitForObjects(handles); + } catch (InterruptedException ex) { + m_lock.lock(); try { - listener.accept(event); - } catch (Throwable throwable) { - System.err.println( - "Unhandled exception during connection listener callback: " - + throwable.toString()); - throwable.printStackTrace(); + if (m_waitQueue) { + m_waitQueue = false; + m_waitQueueCond.signalAll(); + } + } finally { + m_lock.unlock(); + } + Thread.currentThread().interrupt(); + // don't try to destroy poller, as its handle is likely no longer valid + wasInterrupted = true; + break; + } + for (T event : readQueue()) { + Consumer listener; + m_lock.lock(); + try { + listener = m_listeners.get(event.listener); + } finally { + m_lock.unlock(); + } + if (listener != null) { + try { + listener.accept(event); + } catch (Throwable throwable) { + System.err.println( + "Unhandled exception during listener callback: " + + throwable.toString()); + throwable.printStackTrace(); + } } } + m_lock.lock(); + try { + if (m_waitQueue) { + m_waitQueue = false; + m_waitQueueCond.signalAll(); + } + } finally { + m_lock.unlock(); + } } - } - m_connectionListenerLock.lock(); - try { - if (!wasInterrupted) { - NetworkTablesJNI.destroyConnectionListenerPoller(m_connectionListenerPoller); + m_lock.lock(); + try { + if (!wasInterrupted) { + destroyPoller(); + } + m_poller = 0; + } finally { + m_lock.unlock(); } - m_connectionListenerPoller = 0; - } finally { - m_connectionListenerLock.unlock(); + }, + name); + m_thread.setDaemon(true); + m_thread.start(); + } + + boolean waitForQueue(double timeout) { + m_lock.lock(); + try { + if (m_poller != 0) { + m_waitQueue = true; + m_waitQueueEvent.set(); + while (m_waitQueue) { + try { + if (timeout < 0) { + m_waitQueueCond.await(); + } else { + return m_waitQueueCond.await((long) (timeout * 1e9), TimeUnit.NANOSECONDS); } - }, - "NTConnectionListener"); - connectionListenerThread.setDaemon(true); - connectionListenerThread.start(); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + return true; + } + } + } + } finally { + m_lock.unlock(); + } + return true; + } } + private static final class ConnectionListener extends ListenerBase { + ConnectionListener(NetworkTableInstance inst) { + super(inst); + } + + int add(boolean immediateNotify, Consumer listener) { + m_lock.lock(); + try { + if (m_poller == 0) { + m_poller = NetworkTablesJNI.createConnectionListenerPoller(m_inst.getHandle()); + startThread("NTConnectionListener"); + } + int h = NetworkTablesJNI.addPolledConnectionListener(m_poller, immediateNotify); + m_listeners.put(h, listener); + return h; + } finally { + m_lock.unlock(); + } + } + + void remove(int listener) { + m_lock.lock(); + try { + m_listeners.remove(listener); + } finally { + m_lock.unlock(); + } + NetworkTablesJNI.removeConnectionListener(listener); + } + + @Override + protected ConnectionNotification[] readQueue() { + return NetworkTablesJNI.readConnectionListenerQueue(m_inst, m_poller); + } + + @Override + protected void destroyPoller() { + NetworkTablesJNI.destroyConnectionListenerPoller(m_poller); + } + } + + private ConnectionListener m_connectionListener = new ConnectionListener(this); + /** - * Add a connection listener. + * Add a connection listener. The callback function is called asynchronously on a separate + * thread, so it's important to use synchronization or atomics when accessing any shared state + * from the callback function. * - * @param listener Listener to add * @param immediateNotify Notify listener of all existing connections + * @param listener Listener to add * @return Listener handle */ public int addConnectionListener( - Consumer listener, boolean immediateNotify) { - m_connectionListenerLock.lock(); - try { - if (m_connectionListenerPoller == 0) { - m_connectionListenerPoller = NetworkTablesJNI.createConnectionListenerPoller(m_handle); - startConnectionListenerThread(); - } - int handle = - NetworkTablesJNI.addPolledConnectionListener(m_connectionListenerPoller, immediateNotify); - m_connectionListeners.put(handle, listener); - return handle; - } finally { - m_connectionListenerLock.unlock(); - } + boolean immediateNotify, Consumer listener) { + return m_connectionListener.add(immediateNotify, listener); } /** @@ -437,13 +540,313 @@ public final class NetworkTableInstance implements AutoCloseable { * @param listener Listener handle to remove */ public void removeConnectionListener(int listener) { - m_connectionListenerLock.lock(); - try { - m_connectionListeners.remove(listener); - } finally { - m_connectionListenerLock.unlock(); + m_connectionListener.remove(listener); + } + + /** + * Wait for the connection listener queue to be empty. This is primarily useful for deterministic + * testing. This blocks until either the connection listener queue is empty (e.g. there are no + * more events that need to be passed along to callbacks or poll queues) or the timeout expires. + * + * @param timeout timeout, in seconds. Set to 0 for non-blocking behavior, or a negative value to + * block indefinitely + * @return False if timed out, otherwise true. + */ + public boolean waitForConnectionListenerQueue(double timeout) { + return m_connectionListener.waitForQueue(timeout); + } + + private static final class TopicListener extends ListenerBase { + TopicListener(NetworkTableInstance inst) { + super(inst); } - NetworkTablesJNI.removeConnectionListener(listener); + + int add(int handle, int eventMask, Consumer listener) { + m_lock.lock(); + try { + if (m_poller == 0) { + m_poller = NetworkTablesJNI.createTopicListenerPoller(m_inst.getHandle()); + startThread("NTTopicListener"); + } + int h = NetworkTablesJNI.addPolledTopicListener(m_poller, handle, eventMask); + m_listeners.put(h, listener); + return h; + } finally { + m_lock.unlock(); + } + } + + int add(String[] prefixes, int eventMask, Consumer listener) { + m_lock.lock(); + try { + if (m_poller == 0) { + m_poller = NetworkTablesJNI.createTopicListenerPoller(m_inst.getHandle()); + startThread("NTTopicListener"); + } + int h = NetworkTablesJNI.addPolledTopicListener(m_poller, prefixes, eventMask); + m_listeners.put(h, listener); + return h; + } finally { + m_lock.unlock(); + } + } + + void remove(int listener) { + m_lock.lock(); + try { + m_listeners.remove(listener); + } finally { + m_lock.unlock(); + } + NetworkTablesJNI.removeTopicListener(listener); + } + + @Override + protected TopicNotification[] readQueue() { + return NetworkTablesJNI.readTopicListenerQueue(m_inst, m_poller); + } + + @Override + protected void destroyPoller() { + NetworkTablesJNI.destroyTopicListenerPoller(m_poller); + } + } + + private TopicListener m_topicListener = new TopicListener(this); + + /** + * Add a topic listener for changes on a particular topic. The callback function is called + * asynchronously on a separate thread, so it's important to use synchronization or atomics when + * accessing any shared state from the callback function. + * + * @param topic Topic + * @param eventMask Bitmask of TopicListenerFlags values + * @param listener Listener function + * @return Listener handle + */ + public int addTopicListener( + Topic topic, int eventMask, Consumer listener) { + if (topic.getInstance().getHandle() != m_handle) { + throw new IllegalArgumentException("topic is not from this instance"); + } + return m_topicListener.add(topic.getHandle(), eventMask, listener); + } + + /** + * Add a topic listener for topic changes on a subscriber. The callback function is called + * asynchronously on a separate thread, so it's important to use synchronization or atomics when + * accessing any shared state from the callback function. This does NOT keep the subscriber + * active. + * + * @param subscriber Subscriber + * @param eventMask Bitmask of TopicListenerFlags values + * @param listener Listener function + * @return Listener handle + */ + public int addTopicListener( + Subscriber subscriber, int eventMask, Consumer listener) { + if (subscriber.getTopic().getInstance().getHandle() != m_handle) { + throw new IllegalArgumentException("subscriber is not from this instance"); + } + return m_topicListener.add(subscriber.getHandle(), eventMask, listener); + } + + /** + * Add a topic listener for topic changes on a subscriber. The callback function is called + * asynchronously on a separate thread, so it's important to use synchronization or atomics when + * accessing any shared state from the callback function. This does NOT keep the subscriber + * active. + * + * @param subscriber Subscriber + * @param eventMask Bitmask of TopicListenerFlags values + * @param listener Listener function + * @return Listener handle + */ + public int addTopicListener( + MultiSubscriber subscriber, int eventMask, Consumer listener) { + if (subscriber.getInstance().getHandle() != m_handle) { + throw new IllegalArgumentException("subscriber is not from this instance"); + } + return m_topicListener.add(subscriber.getHandle(), eventMask, listener); + } + + /** + * Add a topic listener for topic changes on an entry. The callback function is called + * asynchronously on a separate thread, so it's important to use synchronization or atomics when + * accessing any shared state from the callback function. + * + * @param entry Entry + * @param eventMask Bitmask of TopicListenerFlags values + * @param listener Listener function + * @return Listener handle + */ + public int addTopicListener( + NetworkTableEntry entry, int eventMask, Consumer listener) { + if (entry.getTopic().getInstance().getHandle() != m_handle) { + throw new IllegalArgumentException("entry is not from this instance"); + } + return m_topicListener.add(entry.getHandle(), eventMask, listener); + } + + /** + * Add a topic listener for changes to topics with names that start with any of the given + * prefixes. The callback function is called asynchronously on a separate thread, so it's + * important to use synchronization or atomics when accessing any shared state from the callback + * function. + * + * @param prefixes Topic name string prefixes + * @param eventMask Bitmask of TopicListenerFlags values + * @param listener Listener function + * @return Listener handle + */ + public int addTopicListener( + String[] prefixes, + int eventMask, + Consumer listener) { + return m_topicListener.add(prefixes, eventMask, listener); + } + + /** + * Remove a topic listener. + * + * @param listener Listener handle to remove + */ + public void removeTopicListener(int listener) { + m_topicListener.remove(listener); + } + + /** + * Wait for the topic listener queue to be empty. This is primarily useful for deterministic + * testing. This blocks until either the topic listener queue is empty (e.g. there are no + * more events that need to be passed along to callbacks or poll queues) or the timeout expires. + * + * @param timeout timeout, in seconds. Set to 0 for non-blocking behavior, or a negative value to + * block indefinitely + * @return False if timed out, otherwise true. + */ + public boolean waitForTopicListenerQueue(double timeout) { + return m_topicListener.waitForQueue(timeout); + } + + private static final class ValueListener extends ListenerBase { + ValueListener(NetworkTableInstance inst) { + super(inst); + } + + int add(int handle, int eventMask, Consumer listener) { + m_lock.lock(); + try { + if (m_poller == 0) { + m_poller = NetworkTablesJNI.createValueListenerPoller(m_inst.getHandle()); + startThread("NTValueListener"); + } + int h = NetworkTablesJNI.addPolledValueListener(m_poller, handle, eventMask); + m_listeners.put(h, listener); + return h; + } finally { + m_lock.unlock(); + } + } + + void remove(int listener) { + m_lock.lock(); + try { + m_listeners.remove(listener); + } finally { + m_lock.unlock(); + } + NetworkTablesJNI.removeValueListener(listener); + } + + @Override + protected ValueNotification[] readQueue() { + return NetworkTablesJNI.readValueListenerQueue(m_inst, m_poller); + } + + @Override + protected void destroyPoller() { + NetworkTablesJNI.destroyValueListenerPoller(m_poller); + } + } + + private ValueListener m_valueListener = new ValueListener(this); + + /** + * Add a value listener for value changes on a subscriber. The callback function is called + * asynchronously on a separate thread, so it's important to use synchronization or atomics when + * accessing any shared state from the callback function. This does NOT keep the subscriber + * active. + * + * @param subscriber Subscriber + * @param eventMask Bitmask of ValueListenerFlags values + * @param listener Listener function + * @return Listener handle + */ + public int addValueListener( + Subscriber subscriber, int eventMask, Consumer listener) { + if (subscriber.getTopic().getInstance().getHandle() != m_handle) { + throw new IllegalArgumentException("subscriber is not from this instance"); + } + return m_valueListener.add(subscriber.getHandle(), eventMask, listener); + } + + /** + * Add a value listener for value changes on a subscriber. The callback function is called + * asynchronously on a separate thread, so it's important to use synchronization or atomics when + * accessing any shared state from the callback function. This does NOT keep the subscriber + * active. + * + * @param subscriber Subscriber + * @param eventMask Bitmask of ValueListenerFlags values + * @param listener Listener function + * @return Listener handle + */ + public int addValueListener( + MultiSubscriber subscriber, int eventMask, Consumer listener) { + if (subscriber.getInstance().getHandle() != m_handle) { + throw new IllegalArgumentException("subscriber is not from this instance"); + } + return m_valueListener.add(subscriber.getHandle(), eventMask, listener); + } + + /** + * Add a value listener for value changes on an entry. The callback function is called + * asynchronously on a separate thread, so it's important to use synchronization or atomics when + * accessing any shared state from the callback function. + * + * @param entry Entry + * @param eventMask Bitmask of ValueListenerFlags values + * @param listener Listener function + * @return Listener handle + */ + public int addValueListener( + NetworkTableEntry entry, int eventMask, Consumer listener) { + if (entry.getTopic().getInstance().getHandle() != m_handle) { + throw new IllegalArgumentException("entry is not from this instance"); + } + return m_valueListener.add(entry.getHandle(), eventMask, listener); + } + + /** + * Remove a value listener. + * + * @param listener Listener handle to remove + */ + public void removeValueListener(int listener) { + m_valueListener.remove(listener); + } + + /** + * Wait for the value listener queue to be empty. This is primarily useful for deterministic + * testing. This blocks until either the value listener queue is empty (e.g. there are no + * more events that need to be passed along to callbacks or poll queues) or the timeout expires. + * + * @param timeout timeout, in seconds. Set to 0 for non-blocking behavior, or a negative value to + * block indefinitely + * @return False if timed out, otherwise true. + */ + public boolean waitForValueListenerQueue(double timeout) { + return m_valueListener.waitForQueue(timeout); } /* diff --git a/ntcore/src/main/java/edu/wpi/first/networktables/ConnectionListener.java b/ntcore/src/main/java/edu/wpi/first/networktables/ConnectionListener.java index ee42087a01..c8996ffd6d 100644 --- a/ntcore/src/main/java/edu/wpi/first/networktables/ConnectionListener.java +++ b/ntcore/src/main/java/edu/wpi/first/networktables/ConnectionListener.java @@ -4,11 +4,6 @@ package edu.wpi.first.networktables; -import edu.wpi.first.util.WPIUtilJNI; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; /** @@ -28,30 +23,14 @@ public final class ConnectionListener implements AutoCloseable { NetworkTableInstance inst, boolean immediateNotify, Consumer listener) { - s_lock.lock(); - try { - if (s_poller == 0) { - s_inst = inst; - s_poller = NetworkTablesJNI.createConnectionListenerPoller(inst.getHandle()); - startThread(); - } - m_handle = NetworkTablesJNI.addPolledConnectionListener(s_poller, immediateNotify); - s_listeners.put(m_handle, listener); - } finally { - s_lock.unlock(); - } + m_inst = inst; + m_handle = inst.addConnectionListener(immediateNotify, listener); } @Override public synchronized void close() { if (m_handle != 0) { - s_lock.lock(); - try { - s_listeners.remove(m_handle); - } finally { - s_lock.unlock(); - } - NetworkTablesJNI.removeConnectionListener(m_handle); + m_inst.removeConnectionListener(m_handle); m_handle = 0; } } @@ -74,72 +53,19 @@ public final class ConnectionListener implements AutoCloseable { return m_handle; } - private int m_handle; - - private static final ReentrantLock s_lock = new ReentrantLock(); - private static final Map> s_listeners = new HashMap<>(); - private static Thread s_thread; - private static NetworkTableInstance s_inst; - private static int s_poller; - private static boolean s_waitQueue; - private static final Condition s_waitQueueCond = s_lock.newCondition(); - - private static void startThread() { - s_thread = - new Thread( - () -> { - boolean wasInterrupted = false; - while (!Thread.interrupted()) { - try { - WPIUtilJNI.waitForObject(s_poller); - } catch (InterruptedException ex) { - s_lock.lock(); - try { - if (s_waitQueue) { - s_waitQueue = false; - s_waitQueueCond.signalAll(); - continue; - } - } finally { - s_lock.unlock(); - } - Thread.currentThread().interrupt(); - // don't try to destroy poller, as its handle is likely no longer valid - wasInterrupted = true; - break; - } - for (ConnectionNotification event : - NetworkTablesJNI.readConnectionListenerQueue(s_inst, s_poller)) { - Consumer listener; - s_lock.lock(); - try { - listener = s_listeners.get(event.listener); - } finally { - s_lock.unlock(); - } - if (listener != null) { - try { - listener.accept(event); - } catch (Throwable throwable) { - System.err.println( - "Unhandled exception during listener callback: " + throwable.toString()); - throwable.printStackTrace(); - } - } - } - } - s_lock.lock(); - try { - if (!wasInterrupted) { - NetworkTablesJNI.destroyConnectionListenerPoller(s_poller); - } - s_poller = 0; - } finally { - s_lock.unlock(); - } - }, - "ConnectionListener"); - s_thread.setDaemon(true); - s_thread.start(); + /** + * Wait for the connection listener queue to be empty. This is primarily useful for deterministic + * testing. This blocks until either the connection listener queue is empty (e.g. there are no + * more events that need to be passed along to callbacks or poll queues) or the timeout expires. + * + * @param timeout timeout, in seconds. Set to 0 for non-blocking behavior, or a negative value to + * block indefinitely + * @return False if timed out, otherwise true. + */ + public boolean waitForQueue(double timeout) { + return m_inst.waitForConnectionListenerQueue(timeout); } + + private final NetworkTableInstance m_inst; + private int m_handle; } diff --git a/ntcore/src/main/java/edu/wpi/first/networktables/ConnectionNotification.java b/ntcore/src/main/java/edu/wpi/first/networktables/ConnectionNotification.java index 6fe2386dfe..b20c17a84a 100644 --- a/ntcore/src/main/java/edu/wpi/first/networktables/ConnectionNotification.java +++ b/ntcore/src/main/java/edu/wpi/first/networktables/ConnectionNotification.java @@ -6,13 +6,7 @@ package edu.wpi.first.networktables; /** NetworkTables Connection notification. */ @SuppressWarnings("MemberName") -public final class ConnectionNotification { - /** - * Handle of listener that was triggered. ConnectionListener.getHandle() or the return value of - * ConnectionListenerPoller.add() can be used to map this to a specific added listener. - */ - public final int listener; - +public final class ConnectionNotification extends NotificationBase { /** True if event is due to connection being established. */ public final boolean connected; @@ -29,15 +23,8 @@ public final class ConnectionNotification { */ public ConnectionNotification( NetworkTableInstance inst, int listener, boolean connected, ConnectionInfo conn) { - this.m_inst = inst; - this.listener = listener; + super(inst, listener); this.connected = connected; this.conn = conn; } - - private final NetworkTableInstance m_inst; - - public NetworkTableInstance getInstance() { - return m_inst; - } } diff --git a/ntcore/src/main/java/edu/wpi/first/networktables/NotificationBase.java b/ntcore/src/main/java/edu/wpi/first/networktables/NotificationBase.java new file mode 100644 index 0000000000..5496692c64 --- /dev/null +++ b/ntcore/src/main/java/edu/wpi/first/networktables/NotificationBase.java @@ -0,0 +1,38 @@ +// Copyright (c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +package edu.wpi.first.networktables; + +/** Base class for NetworkTables notifications. */ +@SuppressWarnings("MemberName") +public class NotificationBase { + /** + * Handle of listener that was triggered. The value returned when adding the listener can be used + * to map this to a specific added listener. + */ + public final int listener; + + /** + * Constructor. This should generally only be used internally to NetworkTables. + * + * @param inst Instance + * @param listener Listener handle + */ + public NotificationBase(NetworkTableInstance inst, int listener) { + this.m_inst = inst; + this.listener = listener; + } + + /* Network table instance. */ + protected final NetworkTableInstance m_inst; + + /** + * Gets the instance associated with this notification. + * + * @return Instance + */ + public NetworkTableInstance getInstance() { + return m_inst; + } +} diff --git a/ntcore/src/main/java/edu/wpi/first/networktables/TopicInfo.java b/ntcore/src/main/java/edu/wpi/first/networktables/TopicInfo.java index da44f7172e..abd8c2af79 100644 --- a/ntcore/src/main/java/edu/wpi/first/networktables/TopicInfo.java +++ b/ntcore/src/main/java/edu/wpi/first/networktables/TopicInfo.java @@ -42,6 +42,15 @@ public final class TopicInfo { /* Cached topic object. */ private Topic m_topicObject; + /** + * Get the instance. + * + * @return Instance + */ + public NetworkTableInstance getInstance() { + return m_inst; + } + /** * Get the topic as an object. * diff --git a/ntcore/src/main/java/edu/wpi/first/networktables/TopicListener.java b/ntcore/src/main/java/edu/wpi/first/networktables/TopicListener.java index b833fe1881..f6e0b4c6d0 100644 --- a/ntcore/src/main/java/edu/wpi/first/networktables/TopicListener.java +++ b/ntcore/src/main/java/edu/wpi/first/networktables/TopicListener.java @@ -4,11 +4,6 @@ package edu.wpi.first.networktables; -import edu.wpi.first.util.WPIUtilJNI; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; /** @@ -26,45 +21,24 @@ public final class TopicListener implements AutoCloseable { * @param listener Listener function */ public TopicListener(Topic topic, int eventMask, Consumer listener) { - s_lock.lock(); - try { - if (s_poller == 0) { - s_inst = topic.getInstance(); - s_poller = NetworkTablesJNI.createTopicListenerPoller(s_inst.getHandle()); - startThread(); - } - m_handle = NetworkTablesJNI.addPolledTopicListener(s_poller, topic.getHandle(), eventMask); - s_listeners.put(m_handle, listener); - } finally { - s_lock.unlock(); - } + m_inst = topic.getInstance(); + m_handle = m_inst.addTopicListener(topic, eventMask, listener); } /** - * Create a listener for topic changes on a subscriber. + * Create a listener for topic changes on a subscriber. This does NOT keep the subscriber active. * * @param subscriber Subscriber * @param eventMask Bitmask of TopicListenerFlags values * @param listener Listener function */ public TopicListener(Subscriber subscriber, int eventMask, Consumer listener) { - s_lock.lock(); - try { - if (s_poller == 0) { - s_inst = subscriber.getTopic().getInstance(); - s_poller = NetworkTablesJNI.createTopicListenerPoller(s_inst.getHandle()); - startThread(); - } - m_handle = - NetworkTablesJNI.addPolledTopicListener(s_poller, subscriber.getHandle(), eventMask); - s_listeners.put(m_handle, listener); - } finally { - s_lock.unlock(); - } + m_inst = subscriber.getTopic().getInstance(); + m_handle = m_inst.addTopicListener(subscriber, eventMask, listener); } /** - * Create a listener for topic changes on a subscriber. + * Create a listener for topic changes on a subscriber. This does NOT keep the subscriber active. * * @param subscriber Subscriber * @param eventMask Bitmask of TopicListenerFlags values @@ -72,19 +46,8 @@ public final class TopicListener implements AutoCloseable { */ public TopicListener( MultiSubscriber subscriber, int eventMask, Consumer listener) { - s_lock.lock(); - try { - if (s_poller == 0) { - s_inst = subscriber.getInstance(); - s_poller = NetworkTablesJNI.createTopicListenerPoller(s_inst.getHandle()); - startThread(); - } - m_handle = - NetworkTablesJNI.addPolledTopicListener(s_poller, subscriber.getHandle(), eventMask); - s_listeners.put(m_handle, listener); - } finally { - s_lock.unlock(); - } + m_inst = subscriber.getInstance(); + m_handle = m_inst.addTopicListener(subscriber, eventMask, listener); } /** @@ -96,18 +59,8 @@ public final class TopicListener implements AutoCloseable { */ public TopicListener( NetworkTableEntry entry, int eventMask, Consumer listener) { - s_lock.lock(); - try { - if (s_poller == 0) { - s_inst = entry.getInstance(); - s_poller = NetworkTablesJNI.createTopicListenerPoller(s_inst.getHandle()); - startThread(); - } - m_handle = NetworkTablesJNI.addPolledTopicListener(s_poller, entry.getHandle(), eventMask); - s_listeners.put(m_handle, listener); - } finally { - s_lock.unlock(); - } + m_inst = entry.getInstance(); + m_handle = m_inst.addTopicListener(entry, eventMask, listener); } /** @@ -123,30 +76,14 @@ public final class TopicListener implements AutoCloseable { String[] prefixes, int eventMask, Consumer listener) { - s_lock.lock(); - try { - if (s_poller == 0) { - s_inst = inst; - s_poller = NetworkTablesJNI.createTopicListenerPoller(inst.getHandle()); - startThread(); - } - m_handle = NetworkTablesJNI.addPolledTopicListener(s_poller, prefixes, eventMask); - s_listeners.put(m_handle, listener); - } finally { - s_lock.unlock(); - } + m_inst = inst; + m_handle = inst.addTopicListener(prefixes, eventMask, listener); } @Override public synchronized void close() { if (m_handle != 0) { - s_lock.lock(); - try { - s_listeners.remove(m_handle); - } finally { - s_lock.unlock(); - } - NetworkTablesJNI.removeTopicListener(m_handle); + m_inst.removeTopicListener(m_handle); m_handle = 0; } } @@ -169,72 +106,19 @@ public final class TopicListener implements AutoCloseable { return m_handle; } - private int m_handle; - - private static final ReentrantLock s_lock = new ReentrantLock(); - private static final Map> s_listeners = new HashMap<>(); - private static Thread s_thread; - private static NetworkTableInstance s_inst; - private static int s_poller; - private static boolean s_waitQueue; - private static final Condition s_waitQueueCond = s_lock.newCondition(); - - private static void startThread() { - s_thread = - new Thread( - () -> { - boolean wasInterrupted = false; - while (!Thread.interrupted()) { - try { - WPIUtilJNI.waitForObject(s_poller); - } catch (InterruptedException ex) { - s_lock.lock(); - try { - if (s_waitQueue) { - s_waitQueue = false; - s_waitQueueCond.signalAll(); - continue; - } - } finally { - s_lock.unlock(); - } - Thread.currentThread().interrupt(); - // don't try to destroy poller, as its handle is likely no longer valid - wasInterrupted = true; - break; - } - for (TopicNotification event : - NetworkTablesJNI.readTopicListenerQueue(s_inst, s_poller)) { - Consumer listener; - s_lock.lock(); - try { - listener = s_listeners.get(event.listener); - } finally { - s_lock.unlock(); - } - if (listener != null) { - try { - listener.accept(event); - } catch (Throwable throwable) { - System.err.println( - "Unhandled exception during listener callback: " + throwable.toString()); - throwable.printStackTrace(); - } - } - } - } - s_lock.lock(); - try { - if (!wasInterrupted) { - NetworkTablesJNI.destroyTopicListenerPoller(s_poller); - } - s_poller = 0; - } finally { - s_lock.unlock(); - } - }, - "TopicListener"); - s_thread.setDaemon(true); - s_thread.start(); + /** + * Wait for the topic listener queue to be empty. This is primarily useful for deterministic + * testing. This blocks until either the topic listener queue is empty (e.g. there are no more + * events that need to be passed along to callbacks or poll queues) or the timeout expires. + * + * @param timeout timeout, in seconds. Set to 0 for non-blocking behavior, or a negative value to + * block indefinitely + * @return False if timed out, otherwise true. + */ + public boolean waitForQueue(double timeout) { + return m_inst.waitForTopicListenerQueue(timeout); } + + private final NetworkTableInstance m_inst; + private int m_handle; } diff --git a/ntcore/src/main/java/edu/wpi/first/networktables/TopicListenerPoller.java b/ntcore/src/main/java/edu/wpi/first/networktables/TopicListenerPoller.java index 9a0e2f9c7d..02fd87e3e1 100644 --- a/ntcore/src/main/java/edu/wpi/first/networktables/TopicListenerPoller.java +++ b/ntcore/src/main/java/edu/wpi/first/networktables/TopicListenerPoller.java @@ -31,7 +31,7 @@ public final class TopicListenerPoller implements AutoCloseable { } /** - * Start listening to topic changes on a subscriber. + * Start listening to topic changes on a subscriber. This does NOT keep the subscriber active. * * @param subscriber Subscriber * @param eventMask Bitmask of TopicListenerFlags values @@ -42,7 +42,7 @@ public final class TopicListenerPoller implements AutoCloseable { } /** - * Start listening to topic changes on a subscriber. + * Start listening to topic changes on a subscriber. This does NOT keep the subscriber active. * * @param subscriber Subscriber * @param eventMask Bitmask of TopicListenerFlags values diff --git a/ntcore/src/main/java/edu/wpi/first/networktables/TopicNotification.java b/ntcore/src/main/java/edu/wpi/first/networktables/TopicNotification.java index 7f0de52ac3..e8d717446c 100644 --- a/ntcore/src/main/java/edu/wpi/first/networktables/TopicNotification.java +++ b/ntcore/src/main/java/edu/wpi/first/networktables/TopicNotification.java @@ -6,13 +6,7 @@ package edu.wpi.first.networktables; /** NetworkTables topic notification. */ @SuppressWarnings("MemberName") -public final class TopicNotification { - /** - * Handle of listener that was triggered. TopicListener.getHandle() or the return value of - * TopicListenerPoller.add() can be used to map this to a specific added listener. - */ - public final int listener; - +public final class TopicNotification extends NotificationBase { /** Topic information. */ public final TopicInfo info; @@ -30,7 +24,7 @@ public final class TopicNotification { * @param flags Update flags */ public TopicNotification(int listener, TopicInfo info, int flags) { - this.listener = listener; + super(info.getInstance(), listener); this.info = info; this.flags = flags; } diff --git a/ntcore/src/main/java/edu/wpi/first/networktables/ValueListener.java b/ntcore/src/main/java/edu/wpi/first/networktables/ValueListener.java index 36e747e577..c357cc58b1 100644 --- a/ntcore/src/main/java/edu/wpi/first/networktables/ValueListener.java +++ b/ntcore/src/main/java/edu/wpi/first/networktables/ValueListener.java @@ -4,11 +4,6 @@ package edu.wpi.first.networktables; -import edu.wpi.first.util.WPIUtilJNI; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; /** @@ -19,30 +14,19 @@ import java.util.function.Consumer; */ public final class ValueListener implements AutoCloseable { /** - * Create a listener for value changes on a subscriber. + * Create a listener for value changes on a subscriber. This does NOT keep the subscriber active. * * @param subscriber Subscriber * @param eventMask Bitmask of ValueListenerFlags values * @param listener Listener function */ public ValueListener(Subscriber subscriber, int eventMask, Consumer listener) { - s_lock.lock(); - try { - if (s_poller == 0) { - s_inst = subscriber.getTopic().getInstance(); - s_poller = NetworkTablesJNI.createValueListenerPoller(s_inst.getHandle()); - startThread(); - } - m_handle = - NetworkTablesJNI.addPolledValueListener(s_poller, subscriber.getHandle(), eventMask); - s_listeners.put(m_handle, listener); - } finally { - s_lock.unlock(); - } + m_inst = subscriber.getTopic().getInstance(); + m_handle = m_inst.addValueListener(subscriber, eventMask, listener); } /** - * Create a listener for value changes on a subscriber. + * Create a listener for value changes on a subscriber. This does NOT keep the subscriber active. * * @param subscriber Subscriber * @param eventMask Bitmask of ValueListenerFlags values @@ -50,19 +34,8 @@ public final class ValueListener implements AutoCloseable { */ public ValueListener( MultiSubscriber subscriber, int eventMask, Consumer listener) { - s_lock.lock(); - try { - if (s_poller == 0) { - s_inst = subscriber.getInstance(); - s_poller = NetworkTablesJNI.createValueListenerPoller(s_inst.getHandle()); - startThread(); - } - m_handle = - NetworkTablesJNI.addPolledValueListener(s_poller, subscriber.getHandle(), eventMask); - s_listeners.put(m_handle, listener); - } finally { - s_lock.unlock(); - } + m_inst = subscriber.getInstance(); + m_handle = m_inst.addValueListener(subscriber, eventMask, listener); } /** @@ -74,30 +47,14 @@ public final class ValueListener implements AutoCloseable { */ public ValueListener( NetworkTableEntry entry, int eventMask, Consumer listener) { - s_lock.lock(); - try { - if (s_poller == 0) { - s_inst = entry.getInstance(); - s_poller = NetworkTablesJNI.createValueListenerPoller(s_inst.getHandle()); - startThread(); - } - m_handle = NetworkTablesJNI.addPolledValueListener(s_poller, entry.getHandle(), eventMask); - s_listeners.put(m_handle, listener); - } finally { - s_lock.unlock(); - } + m_inst = entry.getInstance(); + m_handle = m_inst.addValueListener(entry, eventMask, listener); } @Override public synchronized void close() { if (m_handle != 0) { - s_lock.lock(); - try { - s_listeners.remove(m_handle); - } finally { - s_lock.unlock(); - } - NetworkTablesJNI.removeValueListener(m_handle); + m_inst.removeValueListener(m_handle); m_handle = 0; } } @@ -120,72 +77,19 @@ public final class ValueListener implements AutoCloseable { return m_handle; } - private int m_handle; - - private static final ReentrantLock s_lock = new ReentrantLock(); - private static final Map> s_listeners = new HashMap<>(); - private static Thread s_thread; - private static NetworkTableInstance s_inst; - private static int s_poller; - private static boolean s_waitQueue; - private static final Condition s_waitQueueCond = s_lock.newCondition(); - - private static void startThread() { - s_thread = - new Thread( - () -> { - boolean wasInterrupted = false; - while (!Thread.interrupted()) { - try { - WPIUtilJNI.waitForObject(s_poller); - } catch (InterruptedException ex) { - s_lock.lock(); - try { - if (s_waitQueue) { - s_waitQueue = false; - s_waitQueueCond.signalAll(); - continue; - } - } finally { - s_lock.unlock(); - } - Thread.currentThread().interrupt(); - // don't try to destroy poller, as its handle is likely no longer valid - wasInterrupted = true; - break; - } - for (ValueNotification event : - NetworkTablesJNI.readValueListenerQueue(s_inst, s_poller)) { - Consumer listener; - s_lock.lock(); - try { - listener = s_listeners.get(event.listener); - } finally { - s_lock.unlock(); - } - if (listener != null) { - try { - listener.accept(event); - } catch (Throwable throwable) { - System.err.println( - "Unhandled exception during listener callback: " + throwable.toString()); - throwable.printStackTrace(); - } - } - } - } - s_lock.lock(); - try { - if (!wasInterrupted) { - NetworkTablesJNI.destroyValueListenerPoller(s_poller); - } - s_poller = 0; - } finally { - s_lock.unlock(); - } - }, - "ValueListener"); - s_thread.setDaemon(true); - s_thread.start(); + /** + * Wait for the value listener queue to be empty. This is primarily useful for deterministic + * testing. This blocks until either the value listener queue is empty (e.g. there are no more + * events that need to be passed along to callbacks or poll queues) or the timeout expires. + * + * @param timeout timeout, in seconds. Set to 0 for non-blocking behavior, or a negative value to + * block indefinitely + * @return False if timed out, otherwise true. + */ + public boolean waitForQueue(double timeout) { + return m_inst.waitForValueListenerQueue(timeout); } + + private final NetworkTableInstance m_inst; + private int m_handle; } diff --git a/ntcore/src/main/java/edu/wpi/first/networktables/ValueListenerPoller.java b/ntcore/src/main/java/edu/wpi/first/networktables/ValueListenerPoller.java index dc7142c83e..167288a878 100644 --- a/ntcore/src/main/java/edu/wpi/first/networktables/ValueListenerPoller.java +++ b/ntcore/src/main/java/edu/wpi/first/networktables/ValueListenerPoller.java @@ -20,7 +20,7 @@ public final class ValueListenerPoller implements AutoCloseable { } /** - * Start listening to value changes on a subscriber. + * Start listening to value changes on a subscriber. This does NOT keep the subscriber active. * * @param subscriber Subscriber * @param eventMask Bitmask of ValueListenerFlags values @@ -31,7 +31,7 @@ public final class ValueListenerPoller implements AutoCloseable { } /** - * Start listening to value changes on a subscriber. + * Start listening to value changes on a subscriber. This does NOT keep the subscriber active. * * @param subscriber Subscriber * @param eventMask Bitmask of ValueListenerFlags values diff --git a/ntcore/src/main/java/edu/wpi/first/networktables/ValueNotification.java b/ntcore/src/main/java/edu/wpi/first/networktables/ValueNotification.java index 1a57204abd..067efd3fce 100644 --- a/ntcore/src/main/java/edu/wpi/first/networktables/ValueNotification.java +++ b/ntcore/src/main/java/edu/wpi/first/networktables/ValueNotification.java @@ -6,13 +6,7 @@ package edu.wpi.first.networktables; /** NetworkTables value notification. */ @SuppressWarnings("MemberName") -public final class ValueNotification { - /** - * Handle of listener that was triggered. ValueListener.getHandle() or the return value of - * ValueListenerPoller.add() can be used to map this to a specific added listener. - */ - public final int listener; - +public final class ValueNotification extends NotificationBase { /** Topic handle. Topic.getHandle() can be used to map this to the corresponding Topic object. */ public final int topic; @@ -45,17 +39,13 @@ public final class ValueNotification { int subentry, NetworkTableValue value, int flags) { - this.m_inst = inst; - this.listener = listener; + super(inst, listener); this.topic = topic; this.subentry = subentry; this.value = value; this.flags = flags; } - /* Network table instance. */ - private final NetworkTableInstance m_inst; - /* Cached topic object. */ Topic m_topicObject; diff --git a/ntcore/src/main/native/cpp/ConnectionList.cpp b/ntcore/src/main/native/cpp/ConnectionList.cpp index fffcb7d17e..75e1072c26 100644 --- a/ntcore/src/main/native/cpp/ConnectionList.cpp +++ b/ntcore/src/main/native/cpp/ConnectionList.cpp @@ -65,6 +65,8 @@ class ListenerThread final : public wpi::SafeThreadEvent { wpi::DenseMap> m_callbacks; + wpi::Event m_waitQueueWakeup; + wpi::Event m_waitQueueWaiter; }; class CLImpl { @@ -97,9 +99,10 @@ class CLImpl { void ListenerThread::Main() { while (m_active) { - WPI_Handle signaledBuf[2]; - auto signaled = - wpi::WaitForObjects({m_poller, m_stopEvent.GetHandle()}, signaledBuf); + WPI_Handle signaledBuf[3]; + auto signaled = wpi::WaitForObjects( + {m_poller, m_stopEvent.GetHandle(), m_waitQueueWakeup.GetHandle()}, + signaledBuf); if (signaled.empty() || !m_active) { return; } @@ -118,6 +121,10 @@ void ListenerThread::Main() { lock.lock(); } } + if (std::find(signaled.begin(), signaled.end(), + m_waitQueueWakeup.GetHandle()) != signaled.end()) { + m_waitQueueWaiter.Set(); + } } } @@ -279,12 +286,25 @@ bool ConnectionList::IsConnected() const { } NT_ConnectionListener ConnectionList::AddListener( - std::function callback, - bool immediateNotify) { + bool immediateNotify, + std::function callback) { std::scoped_lock lock{m_mutex}; return m_impl->AddListener(std::move(callback), immediateNotify); } +bool ConnectionList::WaitForListenerQueue(double timeout) { + std::scoped_lock lock{m_mutex}; + WPI_EventHandle h; + if (auto thr = m_impl->m_listenerThread.GetThread()) { + h = thr->m_waitQueueWaiter.GetHandle(); + thr->m_waitQueueWakeup.Set(); + } else { + return false; + } + bool timedOut; + return wpi::WaitForObject(h, timeout, &timedOut); +} + NT_ConnectionListenerPoller ConnectionList::CreateListenerPoller() { std::scoped_lock lock{m_mutex}; return m_impl->CreateListenerPoller()->handle; diff --git a/ntcore/src/main/native/cpp/ConnectionList.h b/ntcore/src/main/native/cpp/ConnectionList.h index ab52dabe68..b1602690cc 100644 --- a/ntcore/src/main/native/cpp/ConnectionList.h +++ b/ntcore/src/main/native/cpp/ConnectionList.h @@ -31,8 +31,9 @@ class ConnectionList final : public IConnectionList { bool IsConnected() const final; NT_ConnectionListener AddListener( - std::function callback, - bool immediateNotify); + bool immediateNotify, + std::function callback); + bool WaitForListenerQueue(double timeout); NT_ConnectionListenerPoller CreateListenerPoller(); void DestroyListenerPoller(NT_ConnectionListenerPoller pollerHandle); diff --git a/ntcore/src/main/native/cpp/LocalStorage.cpp b/ntcore/src/main/native/cpp/LocalStorage.cpp index 2c1162a3c9..01cd8f2e6c 100644 --- a/ntcore/src/main/native/cpp/LocalStorage.cpp +++ b/ntcore/src/main/native/cpp/LocalStorage.cpp @@ -298,6 +298,8 @@ struct TopicListenerThread final : public wpi::SafeThreadEvent { wpi::DenseMap> m_callbacks; + wpi::Event m_waitQueueWakeup; + wpi::Event m_waitQueueWaiter; }; struct ValueListenerThread final : public wpi::SafeThreadEvent { @@ -312,6 +314,8 @@ struct ValueListenerThread final : public wpi::SafeThreadEvent { wpi::DenseMap> m_callbacks; + wpi::Event m_waitQueueWakeup; + wpi::Event m_waitQueueWaiter; }; struct LSImpl { @@ -531,9 +535,10 @@ void SubscriberData::UpdateActive() { void TopicListenerThread::Main() { while (m_active) { - WPI_Handle signaledBuf[2]; - auto signaled = - wpi::WaitForObjects({m_poller, m_stopEvent.GetHandle()}, signaledBuf); + WPI_Handle signaledBuf[3]; + auto signaled = wpi::WaitForObjects( + {m_poller, m_stopEvent.GetHandle(), m_waitQueueWakeup.GetHandle()}, + signaledBuf); if (signaled.empty() || !m_active) { break; } @@ -552,14 +557,19 @@ void TopicListenerThread::Main() { lock.lock(); } } + if (std::find(signaled.begin(), signaled.end(), + m_waitQueueWakeup.GetHandle()) != signaled.end()) { + m_waitQueueWaiter.Set(); + } } } void ValueListenerThread::Main() { while (m_active) { - WPI_Handle signaledBuf[2]; - auto signaled = - wpi::WaitForObjects({m_poller, m_stopEvent.GetHandle()}, signaledBuf); + WPI_Handle signaledBuf[3]; + auto signaled = wpi::WaitForObjects( + {m_poller, m_stopEvent.GetHandle(), m_waitQueueWakeup.GetHandle()}, + signaledBuf); if (signaled.empty() || !m_active) { break; } @@ -578,6 +588,10 @@ void ValueListenerThread::Main() { lock.lock(); } } + if (std::find(signaled.begin(), signaled.end(), + m_waitQueueWakeup.GetHandle()) != signaled.end()) { + m_waitQueueWaiter.Set(); + } } } @@ -2365,6 +2379,19 @@ NT_TopicListener LocalStorage::AddTopicListener( return m_impl->AddTopicListener(topicHandle, mask, std::move(callback)); } +bool LocalStorage::WaitForTopicListenerQueue(double timeout) { + std::scoped_lock lock{m_mutex}; + WPI_EventHandle h; + if (auto thr = m_impl->m_topicListenerThread.GetThread()) { + h = thr->m_waitQueueWaiter.GetHandle(); + thr->m_waitQueueWakeup.Set(); + } else { + return false; + } + bool timedOut; + return wpi::WaitForObject(h, timeout, &timedOut); +} + NT_TopicListenerPoller LocalStorage::CreateTopicListenerPoller() { std::scoped_lock lock{m_mutex}; return m_impl->AddTopicListenerPoller()->handle; @@ -2423,6 +2450,19 @@ NT_ValueListener LocalStorage::AddValueListener( return m_impl->AddValueListener(subentry, mask, std::move(callback)); } +bool LocalStorage::WaitForValueListenerQueue(double timeout) { + std::scoped_lock lock{m_mutex}; + WPI_EventHandle h; + if (auto thr = m_impl->m_valueListenerThread.GetThread()) { + h = thr->m_waitQueueWaiter.GetHandle(); + thr->m_waitQueueWakeup.Set(); + } else { + return false; + } + bool timedOut; + return wpi::WaitForObject(h, timeout, &timedOut); +} + NT_ValueListenerPoller LocalStorage::CreateValueListenerPoller() { std::scoped_lock lock{m_mutex}; return m_impl->AddValueListenerPoller()->handle; diff --git a/ntcore/src/main/native/cpp/LocalStorage.h b/ntcore/src/main/native/cpp/LocalStorage.h index a58fc5090c..0c8e0a678a 100644 --- a/ntcore/src/main/native/cpp/LocalStorage.h +++ b/ntcore/src/main/native/cpp/LocalStorage.h @@ -198,6 +198,7 @@ class LocalStorage final : public net::ILocalStorage { NT_TopicListener AddTopicListener( NT_Handle handle, unsigned int mask, std::function callback); + bool WaitForTopicListenerQueue(double timeout); NT_TopicListenerPoller CreateTopicListenerPoller(); void DestroyTopicListenerPoller(NT_TopicListenerPoller poller); @@ -220,6 +221,7 @@ class LocalStorage final : public net::ILocalStorage { NT_ValueListener AddValueListener( NT_Handle subentry, unsigned int mask, std::function callback); + bool WaitForValueListenerQueue(double timeout); NT_ValueListenerPoller CreateValueListenerPoller(); void DestroyValueListenerPoller(NT_ValueListenerPoller poller); diff --git a/ntcore/src/main/native/cpp/networktables/NetworkTableInstance.cpp b/ntcore/src/main/native/cpp/networktables/NetworkTableInstance.cpp index a82bf30039..79fb4d6540 100644 --- a/ntcore/src/main/native/cpp/networktables/NetworkTableInstance.cpp +++ b/ntcore/src/main/native/cpp/networktables/NetworkTableInstance.cpp @@ -15,6 +15,7 @@ #include "networktables/FloatTopic.h" #include "networktables/IntegerArrayTopic.h" #include "networktables/IntegerTopic.h" +#include "networktables/MultiSubscriber.h" #include "networktables/RawTopic.h" #include "networktables/StringArrayTopic.h" #include "networktables/StringTopic.h" @@ -100,8 +101,16 @@ void NetworkTableInstance::SetServer(std::span servers, SetServer(serversArr); } -NT_ConnectionListener NetworkTableInstance::AddConnectionListener( - std::function callback, - bool immediate_notify) const { - return ::nt::AddConnectionListener(m_handle, callback, immediate_notify); +NT_TopicListener NetworkTableInstance::AddTopicListener( + MultiSubscriber& subscriber, int eventMask, + std::function listener) { + return ::nt::AddTopicListener(subscriber.GetHandle(), eventMask, + std::move(listener)); +} + +NT_ValueListener NetworkTableInstance::AddValueListener( + MultiSubscriber& subscriber, unsigned int eventMask, + std::function listener) { + return ::nt::AddValueListener(subscriber.GetHandle(), eventMask, + std::move(listener)); } diff --git a/ntcore/src/main/native/cpp/ntcore_c.cpp b/ntcore/src/main/native/cpp/ntcore_c.cpp index 9edac2d461..7f28758332 100644 --- a/ntcore/src/main/native/cpp/ntcore_c.cpp +++ b/ntcore/src/main/native/cpp/ntcore_c.cpp @@ -423,6 +423,10 @@ NT_TopicListener NT_AddTopicListenerSingle(NT_Topic topic, unsigned int mask, }); } +NT_Bool NT_WaitForTopicListenerQueue(NT_Handle handle, double timeout) { + return nt::WaitForTopicListenerQueue(handle, timeout); +} + NT_TopicListenerPoller NT_CreateTopicListenerPoller(NT_Inst inst) { return nt::CreateTopicListenerPoller(inst); } @@ -477,6 +481,10 @@ NT_ValueListener NT_AddValueListener(NT_Handle subentry, unsigned int mask, }); } +NT_Bool NT_WaitForValueListenerQueue(NT_Handle handle, double timeout) { + return nt::WaitForValueListenerQueue(handle, timeout); +} + NT_ValueListenerPoller NT_CreateValueListenerPoller(NT_Inst inst) { return nt::CreateValueListenerPoller(inst); } @@ -502,17 +510,19 @@ void NT_RemoveValueListener(NT_ValueListener value_listener) { } NT_ConnectionListener NT_AddConnectionListener( - NT_Inst inst, void* data, NT_ConnectionListenerCallback callback, - NT_Bool immediate_notify) { - return nt::AddConnectionListener( - inst, - [=](const ConnectionNotification& event) { - NT_ConnectionNotification event_c; - ConvertToC(event, &event_c); - callback(data, &event_c); - DisposeConnectionNotification(&event_c); - }, - immediate_notify != 0); + NT_Inst inst, NT_Bool immediate_notify, void* data, + NT_ConnectionListenerCallback callback) { + return nt::AddConnectionListener(inst, immediate_notify != 0, + [=](const ConnectionNotification& event) { + NT_ConnectionNotification event_c; + ConvertToC(event, &event_c); + callback(data, &event_c); + DisposeConnectionNotification(&event_c); + }); +} + +NT_Bool NT_WaitForConnectionListenerQueue(NT_Handle handle, double timeout) { + return nt::WaitForConnectionListenerQueue(handle, timeout); } NT_ConnectionListenerPoller NT_CreateConnectionListenerPoller(NT_Inst inst) { diff --git a/ntcore/src/main/native/cpp/ntcore_cpp.cpp b/ntcore/src/main/native/cpp/ntcore_cpp.cpp index 54a10315e5..5f12f4269b 100644 --- a/ntcore/src/main/native/cpp/ntcore_cpp.cpp +++ b/ntcore/src/main/native/cpp/ntcore_cpp.cpp @@ -407,6 +407,14 @@ NT_TopicListener AddTopicListener( } } +bool WaitForTopicListenerQueue(NT_Handle handle, double timeout) { + if (auto ii = InstanceImpl::GetHandle(handle)) { + return ii->localStorage.WaitForTopicListenerQueue(timeout); + } else { + return {}; + } +} + NT_TopicListenerPoller CreateTopicListenerPoller(NT_Inst inst) { if (auto ii = InstanceImpl::GetTyped(inst, Handle::kInstance)) { return ii->localStorage.CreateTopicListenerPoller(); @@ -466,6 +474,14 @@ NT_ValueListener AddValueListener( } } +bool WaitForValueListenerQueue(NT_Handle handle, double timeout) { + if (auto ii = InstanceImpl::GetHandle(handle)) { + return ii->localStorage.WaitForValueListenerQueue(timeout); + } else { + return {}; + } +} + NT_ValueListenerPoller CreateValueListenerPoller(NT_Inst inst) { if (auto ii = InstanceImpl::GetTyped(inst, Handle::kInstance)) { return ii->localStorage.CreateValueListenerPoller(); @@ -505,12 +521,19 @@ void RemoveValueListener(NT_ValueListener listener) { } NT_ConnectionListener AddConnectionListener( - NT_Inst inst, - std::function callback, - bool immediate_notify) { + NT_Inst inst, bool immediate_notify, + std::function callback) { if (auto ii = InstanceImpl::GetTyped(inst, Handle::kInstance)) { - return ii->connectionList.AddListener(std::move(callback), - immediate_notify); + return ii->connectionList.AddListener(immediate_notify, + std::move(callback)); + } else { + return {}; + } +} + +bool WaitForConnectionListenerQueue(NT_Handle handle, double timeout) { + if (auto ii = InstanceImpl::GetHandle(handle)) { + return ii->connectionList.WaitForListenerQueue(timeout); } else { return {}; } diff --git a/ntcore/src/main/native/include/networktables/ConnectionListener.h b/ntcore/src/main/native/include/networktables/ConnectionListener.h index b1a76a59e8..9f0144ef23 100644 --- a/ntcore/src/main/native/include/networktables/ConnectionListener.h +++ b/ntcore/src/main/native/include/networktables/ConnectionListener.h @@ -28,7 +28,7 @@ class ConnectionListener final { * * @param inst Instance * @param immediateNotify if notification should be immediately created for - * existing connections + * existing connections * @param listener Listener function */ ConnectionListener( @@ -50,6 +50,18 @@ class ConnectionListener final { */ NT_ConnectionListener GetHandle() const { return m_handle; } + /** + * Wait for the connection listener queue to be empty. This is primarily + * useful for deterministic testing. This blocks until either the connection + * listener queue is empty (e.g. there are no more events that need to be + * passed along to callbacks or poll queues) or the timeout expires. + * + * @param timeout timeout, in seconds. Set to 0 for non-blocking behavior, or + * a negative value to block indefinitely + * @return False if timed out, otherwise true. + */ + bool WaitForQueue(double timeout); + private: NT_ConnectionListener m_handle{0}; }; diff --git a/ntcore/src/main/native/include/networktables/ConnectionListener.inc b/ntcore/src/main/native/include/networktables/ConnectionListener.inc index 3295a942a3..1e7395ce01 100644 --- a/ntcore/src/main/native/include/networktables/ConnectionListener.inc +++ b/ntcore/src/main/native/include/networktables/ConnectionListener.inc @@ -17,7 +17,7 @@ inline ConnectionListener::ConnectionListener( NetworkTableInstance inst, bool immediateNotify, std::function listener) : m_handle{ - AddConnectionListener(inst.GetHandle(), listener, immediateNotify)} {} + AddConnectionListener(inst.GetHandle(), immediateNotify, listener)} {} inline ConnectionListener::ConnectionListener(ConnectionListener&& rhs) : m_handle(rhs.m_handle) { @@ -36,6 +36,14 @@ inline ConnectionListener::~ConnectionListener() { } } +inline bool ConnectionListener::WaitForQueue(double timeout) { + if (m_handle != 0) { + return nt::WaitForConnectionListenerQueue(m_handle, timeout); + } else { + return false; + } +} + inline ConnectionListenerPoller::ConnectionListenerPoller( NetworkTableInstance inst) : m_handle(nt::CreateConnectionListenerPoller(inst.GetHandle())) {} diff --git a/ntcore/src/main/native/include/networktables/NetworkTableInstance.h b/ntcore/src/main/native/include/networktables/NetworkTableInstance.h index 3eb80f5668..83bd4c0923 100644 --- a/ntcore/src/main/native/include/networktables/NetworkTableInstance.h +++ b/ntcore/src/main/native/include/networktables/NetworkTableInstance.h @@ -27,9 +27,11 @@ class FloatArrayTopic; class FloatTopic; class IntegerArrayTopic; class IntegerTopic; +class MultiSubscriber; class RawTopic; class StringArrayTopic; class StringTopic; +class Subscriber; class Topic; /** @@ -359,15 +361,17 @@ class NetworkTableInstance final { */ /** - * Add a connection listener. + * Add a connection listener. The callback function is called asynchronously + * on a separate thread, so it's important to use synchronization or atomics + * when accessing any shared state from the callback function. * - * @param callback listener to add * @param immediate_notify notify listener of all existing connections + * @param callback listener to add * @return Listener handle */ NT_ConnectionListener AddConnectionListener( - std::function callback, - bool immediate_notify) const; + bool immediate_notify, + std::function callback) const; /** * Remove a connection listener. @@ -376,6 +380,190 @@ class NetworkTableInstance final { */ static void RemoveConnectionListener(NT_ConnectionListener conn_listener); + /** + * Wait for the connection listener queue to be empty. This is primarily + * useful for deterministic testing. This blocks until either the connection + * listener queue is empty (e.g. there are no more events that need to be + * passed along to callbacks or poll queues) or the timeout expires. + * + * @param timeout timeout, in seconds. Set to 0 for non-blocking behavior, or + * a negative value to block indefinitely + * @return False if timed out, otherwise true. + */ + bool WaitForConnectionListenerQueue(double timeout); + + /** @} */ + + /** + * @{ + * @name Topic Listener Functions + */ + + /** + * Add a topic listener for changes on a particular topic. The callback + * function is called asynchronously on a separate thread, so it's important + * to use synchronization or atomics when accessing any shared state from the + * callback function. + * + * @param topic Topic + * @param eventMask Bitmask of TopicListenerFlags values + * @param listener Listener function + * @return Listener handle + */ + NT_TopicListener AddTopicListener( + Topic topic, unsigned int eventMask, + std::function listener); + + /** + * Add a topic listener for topic changes on a subscriber. The callback + * function is called asynchronously on a separate thread, so it's important + * to use synchronization or atomics when accessing any shared state from the + * callback function. This does NOT keep the subscriber active. + * + * @param subscriber Subscriber + * @param eventMask Bitmask of TopicListenerFlags values + * @param listener Listener function + * @return Listener handle + */ + NT_TopicListener AddTopicListener( + Subscriber& subscriber, unsigned int eventMask, + std::function listener); + + /** + * Add a topic listener for topic changes on a subscriber. The callback + * function is called asynchronously on a separate thread, so it's important + * to use synchronization or atomics when accessing any shared state from the + * callback function. This does NOT keep the subscriber active. + * + * @param subscriber Subscriber + * @param eventMask Bitmask of TopicListenerFlags values + * @param listener Listener function + * @return Listener handle + */ + NT_TopicListener AddTopicListener( + MultiSubscriber& subscriber, int eventMask, + std::function listener); + + /** + * Add a topic listener for topic changes on an entry. The callback function + * is called asynchronously on a separate thread, so it's important to use + * synchronization or atomics when accessing any shared state from the + * callback function. + * + * @param entry Entry + * @param eventMask Bitmask of TopicListenerFlags values + * @param listener Listener function + * @return Listener handle + */ + NT_TopicListener AddTopicListener( + NetworkTableEntry& entry, int eventMask, + std::function listener); + + /** + * Add a topic listener for changes to topics with names that start with any + * of the given prefixes. The callback function is called asynchronously on a + * separate thread, so it's important to use synchronization or atomics when + * accessing any shared state from the callback function. + * + * @param prefixes Topic name string prefixes + * @param eventMask Bitmask of TopicListenerFlags values + * @param listener Listener function + * @return Listener handle + */ + NT_TopicListener AddTopicListener( + std::span prefixes, int eventMask, + std::function listener); + + /** + * Remove a topic listener. + * + * @param listener Listener handle to remove + */ + static void RemoveTopicListener(NT_TopicListener listener); + + /** + * Wait for the topic listener queue to be empty. This is primarily useful for + * deterministic testing. This blocks until either the topic listener queue is + * empty (e.g. there are no more events that need to be passed along to + * callbacks or poll queues) or the timeout expires. + * + * @param timeout timeout, in seconds. Set to 0 for non-blocking behavior, or + * a negative value to block indefinitely + * @return False if timed out, otherwise true. + */ + bool WaitForTopicListenerQueue(double timeout); + + /** @} */ + + /** + * @{ + * @name Value Listener Functions + */ + + /** + * Add a value listener for value changes on a subscriber. The callback + * function is called asynchronously on a separate thread, so it's important + * to use synchronization or atomics when accessing any shared state from the + * callback function. This does NOT keep the subscriber active. + * + * @param subscriber Subscriber + * @param eventMask Bitmask of ValueListenerFlags values + * @param listener Listener function + * @return Listener handle + */ + NT_ValueListener AddValueListener( + Subscriber& subscriber, unsigned int eventMask, + std::function listener); + + /** + * Add a value listener for value changes on a subscriber. The callback + * function is called asynchronously on a separate thread, so it's important + * to use synchronization or atomics when accessing any shared state from the + * callback function. This does NOT keep the subscriber active. + * + * @param subscriber Subscriber + * @param eventMask Bitmask of ValueListenerFlags values + * @param listener Listener function + * @return Listener handle + */ + NT_ValueListener AddValueListener( + MultiSubscriber& subscriber, unsigned int eventMask, + std::function listener); + + /** + * Add a value listener for value changes on an entry. The callback function + * is called asynchronously on a separate thread, so it's important to use + * synchronization or atomics when accessing any shared state from the + * callback function. + * + * @param entry Entry + * @param eventMask Bitmask of ValueListenerFlags values + * @param listener Listener function + * @return Listener handle + */ + NT_ValueListener AddValueListener( + NetworkTableEntry& entry, int eventMask, + std::function listener); + + /** + * Remove a value listener. + * + * @param listener Listener handle to remove + */ + static void RemoveValueListener(NT_ValueListener listener); + + /** + * Wait for the value listener queue to be empty. This is primarily useful for + * deterministic testing. This blocks until either the value listener queue is + * empty (e.g. there are no more events that need to be passed along to + * callbacks or poll queues) or the timeout expires. + * + * @param timeout timeout, in seconds. Set to 0 for non-blocking behavior, or + * a negative value to block indefinitely + * @return False if timed out, otherwise true. + */ + bool WaitForValueListenerQueue(double timeout); + /** @} */ /** diff --git a/ntcore/src/main/native/include/networktables/NetworkTableInstance.inc b/ntcore/src/main/native/include/networktables/NetworkTableInstance.inc index bcc1a71e9c..7485da89ff 100644 --- a/ntcore/src/main/native/include/networktables/NetworkTableInstance.inc +++ b/ntcore/src/main/native/include/networktables/NetworkTableInstance.inc @@ -82,11 +82,83 @@ inline NetworkTableEntry NetworkTableInstance::GetEntry(std::string_view name) { return NetworkTableEntry{::nt::GetEntry(m_handle, name)}; } +inline NT_ConnectionListener NetworkTableInstance::AddConnectionListener( + bool immediate_notify, + std::function callback) const { + return ::nt::AddConnectionListener(m_handle, immediate_notify, + std::move(callback)); +} + +inline bool NetworkTableInstance::WaitForConnectionListenerQueue( + double timeout) { + return ::nt::WaitForConnectionListenerQueue(m_handle, timeout); +} + inline void NetworkTableInstance::RemoveConnectionListener( NT_ConnectionListener conn_listener) { ::nt::RemoveConnectionListener(conn_listener); } +inline NT_TopicListener NetworkTableInstance::AddTopicListener( + Topic topic, unsigned int eventMask, + std::function listener) { + return ::nt::AddTopicListener(topic.GetHandle(), eventMask, + std::move(listener)); +} + +inline NT_TopicListener NetworkTableInstance::AddTopicListener( + Subscriber& subscriber, unsigned int eventMask, + std::function listener) { + return ::nt::AddTopicListener(subscriber.GetHandle(), eventMask, + std::move(listener)); +} + +inline NT_TopicListener NetworkTableInstance::AddTopicListener( + NetworkTableEntry& entry, int eventMask, + std::function listener) { + return ::nt::AddTopicListener(entry.GetHandle(), eventMask, + std::move(listener)); +} + +inline NT_TopicListener NetworkTableInstance::AddTopicListener( + std::span prefixes, int eventMask, + std::function listener) { + return ::nt::AddTopicListener(m_handle, prefixes, eventMask, + std::move(listener)); +} + +inline void NetworkTableInstance::RemoveTopicListener( + NT_TopicListener listener) { + return ::nt::RemoveTopicListener(listener); +} + +inline bool NetworkTableInstance::WaitForTopicListenerQueue(double timeout) { + return ::nt::WaitForTopicListenerQueue(m_handle, timeout); +} + +inline NT_ValueListener NetworkTableInstance::AddValueListener( + Subscriber& subscriber, unsigned int eventMask, + std::function listener) { + return ::nt::AddValueListener(subscriber.GetHandle(), eventMask, + std::move(listener)); +} + +inline NT_ValueListener NetworkTableInstance::AddValueListener( + NetworkTableEntry& entry, int eventMask, + std::function listener) { + return ::nt::AddValueListener(entry.GetHandle(), eventMask, + std::move(listener)); +} + +inline void NetworkTableInstance::RemoveValueListener( + NT_ValueListener listener) { + ::nt::RemoveValueListener(listener); +} + +inline bool NetworkTableInstance::WaitForValueListenerQueue(double timeout) { + return ::nt::WaitForValueListenerQueue(m_handle, timeout); +} + inline unsigned int NetworkTableInstance::GetNetworkMode() const { return ::nt::GetNetworkMode(m_handle); } diff --git a/ntcore/src/main/native/include/networktables/TopicListener.h b/ntcore/src/main/native/include/networktables/TopicListener.h index df40afbe34..5cfc105b75 100644 --- a/ntcore/src/main/native/include/networktables/TopicListener.h +++ b/ntcore/src/main/native/include/networktables/TopicListener.h @@ -98,23 +98,25 @@ class TopicListener final { std::function listener); /** - * Create a listener for topic changes on a subscriber. + * Create a listener for topic changes on a subscriber. This does NOT keep the + * subscriber active. * * @param subscriber Subscriber * @param mask Bitmask of TopicListenerFlags values * @param listener Listener function */ - TopicListener(const Subscriber& subscriber, unsigned int mask, + TopicListener(Subscriber& subscriber, unsigned int mask, std::function listener); /** - * Create a listener for topic changes on a subscriber. + * Create a listener for topic changes on a subscriber. This does NOT keep the + * subscriber active. * * @param subscriber Subscriber * @param mask Bitmask of TopicListenerFlags values * @param listener Listener function */ - TopicListener(const MultiSubscriber& subscriber, unsigned int mask, + TopicListener(MultiSubscriber& subscriber, unsigned int mask, std::function listener); /** @@ -124,7 +126,7 @@ class TopicListener final { * @param mask Bitmask of TopicListenerFlags values * @param listener Listener function */ - TopicListener(const NetworkTableEntry& entry, unsigned int mask, + TopicListener(NetworkTableEntry& entry, unsigned int mask, std::function listener); TopicListener(const TopicListener&) = delete; @@ -142,6 +144,18 @@ class TopicListener final { */ NT_TopicListener GetHandle() const { return m_handle; } + /** + * Wait for the topic listener queue to be empty. This is primarily useful for + * deterministic testing. This blocks until either the topic listener queue is + * empty (e.g. there are no more events that need to be passed along to + * callbacks or poll queues) or the timeout expires. + * + * @param timeout timeout, in seconds. Set to 0 for non-blocking behavior, or + * a negative value to block indefinitely + * @return False if timed out, otherwise true. + */ + bool WaitForQueue(double timeout); + private: NT_TopicListener m_handle{0}; }; @@ -198,22 +212,24 @@ class TopicListenerPoller final { NT_TopicListener Add(Topic topic, unsigned int mask); /** - * Start listening to topic changes on a subscriber. + * Start listening to topic changes on a subscriber. This does NOT keep the + * subscriber active. * * @param subscriber Subscriber * @param mask Bitmask of TopicListenerFlags values * @return Listener handle */ - NT_TopicListener Add(const Subscriber& subscriber, unsigned int mask); + NT_TopicListener Add(Subscriber& subscriber, unsigned int mask); /** - * Start listening to topic changes on a subscriber. + * Start listening to topic changes on a subscriber. This does NOT keep the + * subscriber active. * * @param subscriber Subscriber * @param mask Bitmask of TopicListenerFlags values * @return Listener handle */ - NT_TopicListener Add(const MultiSubscriber& subscriber, unsigned int mask); + NT_TopicListener Add(MultiSubscriber& subscriber, unsigned int mask); /** * Start listening to topic changes on an entry. @@ -222,7 +238,7 @@ class TopicListenerPoller final { * @param mask Bitmask of TopicListenerFlags values * @return Listener handle */ - NT_TopicListener Add(const NetworkTableEntry& entry, unsigned int mask); + NT_TopicListener Add(NetworkTableEntry& entry, unsigned int mask); /** * Remove a listener. diff --git a/ntcore/src/main/native/include/networktables/TopicListener.inc b/ntcore/src/main/native/include/networktables/TopicListener.inc index 05fdd613c6..adb9e17851 100644 --- a/ntcore/src/main/native/include/networktables/TopicListener.inc +++ b/ntcore/src/main/native/include/networktables/TopicListener.inc @@ -29,17 +29,17 @@ inline TopicListener::TopicListener( : m_handle{AddTopicListener(topic.GetHandle(), mask, listener)} {} inline TopicListener::TopicListener( - const Subscriber& subscriber, unsigned int mask, + Subscriber& subscriber, unsigned int mask, std::function listener) : m_handle{AddTopicListener(subscriber.GetHandle(), mask, listener)} {} inline TopicListener::TopicListener( - const MultiSubscriber& subscriber, unsigned int mask, + MultiSubscriber& subscriber, unsigned int mask, std::function listener) : m_handle{AddTopicListener(subscriber.GetHandle(), mask, listener)} {} inline TopicListener::TopicListener( - const NetworkTableEntry& entry, unsigned int mask, + NetworkTableEntry& entry, unsigned int mask, std::function listener) : m_handle{AddTopicListener(entry.GetHandle(), mask, listener)} {} @@ -59,6 +59,14 @@ inline TopicListener::~TopicListener() { } } +inline bool TopicListener::WaitForQueue(double timeout) { + if (m_handle != 0) { + return nt::WaitForTopicListenerQueue(m_handle, timeout); + } else { + return false; + } +} + inline TopicListenerPoller::TopicListenerPoller(NetworkTableInstance inst) : m_handle(nt::CreateTopicListenerPoller(inst.GetHandle())) {} @@ -89,17 +97,17 @@ inline NT_TopicListener TopicListenerPoller::Add(Topic topic, return nt::AddPolledTopicListener(m_handle, topic.GetHandle(), mask); } -inline NT_TopicListener TopicListenerPoller::Add(const Subscriber& subscriber, +inline NT_TopicListener TopicListenerPoller::Add(Subscriber& subscriber, unsigned int mask) { return nt::AddPolledTopicListener(m_handle, subscriber.GetHandle(), mask); } -inline NT_TopicListener TopicListenerPoller::Add( - const MultiSubscriber& subscriber, unsigned int mask) { +inline NT_TopicListener TopicListenerPoller::Add(MultiSubscriber& subscriber, + unsigned int mask) { return nt::AddPolledTopicListener(m_handle, subscriber.GetHandle(), mask); } -inline NT_TopicListener TopicListenerPoller::Add(const NetworkTableEntry& entry, +inline NT_TopicListener TopicListenerPoller::Add(NetworkTableEntry& entry, unsigned int mask) { return nt::AddPolledTopicListener(m_handle, entry.GetHandle(), mask); } diff --git a/ntcore/src/main/native/include/networktables/ValueListener.h b/ntcore/src/main/native/include/networktables/ValueListener.h index 02baa4eeaa..9c85bc7e0d 100644 --- a/ntcore/src/main/native/include/networktables/ValueListener.h +++ b/ntcore/src/main/native/include/networktables/ValueListener.h @@ -57,23 +57,25 @@ class ValueListener final { ValueListener() = default; /** - * Create a listener for value changes on a subscriber. + * Create a listener for value changes on a subscriber. This does NOT keep the + * subscriber active. * * @param subscriber Subscriber * @param mask Bitmask of ValueListenerFlags values * @param listener Listener function */ - ValueListener(const Subscriber& subscriber, unsigned int mask, + ValueListener(Subscriber& subscriber, unsigned int mask, std::function listener); /** - * Create a listener for value changes on a subscriber. + * Create a listener for value changes on a subscriber. This does NOT keep the + * subscriber active. * * @param subscriber Subscriber * @param mask Bitmask of ValueListenerFlags values * @param listener Listener function */ - ValueListener(const MultiSubscriber& subscriber, unsigned int mask, + ValueListener(MultiSubscriber& subscriber, unsigned int mask, std::function listener); /** @@ -83,17 +85,7 @@ class ValueListener final { * @param mask Bitmask of ValueListenerFlags values * @param listener Listener function */ - ValueListener(const NetworkTableEntry& entry, unsigned int mask, - std::function listener); - - /** - * Create a listener for value changes on a subscriber/entry handle. - * - * @param subentry Subscriber/entry handle - * @param mask Bitmask of ValueListenerFlags values - * @param listener Listener function - */ - ValueListener(NT_Handle subentry, unsigned int mask, + ValueListener(NetworkTableEntry& entry, unsigned int mask, std::function listener); ValueListener(const ValueListener&) = delete; @@ -111,6 +103,18 @@ class ValueListener final { */ NT_ValueListener GetHandle() const { return m_handle; } + /** + * Wait for the value listener queue to be empty. This is primarily useful for + * deterministic testing. This blocks until either the value listener queue is + * empty (e.g. there are no more events that need to be passed along to + * callbacks or poll queues) or the timeout expires. + * + * @param timeout timeout, in seconds. Set to 0 for non-blocking behavior, or + * a negative value to block indefinitely + * @return False if timed out, otherwise true. + */ + bool WaitForQueue(double timeout); + private: NT_ValueListener m_handle{0}; }; @@ -147,22 +151,24 @@ class ValueListenerPoller final { NT_ValueListenerPoller GetHandle() const { return m_handle; } /** - * Start listening to value changes on a subscriber. + * Start listening to value changes on a subscriber. This does NOT keep the + * subscriber active. * * @param subscriber Subscriber * @param mask Bitmask of ValueListenerFlags values * @return Listener handle */ - NT_ValueListener Add(const Subscriber& subscriber, unsigned int mask); + NT_ValueListener Add(Subscriber& subscriber, unsigned int mask); /** - * Start listening to value changes on a subscriber. + * Start listening to value changes on a subscriber. This does NOT keep the + * subscriber active. * * @param subscriber Subscriber * @param mask Bitmask of ValueListenerFlags values * @return Listener handle */ - NT_ValueListener Add(const MultiSubscriber& subscriber, unsigned int mask); + NT_ValueListener Add(MultiSubscriber& subscriber, unsigned int mask); /** * Start listening to value changes on an entry. @@ -171,16 +177,7 @@ class ValueListenerPoller final { * @param mask Bitmask of ValueListenerFlags values * @return Listener handle */ - NT_ValueListener Add(const NetworkTableEntry& entry, unsigned int mask); - - /** - * Start listening to value changes on a subscriber/entry handle. - * - * @param subentry Subscriber/entry handle - * @param mask Bitmask of ValueListenerFlags values - * @return Listener handle - */ - NT_ValueListener Add(NT_Handle subentry, unsigned int mask); + NT_ValueListener Add(NetworkTableEntry& entry, unsigned int mask); /** * Remove a listener. diff --git a/ntcore/src/main/native/include/networktables/ValueListener.inc b/ntcore/src/main/native/include/networktables/ValueListener.inc index 6872468e00..4950ba1c26 100644 --- a/ntcore/src/main/native/include/networktables/ValueListener.inc +++ b/ntcore/src/main/native/include/networktables/ValueListener.inc @@ -17,25 +17,20 @@ namespace nt { inline ValueListener::ValueListener( - const Subscriber& subscriber, unsigned int mask, + Subscriber& subscriber, unsigned int mask, std::function listener) : m_handle{AddValueListener(subscriber.GetHandle(), mask, listener)} {} inline ValueListener::ValueListener( - const MultiSubscriber& subscriber, unsigned int mask, + MultiSubscriber& subscriber, unsigned int mask, std::function listener) : m_handle{AddValueListener(subscriber.GetHandle(), mask, listener)} {} inline ValueListener::ValueListener( - const NetworkTableEntry& entry, unsigned int mask, + NetworkTableEntry& entry, unsigned int mask, std::function listener) : m_handle{AddValueListener(entry.GetHandle(), mask, listener)} {} -inline ValueListener::ValueListener( - NT_Handle subentry, unsigned int mask, - std::function listener) - : m_handle{AddValueListener(subentry, mask, listener)} {} - inline ValueListener::ValueListener(ValueListener&& rhs) : m_handle(rhs.m_handle) { rhs.m_handle = 0; @@ -52,6 +47,14 @@ inline ValueListener::~ValueListener() { } } +inline bool ValueListener::WaitForQueue(double timeout) { + if (m_handle != 0) { + return nt::WaitForValueListenerQueue(m_handle, timeout); + } else { + return false; + } +} + inline ValueListenerPoller::ValueListenerPoller(NetworkTableInstance inst) : m_handle(nt::CreateValueListenerPoller(inst.GetHandle())) {} @@ -72,24 +75,19 @@ inline ValueListenerPoller::~ValueListenerPoller() { } } -inline NT_ValueListener ValueListenerPoller::Add(const Subscriber& subscriber, +inline NT_ValueListener ValueListenerPoller::Add(Subscriber& subscriber, unsigned int mask) { - return Add(subscriber.GetHandle(), mask); + return nt::AddPolledValueListener(m_handle, subscriber.GetHandle(), mask); } -inline NT_ValueListener ValueListenerPoller::Add( - const MultiSubscriber& subscriber, unsigned int mask) { - return Add(subscriber.GetHandle(), mask); +inline NT_ValueListener ValueListenerPoller::Add(MultiSubscriber& subscriber, + unsigned int mask) { + return nt::AddPolledValueListener(m_handle, subscriber.GetHandle(), mask); } -inline NT_ValueListener ValueListenerPoller::Add(const NetworkTableEntry& entry, +inline NT_ValueListener ValueListenerPoller::Add(NetworkTableEntry& entry, unsigned int mask) { - return Add(entry.GetHandle(), mask); -} - -inline NT_ValueListener ValueListenerPoller::Add(NT_Handle subentry, - unsigned int mask) { - return nt::AddPolledValueListener(m_handle, subentry, mask); + return nt::AddPolledValueListener(m_handle, entry.GetHandle(), mask); } inline void ValueListenerPoller::Remove(NT_ValueListener listener) { diff --git a/ntcore/src/main/native/include/ntcore_c.h b/ntcore/src/main/native/include/ntcore_c.h index 07c1719199..91e83c2262 100644 --- a/ntcore/src/main/native/include/ntcore_c.h +++ b/ntcore/src/main/native/include/ntcore_c.h @@ -862,6 +862,19 @@ NT_TopicListener NT_AddTopicListenerSingle(NT_Topic topic, unsigned int mask, void* data, NT_TopicListenerCallback callback); +/** + * Wait for the topic listener queue to be empty. This is primarily useful + * for deterministic testing. This blocks until either the topic listener + * queue is empty (e.g. there are no more events that need to be passed along to + * callbacks or poll queues) or the timeout expires. + * + * @param handle handle + * @param timeout timeout, in seconds. Set to 0 for non-blocking behavior, or a + * negative value to block indefinitely + * @return False if timed out, otherwise true. + */ +NT_Bool NT_WaitForTopicListenerQueue(NT_Handle handle, double timeout); + /** * Creates a topic listener poller. * @@ -971,6 +984,19 @@ NT_ValueListener NT_AddValueListener(NT_Handle subentry, unsigned int mask, void* data, NT_ValueListenerCallback callback); +/** + * Wait for the value listener queue to be empty. This is primarily useful + * for deterministic testing. This blocks until either the value listener + * queue is empty (e.g. there are no more events that need to be passed along to + * callbacks or poll queues) or the timeout expires. + * + * @param handle handle + * @param timeout timeout, in seconds. Set to 0 for non-blocking behavior, or a + * negative value to block indefinitely + * @return False if timed out, otherwise true. + */ +NT_Bool NT_WaitForValueListenerQueue(NT_Handle handle, double timeout); + /** * Create a value listener poller. * @@ -1050,8 +1076,21 @@ typedef void (*NT_ConnectionListenerCallback)( * @return Listener handle */ NT_ConnectionListener NT_AddConnectionListener( - NT_Inst inst, void* data, NT_ConnectionListenerCallback callback, - NT_Bool immediate_notify); + NT_Inst inst, NT_Bool immediate_notify, void* data, + NT_ConnectionListenerCallback callback); + +/** + * Wait for the connection listener queue to be empty. This is primarily useful + * for deterministic testing. This blocks until either the connection listener + * queue is empty (e.g. there are no more events that need to be passed along to + * callbacks or poll queues) or the timeout expires. + * + * @param handle handle + * @param timeout timeout, in seconds. Set to 0 for non-blocking behavior, or a + * negative value to block indefinitely + * @return False if timed out, otherwise true. + */ +NT_Bool NT_WaitForConnectionListenerQueue(NT_Handle handle, double timeout); /** * Create a connection listener poller. diff --git a/ntcore/src/main/native/include/ntcore_cpp.h b/ntcore/src/main/native/include/ntcore_cpp.h index abaf64a870..c5df2243f4 100644 --- a/ntcore/src/main/native/include/ntcore_cpp.h +++ b/ntcore/src/main/native/include/ntcore_cpp.h @@ -799,6 +799,19 @@ NT_TopicListener AddTopicListener( NT_Handle handle, unsigned int mask, std::function callback); +/** + * Wait for the topic listener queue to be empty. This is primarily useful + * for deterministic testing. This blocks until either the topic listener + * queue is empty (e.g. there are no more events that need to be passed along to + * callbacks or poll queues) or the timeout expires. + * + * @param handle handle + * @param timeout timeout, in seconds. Set to 0 for non-blocking behavior, or a + * negative value to block indefinitely + * @return False if timed out, otherwise true. + */ +bool WaitForTopicListenerQueue(NT_Handle handle, double timeout); + /** * Creates a topic listener poller. * @@ -880,6 +893,19 @@ NT_ValueListener AddValueListener( NT_Handle subentry, unsigned int mask, std::function callback); +/** + * Wait for the value listener queue to be empty. This is primarily useful + * for deterministic testing. This blocks until either the value listener + * queue is empty (e.g. there are no more events that need to be passed along to + * callbacks or poll queues) or the timeout expires. + * + * @param handle handle + * @param timeout timeout, in seconds. Set to 0 for non-blocking behavior, or a + * negative value to block indefinitely + * @return False if timed out, otherwise true. + */ +bool WaitForValueListenerQueue(NT_Handle handle, double timeout); + /** * Create a value listener poller. * @@ -945,9 +971,21 @@ void RemoveValueListener(NT_ValueListener listener); * @return Listener handle */ NT_ConnectionListener AddConnectionListener( - NT_Inst inst, - std::function callback, - bool immediate_notify); + NT_Inst inst, bool immediate_notify, + std::function callback); + +/** + * Wait for the connection listener queue to be empty. This is primarily useful + * for deterministic testing. This blocks until either the connection listener + * queue is empty (e.g. there are no more events that need to be passed along to + * callbacks or poll queues) or the timeout expires. + * + * @param handle handle + * @param timeout timeout, in seconds. Set to 0 for non-blocking behavior, or a + * negative value to block indefinitely + * @return False if timed out, otherwise true. + */ +bool WaitForConnectionListenerQueue(NT_Handle handle, double timeout); /** * Create a connection listener poller. diff --git a/ntcore/src/test/java/edu/wpi/first/networktables/ConnectionListenerTest.java b/ntcore/src/test/java/edu/wpi/first/networktables/ConnectionListenerTest.java index cc744a2467..b3ce5909c3 100644 --- a/ntcore/src/test/java/edu/wpi/first/networktables/ConnectionListenerTest.java +++ b/ntcore/src/test/java/edu/wpi/first/networktables/ConnectionListenerTest.java @@ -114,12 +114,12 @@ class ConnectionListenerTest { List events = new ArrayList<>(); final int handle = m_serverInst.addConnectionListener( + false, e -> { synchronized (events) { events.add(e); } - }, - false); + }); // trigger a connect event m_clientInst.startClient4("client"); @@ -146,6 +146,9 @@ class ConnectionListenerTest { fail("interrupted while waiting for queue"); } + // wait for thread + m_serverInst.waitForConnectionListenerQueue(1.0); + // get the event synchronized (events) { assertEquals(1, events.size()); @@ -162,6 +165,9 @@ class ConnectionListenerTest { fail("interrupted while waiting for client to stop"); } + // wait for thread + m_serverInst.waitForConnectionListenerQueue(1.0); + // get the event try { assertFalse(WPIUtilJNI.waitForObjectTimeout(handle, 1.0)); diff --git a/ntcore/src/test/native/cpp/ConnectionListenerTest.cpp b/ntcore/src/test/native/cpp/ConnectionListenerTest.cpp index 8ef166f070..4c3624bbef 100644 --- a/ntcore/src/test/native/cpp/ConnectionListenerTest.cpp +++ b/ntcore/src/test/native/cpp/ConnectionListenerTest.cpp @@ -89,12 +89,10 @@ TEST_P(ConnectionListenerVariantTest, Threaded) { wpi::mutex m; std::vector result; auto handle = nt::AddConnectionListener( - server_inst, - [&](const nt::ConnectionNotification& event) { + server_inst, false, [&](const nt::ConnectionNotification& event) { std::scoped_lock lock{m}; result.push_back(event); - }, - false); + }); // trigger a connect event Connect(GetParam().first, 0, 20001 + GetParam().second); @@ -116,6 +114,9 @@ TEST_P(ConnectionListenerVariantTest, Threaded) { nt::StopClient(client_inst); std::this_thread::sleep_for(std::chrono::milliseconds(100)); + // wait for thread + nt::WaitForConnectionListenerQueue(server_inst, 1.0); + // get the event { std::scoped_lock lock{m}; diff --git a/ntcoreffi/src/main/native/symbols.txt b/ntcoreffi/src/main/native/symbols.txt index c79c5a3856..90f05f5c93 100644 --- a/ntcoreffi/src/main/native/symbols.txt +++ b/ntcoreffi/src/main/native/symbols.txt @@ -220,6 +220,9 @@ NT_StopServer NT_Subscribe NT_Unpublish NT_Unsubscribe +NT_WaitForConnectionListenerQueue +NT_WaitForTopicListenerQueue +NT_WaitForValueListenerQueue WPI_CreateEvent WPI_CreateSemaphore WPI_CreateSignalObject diff --git a/wpilibc/src/main/native/cpp/Preferences.cpp b/wpilibc/src/main/native/cpp/Preferences.cpp index a76ebfe46a..91b8cffb77 100644 --- a/wpilibc/src/main/native/cpp/Preferences.cpp +++ b/wpilibc/src/main/native/cpp/Preferences.cpp @@ -27,7 +27,7 @@ struct Instance { nt::NetworkTableInstance::GetDefault().GetTable(kTableName)}; nt::StringPublisher typePublisher{table->GetStringTopic(".type").Publish()}; nt::MultiSubscriber tableSubscriber{nt::NetworkTableInstance::GetDefault(), - {{kTableName}}}; + {{fmt::format("{}/", table->GetPath())}}}; nt::TopicListener listener; }; } // namespace @@ -167,11 +167,12 @@ void Preferences::RemoveAll() { Instance::Instance() { typePublisher.Set("RobotPreferences"); listener = nt::TopicListener{ - table->GetInstance(), - {{std::string_view{fmt::format("{}/", table->GetPath())}}}, - NT_TOPIC_NOTIFY_IMMEDIATE | NT_TOPIC_NOTIFY_PUBLISH, - [](const nt::TopicNotification& event) { - nt::SetTopicPersistent(event.info.topic, true); + tableSubscriber, NT_TOPIC_NOTIFY_IMMEDIATE | NT_TOPIC_NOTIFY_PUBLISH, + [typeTopic = typePublisher.GetTopic().GetHandle()]( + const nt::TopicNotification& event) { + if (event.info.topic != typeTopic) { + nt::SetTopicPersistent(event.info.topic, true); + } }}; HAL_Report(HALUsageReporting::kResourceType_Preferences, 0); } diff --git a/wpilibj/src/main/java/edu/wpi/first/wpilibj/Preferences.java b/wpilibj/src/main/java/edu/wpi/first/wpilibj/Preferences.java index 6b9a952705..f4f667431f 100644 --- a/wpilibj/src/main/java/edu/wpi/first/wpilibj/Preferences.java +++ b/wpilibj/src/main/java/edu/wpi/first/wpilibj/Preferences.java @@ -13,6 +13,7 @@ import edu.wpi.first.networktables.NetworkTable; import edu.wpi.first.networktables.NetworkTableEntry; import edu.wpi.first.networktables.NetworkTableInstance; import edu.wpi.first.networktables.StringPublisher; +import edu.wpi.first.networktables.Topic; import edu.wpi.first.networktables.TopicListener; import edu.wpi.first.networktables.TopicListenerFlags; import java.util.Collection; @@ -75,10 +76,14 @@ public final class Preferences { } m_listener = new TopicListener( - m_table.getInstance(), - new String[] {m_table.getPath() + "/"}, + m_tableSubscriber, TopicListenerFlags.kImmediate | TopicListenerFlags.kPublish, - event -> event.info.getTopic().setPersistent(true)); + event -> { + Topic topic = event.info.getTopic(); + if (!topic.equals(m_typePublisher.getTopic())) { + event.info.getTopic().setPersistent(true); + } + }); } /**