mirror of
https://github.com/PhotonVision/photonvision
synced 2026-06-28 02:11:40 +00:00
generate packing for python messages (#1535)
Generate packet serialization in Python, too.
This commit is contained in:
@@ -22,7 +22,7 @@ import wpilib
|
||||
|
||||
|
||||
class Packet:
|
||||
def __init__(self, data: bytes):
|
||||
def __init__(self, data: bytes = b""):
|
||||
"""
|
||||
* Constructs an empty packet.
|
||||
*
|
||||
@@ -198,3 +198,110 @@ class Packet:
|
||||
return serde.unpack(self)
|
||||
else:
|
||||
return None
|
||||
|
||||
def _encodeGeneric(self, packFormat, value):
|
||||
"""
|
||||
Append bytes to the packet data buffer.
|
||||
"""
|
||||
self.packetData = self.packetData + struct.pack(packFormat, value)
|
||||
self.size = len(self.packetData)
|
||||
|
||||
def encode8(self, value: int):
|
||||
"""
|
||||
Encodes a single byte and appends it to the packet.
|
||||
"""
|
||||
self._encodeGeneric("<b", value)
|
||||
|
||||
def encode16(self, value: int):
|
||||
"""
|
||||
Encodes a short (2 bytes) and appends it to the packet.
|
||||
"""
|
||||
self._encodeGeneric("<h", value)
|
||||
|
||||
def encodeInt(self, value: int):
|
||||
"""
|
||||
Encodes an int (4 bytes) and appends it to the packet.
|
||||
"""
|
||||
self._encodeGeneric("<l", value)
|
||||
|
||||
def encodeFloat(self, value: float):
|
||||
"""
|
||||
Encodes a float (4 bytes) and appends it to the packet.
|
||||
"""
|
||||
self._encodeGeneric("<f", value)
|
||||
|
||||
def encodeLong(self, value: int):
|
||||
"""
|
||||
Encodes a long (8 bytes) and appends it to the packet.
|
||||
"""
|
||||
self._encodeGeneric("<q", value)
|
||||
|
||||
def encodeDouble(self, value: float):
|
||||
"""
|
||||
Encodes a double (8 bytes) and appends it to the packet.
|
||||
"""
|
||||
self._encodeGeneric("<d", value)
|
||||
|
||||
def encodeBoolean(self, value: bool):
|
||||
"""
|
||||
Encodes a boolean as a single byte and appends it to the packet.
|
||||
"""
|
||||
self.encode8(1 if value else 0)
|
||||
|
||||
def encodeDoubleArray(self, values: list[float]):
|
||||
"""
|
||||
Encodes an array of doubles and appends it to the packet.
|
||||
"""
|
||||
self.encode8(len(values))
|
||||
for value in values:
|
||||
self.encodeDouble(value)
|
||||
|
||||
def encodeShortList(self, values: list[int]):
|
||||
"""
|
||||
Encodes a list of shorts, with length prefixed as a single byte.
|
||||
"""
|
||||
self.encode8(len(values))
|
||||
for value in values:
|
||||
self.encode16(value)
|
||||
|
||||
def encodeTransform(self, transform: Transform3d):
|
||||
"""
|
||||
Encodes a Transform3d (translation and rotation) and appends it to the packet.
|
||||
"""
|
||||
# Encode Translation3d part (x, y, z)
|
||||
self.encodeDouble(transform.translation().x)
|
||||
self.encodeDouble(transform.translation().y)
|
||||
self.encodeDouble(transform.translation().z)
|
||||
|
||||
# Encode Rotation3d as Quaternion (w, x, y, z)
|
||||
quaternion = transform.rotation().getQuaternion()
|
||||
self.encodeDouble(quaternion.W())
|
||||
self.encodeDouble(quaternion.X())
|
||||
self.encodeDouble(quaternion.Y())
|
||||
self.encodeDouble(quaternion.Z())
|
||||
|
||||
def encodeList(self, values: list[Any], serde: Type):
|
||||
"""
|
||||
Encodes a list of items using a specific serializer and appends it to the packet.
|
||||
"""
|
||||
self.encode8(len(values))
|
||||
for item in values:
|
||||
packed = serde.pack(item)
|
||||
self.packetData = self.packetData + packed.getData()
|
||||
self.size = len(self.packetData)
|
||||
|
||||
def encodeOptional(self, value: Optional[Any], serde: Type):
|
||||
"""
|
||||
Encodes an optional value using a specific serializer.
|
||||
"""
|
||||
if value is None:
|
||||
self.encodeBoolean(False)
|
||||
else:
|
||||
self.encodeBoolean(True)
|
||||
packed = serde.pack(value)
|
||||
self.packetData = self.packetData + packed.getData()
|
||||
self.size = len(self.packetData)
|
||||
|
||||
def encodeBytes(self, value: bytes):
|
||||
self.packetData = self.packetData + value
|
||||
self.size = len(self.packetData)
|
||||
|
||||
Reference in New Issue
Block a user