[ntcore] Unify listeners (#4536)

This combines all 4 NT listener APIs (topic, value, connection, and
logging) into a single unified listener API.
This commit is contained in:
Peter Johnson
2022-10-31 21:52:14 -07:00
committed by GitHub
parent 32fbfb7da6
commit b114006543
71 changed files with 3613 additions and 5786 deletions

View File

@@ -66,9 +66,7 @@ public final class NetworkTableInstance implements AutoCloseable {
@Override
public synchronized void close() {
if (m_owned && m_handle != 0) {
m_connectionListener.close();
m_topicListener.close();
m_valueListener.close();
m_listeners.close();
NetworkTablesJNI.destroyInstance(m_handle);
}
}
@@ -357,33 +355,84 @@ public final class NetworkTableInstance implements AutoCloseable {
* Callback Creation Functions
*/
private abstract static class ListenerBase<T extends NotificationBase> implements AutoCloseable {
protected final ReentrantLock m_lock = new ReentrantLock();
protected final Map<Integer, Consumer<T>> m_listeners = new HashMap<>();
private static class ListenerStorage implements AutoCloseable {
private final ReentrantLock m_lock = new ReentrantLock();
private final Map<Integer, Consumer<NetworkTableEvent>> m_listeners = new HashMap<>();
private Thread m_thread;
protected int m_poller;
private int m_poller;
private boolean m_waitQueue;
private final Event m_waitQueueEvent = new Event();
private final Condition m_waitQueueCond = m_lock.newCondition();
protected final NetworkTableInstance m_inst;
private final NetworkTableInstance m_inst;
ListenerBase(NetworkTableInstance inst) {
ListenerStorage(NetworkTableInstance inst) {
m_inst = inst;
}
int add(String[] prefixes, int mask, Consumer<NetworkTableEvent> listener) {
m_lock.lock();
try {
if (m_poller == 0) {
m_poller = NetworkTablesJNI.createListenerPoller(m_inst.getHandle());
startThread();
}
int h = NetworkTablesJNI.addListener(m_poller, prefixes, mask);
m_listeners.put(h, listener);
return h;
} finally {
m_lock.unlock();
}
}
int add(int handle, int mask, Consumer<NetworkTableEvent> listener) {
m_lock.lock();
try {
if (m_poller == 0) {
m_poller = NetworkTablesJNI.createListenerPoller(m_inst.getHandle());
startThread();
}
int h = NetworkTablesJNI.addListener(m_poller, handle, mask);
m_listeners.put(h, listener);
return h;
} finally {
m_lock.unlock();
}
}
int addLogger(int minLevel, int maxLevel, Consumer<NetworkTableEvent> listener) {
m_lock.lock();
try {
if (m_poller == 0) {
m_poller = NetworkTablesJNI.createListenerPoller(m_inst.getHandle());
startThread();
}
int h = NetworkTablesJNI.addLogger(m_poller, minLevel, maxLevel);
m_listeners.put(h, listener);
return h;
} finally {
m_lock.unlock();
}
}
void remove(int listener) {
m_lock.lock();
try {
m_listeners.remove(listener);
} finally {
m_lock.unlock();
}
NetworkTablesJNI.removeListener(listener);
}
@Override
public void close() {
if (m_poller != 0) {
destroyPoller();
NetworkTablesJNI.destroyListenerPoller(m_poller);
}
m_poller = 0;
}
protected abstract T[] readQueue();
protected abstract void destroyPoller();
protected void startThread(String name) {
private void startThread() {
m_thread =
new Thread(
() -> {
@@ -407,8 +456,9 @@ public final class NetworkTableInstance implements AutoCloseable {
wasInterrupted = true;
break;
}
for (T event : readQueue()) {
Consumer<T> listener;
for (NetworkTableEvent event :
NetworkTablesJNI.readListenerQueue(m_inst, m_poller)) {
Consumer<NetworkTableEvent> listener;
m_lock.lock();
try {
listener = m_listeners.get(event.listener);
@@ -439,14 +489,14 @@ public final class NetworkTableInstance implements AutoCloseable {
m_lock.lock();
try {
if (!wasInterrupted) {
destroyPoller();
NetworkTablesJNI.destroyListenerPoller(m_poller);
}
m_poller = 0;
} finally {
m_lock.unlock();
}
},
name);
"NTListener");
m_thread.setDaemon(true);
m_thread.start();
}
@@ -477,48 +527,29 @@ public final class NetworkTableInstance implements AutoCloseable {
}
}
private static final class ConnectionListener extends ListenerBase<ConnectionNotification> {
ConnectionListener(NetworkTableInstance inst) {
super(inst);
}
private ListenerStorage m_listeners = new ListenerStorage(this);
int add(boolean immediateNotify, Consumer<ConnectionNotification> listener) {
m_lock.lock();
try {
if (m_poller == 0) {
m_poller = NetworkTablesJNI.createConnectionListenerPoller(m_inst.getHandle());
startThread("NTConnectionListener");
}
int h = NetworkTablesJNI.addPolledConnectionListener(m_poller, immediateNotify);
m_listeners.put(h, listener);
return h;
} finally {
m_lock.unlock();
}
}
void remove(int listener) {
m_lock.lock();
try {
m_listeners.remove(listener);
} finally {
m_lock.unlock();
}
NetworkTablesJNI.removeConnectionListener(listener);
}
@Override
protected ConnectionNotification[] readQueue() {
return NetworkTablesJNI.readConnectionListenerQueue(m_inst, m_poller);
}
@Override
protected void destroyPoller() {
NetworkTablesJNI.destroyConnectionListenerPoller(m_poller);
}
/**
* Remove a connection listener.
*
* @param listener Listener handle to remove
*/
public void removeListener(int listener) {
m_listeners.remove(listener);
}
private ConnectionListener m_connectionListener = new ConnectionListener(this);
/**
* Wait for the listener queue to be empty. This is primarily useful for deterministic
* testing. This blocks until either the listener queue is empty (e.g. there are no
* more events that need to be passed along to callbacks or poll queues) or the timeout expires.
*
* @param timeout timeout, in seconds. Set to 0 for non-blocking behavior, or a negative value to
* block indefinitely
* @return False if timed out, otherwise true.
*/
public boolean waitForListenerQueue(double timeout) {
return m_listeners.waitForQueue(timeout);
}
/**
* Add a connection listener. The callback function is called asynchronously on a separate
@@ -530,110 +561,35 @@ public final class NetworkTableInstance implements AutoCloseable {
* @return Listener handle
*/
public int addConnectionListener(
boolean immediateNotify, Consumer<ConnectionNotification> listener) {
return m_connectionListener.add(immediateNotify, listener);
boolean immediateNotify, Consumer<NetworkTableEvent> listener) {
return m_listeners.add(m_handle,
NetworkTableEvent.kConnection | (immediateNotify ? NetworkTableEvent.kImmediate : 0),
listener);
}
/**
* Remove a connection listener.
*
* @param listener Listener handle to remove
*/
public void removeConnectionListener(int listener) {
m_connectionListener.remove(listener);
}
/**
* Wait for the connection listener queue to be empty. This is primarily useful for deterministic
* testing. This blocks until either the connection listener queue is empty (e.g. there are no
* more events that need to be passed along to callbacks or poll queues) or the timeout expires.
*
* @param timeout timeout, in seconds. Set to 0 for non-blocking behavior, or a negative value to
* block indefinitely
* @return False if timed out, otherwise true.
*/
public boolean waitForConnectionListenerQueue(double timeout) {
return m_connectionListener.waitForQueue(timeout);
}
private static final class TopicListener extends ListenerBase<TopicNotification> {
TopicListener(NetworkTableInstance inst) {
super(inst);
}
int add(int handle, int eventMask, Consumer<TopicNotification> listener) {
m_lock.lock();
try {
if (m_poller == 0) {
m_poller = NetworkTablesJNI.createTopicListenerPoller(m_inst.getHandle());
startThread("NTTopicListener");
}
int h = NetworkTablesJNI.addPolledTopicListener(m_poller, handle, eventMask);
m_listeners.put(h, listener);
return h;
} finally {
m_lock.unlock();
}
}
int add(String[] prefixes, int eventMask, Consumer<TopicNotification> listener) {
m_lock.lock();
try {
if (m_poller == 0) {
m_poller = NetworkTablesJNI.createTopicListenerPoller(m_inst.getHandle());
startThread("NTTopicListener");
}
int h = NetworkTablesJNI.addPolledTopicListener(m_poller, prefixes, eventMask);
m_listeners.put(h, listener);
return h;
} finally {
m_lock.unlock();
}
}
void remove(int listener) {
m_lock.lock();
try {
m_listeners.remove(listener);
} finally {
m_lock.unlock();
}
NetworkTablesJNI.removeTopicListener(listener);
}
@Override
protected TopicNotification[] readQueue() {
return NetworkTablesJNI.readTopicListenerQueue(m_inst, m_poller);
}
@Override
protected void destroyPoller() {
NetworkTablesJNI.destroyTopicListenerPoller(m_poller);
}
}
private TopicListener m_topicListener = new TopicListener(this);
/**
* Add a topic listener for changes on a particular topic. The callback function is called
* Add a listener for changes on a particular topic. The callback function is called
* asynchronously on a separate thread, so it's important to use synchronization or atomics when
* accessing any shared state from the callback function.
*
* <p>This creates a corresponding internal subscriber with the lifetime of the
* listener.
*
* @param topic Topic
* @param eventMask Bitmask of TopicListenerFlags values
* @param listener Listener function
* @return Listener handle
*/
public int addTopicListener(
Topic topic, int eventMask, Consumer<TopicNotification> listener) {
public int addListener(
Topic topic, int eventMask, Consumer<NetworkTableEvent> listener) {
if (topic.getInstance().getHandle() != m_handle) {
throw new IllegalArgumentException("topic is not from this instance");
}
return m_topicListener.add(topic.getHandle(), eventMask, listener);
return m_listeners.add(topic.getHandle(), eventMask, listener);
}
/**
* Add a topic listener for topic changes on a subscriber. The callback function is called
* Add a listener for changes on a subscriber. The callback function is called
* asynchronously on a separate thread, so it's important to use synchronization or atomics when
* accessing any shared state from the callback function. This does NOT keep the subscriber
* active.
@@ -643,16 +599,16 @@ public final class NetworkTableInstance implements AutoCloseable {
* @param listener Listener function
* @return Listener handle
*/
public int addTopicListener(
Subscriber subscriber, int eventMask, Consumer<TopicNotification> listener) {
public int addListener(
Subscriber subscriber, int eventMask, Consumer<NetworkTableEvent> listener) {
if (subscriber.getTopic().getInstance().getHandle() != m_handle) {
throw new IllegalArgumentException("subscriber is not from this instance");
}
return m_topicListener.add(subscriber.getHandle(), eventMask, listener);
return m_listeners.add(subscriber.getHandle(), eventMask, listener);
}
/**
* Add a topic listener for topic changes on a subscriber. The callback function is called
* Add a listener for changes on a subscriber. The callback function is called
* asynchronously on a separate thread, so it's important to use synchronization or atomics when
* accessing any shared state from the callback function. This does NOT keep the subscriber
* active.
@@ -662,16 +618,16 @@ public final class NetworkTableInstance implements AutoCloseable {
* @param listener Listener function
* @return Listener handle
*/
public int addTopicListener(
MultiSubscriber subscriber, int eventMask, Consumer<TopicNotification> listener) {
public int addListener(
MultiSubscriber subscriber, int eventMask, Consumer<NetworkTableEvent> listener) {
if (subscriber.getInstance().getHandle() != m_handle) {
throw new IllegalArgumentException("subscriber is not from this instance");
}
return m_topicListener.add(subscriber.getHandle(), eventMask, listener);
return m_listeners.add(subscriber.getHandle(), eventMask, listener);
}
/**
* Add a topic listener for topic changes on an entry. The callback function is called
* Add a listener for changes on an entry. The callback function is called
* asynchronously on a separate thread, so it's important to use synchronization or atomics when
* accessing any shared state from the callback function.
*
@@ -680,173 +636,33 @@ public final class NetworkTableInstance implements AutoCloseable {
* @param listener Listener function
* @return Listener handle
*/
public int addTopicListener(
NetworkTableEntry entry, int eventMask, Consumer<TopicNotification> listener) {
public int addListener(
NetworkTableEntry entry, int eventMask, Consumer<NetworkTableEvent> listener) {
if (entry.getTopic().getInstance().getHandle() != m_handle) {
throw new IllegalArgumentException("entry is not from this instance");
}
return m_topicListener.add(entry.getHandle(), eventMask, listener);
return m_listeners.add(entry.getHandle(), eventMask, listener);
}
/**
* Add a topic listener for changes to topics with names that start with any of the given
* Add a listener for changes to topics with names that start with any of the given
* prefixes. The callback function is called asynchronously on a separate thread, so it's
* important to use synchronization or atomics when accessing any shared state from the callback
* function.
*
* <p>This creates a corresponding internal subscriber with the lifetime of the
* listener.
*
* @param prefixes Topic name string prefixes
* @param eventMask Bitmask of TopicListenerFlags values
* @param listener Listener function
* @return Listener handle
*/
public int addTopicListener(
public int addListener(
String[] prefixes,
int eventMask,
Consumer<TopicNotification> listener) {
return m_topicListener.add(prefixes, eventMask, listener);
}
/**
* Remove a topic listener.
*
* @param listener Listener handle to remove
*/
public void removeTopicListener(int listener) {
m_topicListener.remove(listener);
}
/**
* Wait for the topic listener queue to be empty. This is primarily useful for deterministic
* testing. This blocks until either the topic listener queue is empty (e.g. there are no
* more events that need to be passed along to callbacks or poll queues) or the timeout expires.
*
* @param timeout timeout, in seconds. Set to 0 for non-blocking behavior, or a negative value to
* block indefinitely
* @return False if timed out, otherwise true.
*/
public boolean waitForTopicListenerQueue(double timeout) {
return m_topicListener.waitForQueue(timeout);
}
private static final class ValueListener extends ListenerBase<ValueNotification> {
ValueListener(NetworkTableInstance inst) {
super(inst);
}
int add(int handle, int eventMask, Consumer<ValueNotification> listener) {
m_lock.lock();
try {
if (m_poller == 0) {
m_poller = NetworkTablesJNI.createValueListenerPoller(m_inst.getHandle());
startThread("NTValueListener");
}
int h = NetworkTablesJNI.addPolledValueListener(m_poller, handle, eventMask);
m_listeners.put(h, listener);
return h;
} finally {
m_lock.unlock();
}
}
void remove(int listener) {
m_lock.lock();
try {
m_listeners.remove(listener);
} finally {
m_lock.unlock();
}
NetworkTablesJNI.removeValueListener(listener);
}
@Override
protected ValueNotification[] readQueue() {
return NetworkTablesJNI.readValueListenerQueue(m_inst, m_poller);
}
@Override
protected void destroyPoller() {
NetworkTablesJNI.destroyValueListenerPoller(m_poller);
}
}
private ValueListener m_valueListener = new ValueListener(this);
/**
* Add a value listener for value changes on a subscriber. The callback function is called
* asynchronously on a separate thread, so it's important to use synchronization or atomics when
* accessing any shared state from the callback function. This does NOT keep the subscriber
* active.
*
* @param subscriber Subscriber
* @param eventMask Bitmask of ValueListenerFlags values
* @param listener Listener function
* @return Listener handle
*/
public int addValueListener(
Subscriber subscriber, int eventMask, Consumer<ValueNotification> listener) {
if (subscriber.getTopic().getInstance().getHandle() != m_handle) {
throw new IllegalArgumentException("subscriber is not from this instance");
}
return m_valueListener.add(subscriber.getHandle(), eventMask, listener);
}
/**
* Add a value listener for value changes on a subscriber. The callback function is called
* asynchronously on a separate thread, so it's important to use synchronization or atomics when
* accessing any shared state from the callback function. This does NOT keep the subscriber
* active.
*
* @param subscriber Subscriber
* @param eventMask Bitmask of ValueListenerFlags values
* @param listener Listener function
* @return Listener handle
*/
public int addValueListener(
MultiSubscriber subscriber, int eventMask, Consumer<ValueNotification> listener) {
if (subscriber.getInstance().getHandle() != m_handle) {
throw new IllegalArgumentException("subscriber is not from this instance");
}
return m_valueListener.add(subscriber.getHandle(), eventMask, listener);
}
/**
* Add a value listener for value changes on an entry. The callback function is called
* asynchronously on a separate thread, so it's important to use synchronization or atomics when
* accessing any shared state from the callback function.
*
* @param entry Entry
* @param eventMask Bitmask of ValueListenerFlags values
* @param listener Listener function
* @return Listener handle
*/
public int addValueListener(
NetworkTableEntry entry, int eventMask, Consumer<ValueNotification> listener) {
if (entry.getTopic().getInstance().getHandle() != m_handle) {
throw new IllegalArgumentException("entry is not from this instance");
}
return m_valueListener.add(entry.getHandle(), eventMask, listener);
}
/**
* Remove a value listener.
*
* @param listener Listener handle to remove
*/
public void removeValueListener(int listener) {
m_valueListener.remove(listener);
}
/**
* Wait for the value listener queue to be empty. This is primarily useful for deterministic
* testing. This blocks until either the value listener queue is empty (e.g. there are no
* more events that need to be passed along to callbacks or poll queues) or the timeout expires.
*
* @param timeout timeout, in seconds. Set to 0 for non-blocking behavior, or a negative value to
* block indefinitely
* @return False if timed out, otherwise true.
*/
public boolean waitForValueListenerQueue(double timeout) {
return m_valueListener.waitForQueue(timeout);
Consumer<NetworkTableEvent> listener) {
return m_listeners.add(prefixes, eventMask, listener);
}
/*
@@ -871,8 +687,8 @@ public final class NetworkTableInstance implements AutoCloseable {
}
/**
* Stops local-only operation. startServer or startClient can be called after this call to start a
* server or client.
* Stops local-only operation. startServer or startClient can be called after this call to start
* a server or client.
*/
public void stopLocal() {
NetworkTablesJNI.stopLocal(m_handle);
@@ -1003,8 +819,8 @@ public final class NetworkTableInstance implements AutoCloseable {
}
/**
* Sets server addresses and ports for client (without restarting client). The client will attempt
* to connect to each server in round robin fashion.
* Sets server addresses and ports for client (without restarting client). The client will
* attempt to connect to each server in round robin fashion.
*
* @param serverNames array of server names
* @param ports array of port numbers (0=default)
@@ -1067,8 +883,8 @@ public final class NetworkTableInstance implements AutoCloseable {
/**
* Flushes all updated values immediately to the network. Note: This is rate-limited to protect
* the network from flooding. This is primarily useful for synchronizing network updates with user
* code.
* the network from flooding. This is primarily useful for synchronizing network updates with
* user code.
*/
public void flush() {
NetworkTablesJNI.flush(m_handle);
@@ -1137,98 +953,19 @@ public final class NetworkTableInstance implements AutoCloseable {
NetworkTablesJNI.stopConnectionDataLog(logger);
}
private final ReentrantLock m_loggerLock = new ReentrantLock();
private final Map<Integer, Consumer<LogMessage>> m_loggers = new HashMap<>();
private int m_loggerPoller;
@SuppressWarnings("PMD.AvoidCatchingThrowable")
private void startLogThread() {
var loggerThread =
new Thread(
() -> {
boolean wasInterrupted = false;
while (!Thread.interrupted()) {
try {
WPIUtilJNI.waitForObject(m_loggerPoller);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
// don't try to destroy poller, as its handle is likely no longer valid
wasInterrupted = true;
break;
}
LogMessage[] events = NetworkTablesJNI.readLoggerQueue(this, m_loggerPoller);
for (LogMessage event : events) {
Consumer<LogMessage> logger;
m_loggerLock.lock();
try {
logger = m_loggers.get(event.logger);
} finally {
m_loggerLock.unlock();
}
if (logger != null) {
try {
logger.accept(event);
} catch (Throwable throwable) {
System.err.println(
"Unhandled exception during logger callback: " + throwable.toString());
throwable.printStackTrace();
}
}
}
}
m_loggerLock.lock();
try {
if (!wasInterrupted) {
NetworkTablesJNI.destroyLoggerPoller(m_loggerPoller);
}
} finally {
m_loggerLock.unlock();
}
},
"NTLogger");
loggerThread.setDaemon(true);
loggerThread.start();
}
/**
* Add logger callback function. By default, log messages are sent to stderr; this function sends
* log messages with the specified levels to the provided callback function instead. The callback
* function will only be called for log messages with level greater than or equal to minLevel and
* less than or equal to maxLevel; messages outside this range will be silently ignored.
*
* @param func log callback function
* @param minLevel minimum log level
* @param maxLevel maximum log level
* @return Logger handle
* @param func callback function
* @return Listener handle
*/
public int addLogger(Consumer<LogMessage> func, int minLevel, int maxLevel) {
m_loggerLock.lock();
try {
if (m_loggerPoller == 0) {
m_loggerPoller = NetworkTablesJNI.createLoggerPoller(m_handle);
startLogThread();
}
int handle = NetworkTablesJNI.addPolledLogger(m_loggerPoller, minLevel, maxLevel);
m_loggers.put(handle, func);
return handle;
} finally {
m_loggerLock.unlock();
}
}
/**
* Remove a logger.
*
* @param logger Logger handle to remove
*/
public void removeLogger(int logger) {
m_loggerLock.lock();
try {
m_loggers.remove(logger);
} finally {
m_loggerLock.unlock();
}
NetworkTablesJNI.removeLogger(logger);
public int addLogger(int minLevel, int maxLevel, Consumer<NetworkTableEvent> func) {
return m_listeners.addLogger(minLevel, maxLevel, func);
}
@Override

View File

@@ -199,40 +199,18 @@ public final class NetworkTablesJNI {
public static native TopicInfo getTopicInfo(NetworkTableInstance inst, int topic);
public static native int createTopicListenerPoller(int inst);
public static native int createListenerPoller(int inst);
public static native void destroyTopicListenerPoller(int poller);
public static native void destroyListenerPoller(int poller);
public static native int addPolledTopicListener(int poller, String[] prefixes, int flags);
public static native int addListener(int poller, String[] prefixes, int mask);
public static native int addPolledTopicListener(int poller, int handle, int flags);
public static native int addListener(int poller, int handle, int mask);
public static native TopicNotification[] readTopicListenerQueue(
public static native NetworkTableEvent[] readListenerQueue(
NetworkTableInstance inst, int poller);
public static native void removeTopicListener(int topicListener);
public static native int createValueListenerPoller(int inst);
public static native void destroyValueListenerPoller(int poller);
public static native int addPolledValueListener(int poller, int subentry, int flags);
public static native ValueNotification[] readValueListenerQueue(
NetworkTableInstance inst, int poller);
public static native void removeValueListener(int valueListener);
public static native int createConnectionListenerPoller(int inst);
public static native void destroyConnectionListenerPoller(int poller);
public static native int addPolledConnectionListener(int poller, boolean immediateNotify);
public static native ConnectionNotification[] readConnectionListenerQueue(
NetworkTableInstance inst, int poller);
public static native void removeConnectionListener(int connListener);
public static native void removeListener(int listener);
public static native int getNetworkMode(int inst);
@@ -287,13 +265,5 @@ public final class NetworkTablesJNI {
public static native void stopConnectionDataLog(int logger);
public static native int createLoggerPoller(int inst);
public static native void destroyLoggerPoller(int poller);
public static native int addPolledLogger(int poller, int minLevel, int maxLevel);
public static native LogMessage[] readLoggerQueue(NetworkTableInstance inst, int poller);
public static native void removeLogger(int logger);
public static native int addLogger(int poller, int minLevel, int maxLevel);
}