mirror of
https://github.com/wpilibsuite/allwpilib
synced 2026-06-27 02:01:42 +00:00
[wpiutil, ntcore] Add structured data support (#5391)
This adds support for two serialization formats for complex data types: - Protobuf for complex objects with variable length internals that need forward and backward wire compatibility (lower speed, more flexible) - Raw struct (ByteBuffer-style) for fixed-length objects (higher speed, less flexible) Deserialization can be done either by creating a new object (for immutable objects) or overwriting the contents of an existing object (for mutable objects). Implementing classes should provide inner classes that implement the Protobuf or Struct interface (in Java) or specialize the wpi::Protobuf or wpi::Struct struct (in C++). It is possible for classes to implement both. If the class itself does not implement serialization, it's possible for third parties/users to provide an implementation instead. Uses the Google protobuf implementation for C++ and the QuickBuffers alternative protobuf implementation for Java.
This commit is contained in:
@@ -7,16 +7,22 @@ package edu.wpi.first.networktables;
|
||||
import edu.wpi.first.util.WPIUtilJNI;
|
||||
import edu.wpi.first.util.concurrent.Event;
|
||||
import edu.wpi.first.util.datalog.DataLog;
|
||||
import edu.wpi.first.util.protobuf.Protobuf;
|
||||
import edu.wpi.first.util.struct.Struct;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.OptionalLong;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.Condition;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.function.Consumer;
|
||||
import us.hebi.quickbuf.ProtoMessage;
|
||||
|
||||
/**
|
||||
* NetworkTables Instance.
|
||||
@@ -86,6 +92,7 @@ public final class NetworkTableInstance implements AutoCloseable {
|
||||
public synchronized void close() {
|
||||
if (m_owned && m_handle != 0) {
|
||||
m_listeners.close();
|
||||
m_schemas.forEach((k, v) -> v.close());
|
||||
NetworkTablesJNI.destroyInstance(m_handle);
|
||||
m_handle = 0;
|
||||
}
|
||||
@@ -176,15 +183,119 @@ public final class NetworkTableInstance implements AutoCloseable {
|
||||
handle = topic.getHandle();
|
||||
}
|
||||
|
||||
topic = new {{ t.TypeName }}Topic(this, handle);
|
||||
m_topics.put(name, topic);
|
||||
{{ t.TypeName }}Topic wrapTopic = new {{ t.TypeName }}Topic(this, handle);
|
||||
m_topics.put(name, wrapTopic);
|
||||
|
||||
// also cache by handle
|
||||
m_topicsByHandle.put(handle, topic);
|
||||
m_topicsByHandle.put(handle, wrapTopic);
|
||||
|
||||
return ({{ t.TypeName }}Topic) topic;
|
||||
return wrapTopic;
|
||||
}
|
||||
{% endfor %}
|
||||
|
||||
/**
|
||||
* Get protobuf-encoded value topic.
|
||||
*
|
||||
* @param <T> value class (inferred from proto)
|
||||
* @param <MessageType> protobuf message type (inferred from proto)
|
||||
* @param name topic name
|
||||
* @param proto protobuf serialization implementation
|
||||
* @return ProtobufTopic
|
||||
*/
|
||||
public <T, MessageType extends ProtoMessage<?>>
|
||||
ProtobufTopic<T> getProtobufTopic(String name, Protobuf<T, MessageType> proto) {
|
||||
Topic topic = m_topics.get(name);
|
||||
if (topic instanceof ProtobufTopic<?>
|
||||
&& ((ProtobufTopic<?>) topic).getProto().equals(proto)) {
|
||||
@SuppressWarnings("unchecked")
|
||||
ProtobufTopic<T> wrapTopic = (ProtobufTopic<T>) topic;
|
||||
return wrapTopic;
|
||||
}
|
||||
|
||||
int handle;
|
||||
if (topic == null) {
|
||||
handle = NetworkTablesJNI.getTopic(m_handle, name);
|
||||
} else {
|
||||
handle = topic.getHandle();
|
||||
}
|
||||
|
||||
ProtobufTopic<T> wrapTopic = ProtobufTopic.wrap(this, handle, proto);
|
||||
m_topics.put(name, wrapTopic);
|
||||
|
||||
// also cache by handle
|
||||
m_topicsByHandle.put(handle, wrapTopic);
|
||||
|
||||
return wrapTopic;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get struct-encoded value topic.
|
||||
*
|
||||
* @param <T> value class (inferred from struct)
|
||||
* @param name topic name
|
||||
* @param struct struct serialization implementation
|
||||
* @return StructTopic
|
||||
*/
|
||||
public <T>
|
||||
StructTopic<T> getStructTopic(String name, Struct<T> struct) {
|
||||
Topic topic = m_topics.get(name);
|
||||
if (topic instanceof StructTopic<?>
|
||||
&& ((StructTopic<?>) topic).getStruct().equals(struct)) {
|
||||
@SuppressWarnings("unchecked")
|
||||
StructTopic<T> wrapTopic = (StructTopic<T>) topic;
|
||||
return wrapTopic;
|
||||
}
|
||||
|
||||
int handle;
|
||||
if (topic == null) {
|
||||
handle = NetworkTablesJNI.getTopic(m_handle, name);
|
||||
} else {
|
||||
handle = topic.getHandle();
|
||||
}
|
||||
|
||||
StructTopic<T> wrapTopic = StructTopic.wrap(this, handle, struct);
|
||||
m_topics.put(name, wrapTopic);
|
||||
|
||||
// also cache by handle
|
||||
m_topicsByHandle.put(handle, wrapTopic);
|
||||
|
||||
return wrapTopic;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get struct-encoded value array topic.
|
||||
*
|
||||
* @param <T> value class (inferred from struct)
|
||||
* @param name topic name
|
||||
* @param struct struct serialization implementation
|
||||
* @return StructArrayTopic
|
||||
*/
|
||||
public <T>
|
||||
StructArrayTopic<T> getStructArrayTopic(String name, Struct<T> struct) {
|
||||
Topic topic = m_topics.get(name);
|
||||
if (topic instanceof StructArrayTopic<?>
|
||||
&& ((StructArrayTopic<?>) topic).getStruct().equals(struct)) {
|
||||
@SuppressWarnings("unchecked")
|
||||
StructArrayTopic<T> wrapTopic = (StructArrayTopic<T>) topic;
|
||||
return wrapTopic;
|
||||
}
|
||||
|
||||
int handle;
|
||||
if (topic == null) {
|
||||
handle = NetworkTablesJNI.getTopic(m_handle, name);
|
||||
} else {
|
||||
handle = topic.getHandle();
|
||||
}
|
||||
|
||||
StructArrayTopic<T> wrapTopic = StructArrayTopic.wrap(this, handle, struct);
|
||||
m_topics.put(name, wrapTopic);
|
||||
|
||||
// also cache by handle
|
||||
m_topicsByHandle.put(handle, wrapTopic);
|
||||
|
||||
return wrapTopic;
|
||||
}
|
||||
|
||||
private Topic[] topicHandlesToTopics(int[] handles) {
|
||||
Topic[] topics = new Topic[handles.length];
|
||||
for (int i = 0; i < handles.length; i++) {
|
||||
@@ -1050,6 +1161,78 @@ public final class NetworkTableInstance implements AutoCloseable {
|
||||
return m_listeners.addLogger(minLevel, maxLevel, func);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns whether there is a data schema already registered with the given name that this
|
||||
* instance has published. This does NOT perform a check as to whether the schema has already
|
||||
* been published by another node on the network.
|
||||
*
|
||||
* @param name Name (the string passed as the data type for topics using this schema)
|
||||
* @return True if schema already registered
|
||||
*/
|
||||
public boolean hasSchema(String name) {
|
||||
return m_schemas.containsKey("/.schema/" + name);
|
||||
}
|
||||
|
||||
/**
|
||||
* Registers a data schema. Data schemas provide information for how a certain data type string
|
||||
* can be decoded. The type string of a data schema indicates the type of the schema itself (e.g.
|
||||
* "protobuf" for protobuf schemas, "struct" for struct schemas, etc). In NetworkTables, schemas
|
||||
* are published just like normal topics, with the name being generated from the provided name:
|
||||
* "/.schema/name". Duplicate calls to this function with the same name are silently ignored.
|
||||
*
|
||||
* @param name Name (the string passed as the data type for topics using this schema)
|
||||
* @param type Type of schema (e.g. "protobuf", "struct", etc)
|
||||
* @param schema Schema data
|
||||
*/
|
||||
public void addSchema(String name, String type, byte[] schema) {
|
||||
m_schemas.computeIfAbsent("/.schema/" + name, k -> {
|
||||
RawPublisher pub = getRawTopic(k).publishEx(type, "{\"retained\":true}");
|
||||
pub.setDefault(schema);
|
||||
return pub;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Registers a data schema. Data schemas provide information for how a certain data type string
|
||||
* can be decoded. The type string of a data schema indicates the type of the schema itself (e.g.
|
||||
* "protobuf" for protobuf schemas, "struct" for struct schemas, etc). In NetworkTables, schemas
|
||||
* are published just like normal topics, with the name being generated from the provided name:
|
||||
* "/.schema/name". Duplicate calls to this function with the same name are silently ignored.
|
||||
*
|
||||
* @param name Name (the string passed as the data type for topics using this schema)
|
||||
* @param type Type of schema (e.g. "protobuf", "struct", etc)
|
||||
* @param schema Schema data
|
||||
*/
|
||||
public void addSchema(String name, String type, String schema) {
|
||||
m_schemas.computeIfAbsent("/.schema/" + name, k -> {
|
||||
RawPublisher pub = getRawTopic(k).publishEx(type, "{\"retained\":true}");
|
||||
pub.setDefault(StandardCharsets.UTF_8.encode(schema));
|
||||
return pub;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Registers a protobuf schema. Duplicate calls to this function with the same name are silently
|
||||
* ignored.
|
||||
*
|
||||
* @param proto protobuf serialization object
|
||||
*/
|
||||
public void addSchema(Protobuf<?, ?> proto) {
|
||||
proto.forEachDescriptor(
|
||||
this::hasSchema,
|
||||
(typeString, schema) -> addSchema(typeString, "proto:FileDescriptorProto", schema));
|
||||
}
|
||||
|
||||
/**
|
||||
* Registers a struct schema. Duplicate calls to this function with the same name are silently
|
||||
* ignored.
|
||||
*
|
||||
* @param struct struct serialization object
|
||||
*/
|
||||
public void addSchema(Struct<?> struct) {
|
||||
addSchemaImpl(struct, new HashSet<>());
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object other) {
|
||||
if (other == this) {
|
||||
@@ -1067,6 +1250,22 @@ public final class NetworkTableInstance implements AutoCloseable {
|
||||
return m_handle;
|
||||
}
|
||||
|
||||
private void addSchemaImpl(Struct<?> struct, Set<String> seen) {
|
||||
String typeString = struct.getTypeString();
|
||||
if (hasSchema(typeString)) {
|
||||
return;
|
||||
}
|
||||
if (!seen.add(typeString)) {
|
||||
throw new UnsupportedOperationException(typeString + ": circular reference with " + seen);
|
||||
}
|
||||
addSchema(typeString, "structschema", struct.getSchema());
|
||||
for (Struct<?> inner : struct.getNested()) {
|
||||
addSchemaImpl(inner, seen);
|
||||
}
|
||||
seen.remove(typeString);
|
||||
}
|
||||
|
||||
private boolean m_owned;
|
||||
private int m_handle;
|
||||
private final ConcurrentMap<String, RawPublisher> m_schemas = new ConcurrentHashMap<>();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user