############################################################################### ## Copyright (C) Photon Vision. ############################################################################### ## This program is free software: you can redistribute it and/or modify ## it under the terms of the GNU General Public License as published by ## the Free Software Foundation, either version 3 of the License, or ## (at your option) any later version. ## ## This program is distributed in the hope that it will be useful, ## but WITHOUT ANY WARRANTY; without even the implied warranty of ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the ## GNU General Public License for more details. ## ## You should have received a copy of the GNU General Public License ## along with this program. If not, see <https://www.gnu.org/licenses/>. ############################################################################### import struct from typing import Generic, Optional, Protocol, TypeVar import wpilib from wpimath import Quaternion, Rotation3d, Transform3d, Translation3d T = TypeVar("T") class Serde(Generic[T], Protocol): def pack(self, value: T) -> "Packet": ... def unpack(self, packet: "Packet") -> T: ... class Packet: def __init__(self, data: bytes = b""): """ * Constructs an empty packet. * * @param self.size The self.size of the packet buffer. """ self.packetData = data self.size = len(data) self.readPos = 0 self.outOfBytes = False def clear(self) -> None: """Clears the packet and resets the read and write positions.""" self.packetData = bytes(self.size) self.readPos = 0 self.outOfBytes = False def getSize(self): return self.size _NO_MORE_BYTES_MESSAGE = """ Photonlib - Ran out of bytes while decoding. Make sure the version of photonvision on the coprocessor matches the version of photonlib running in the robot code. 
""" def _getNextByteAsInt(self) -> int: if not self.outOfBytes: try: retVal = 0x00FF & self.packetData[self.readPos] self.readPos += 1 return retVal except IndexError: wpilib.reportError(Packet._NO_MORE_BYTES_MESSAGE, True) self.outOfBytes = True return 0x00 def getData(self) -> bytes: """ * Returns the packet data. * * @return The packet data. """ return self.packetData def setData(self, data: bytes): """ * Sets the packet data. * * @param data The packet data. """ self.clear() self.packetData = data self.size = len(self.packetData) def _decodeGeneric(self, unpackFormat, numBytes): # Read ints in from the data buffer intList = [] for _ in range(numBytes): intList.append(self._getNextByteAsInt()) # Interpret the bytes as the requested type. # Note due to NT's byte order assumptions, # we have to flip the order of intList value = struct.unpack(unpackFormat, bytes(intList))[0] return value def decode8(self) -> int: """ * Returns a single decoded byte from the packet. * * @return A decoded byte from the packet. """ return self._decodeGeneric(" int: """ * Returns a single decoded short from the packet. * * @return A decoded short from the packet. """ return self._decodeGeneric(" int: """ * Returns a decoded int (32 bytes) from the packet. * * @return A decoded int from the packet. """ return self._decodeGeneric(" float: """ * Returns a decoded float from the packet. * * @return A decoded float from the packet. """ return self._decodeGeneric(" int: """ * Returns a decoded int64 from the packet. * * @return A decoded int64 from the packet. """ return self._decodeGeneric(" float: """ * Returns a decoded double from the packet. * * @return A decoded double from the packet. """ return self._decodeGeneric(" bool: """ * Returns a decoded boolean from the packet. * * @return A decoded boolean from the packet. """ return self.decode8() == 1 def decodeDoubleArray(self, length: int) -> list[float]: """ * Returns a decoded array of floats from the packet. 
""" ret = [] for _ in range(length): ret.append(self.decodeDouble()) return ret def decodeShortList(self) -> list[int]: """ * Returns a decoded array of shorts from the packet. """ length = self.decode8() ret = [] for _ in range(length): ret.append(self.decode16()) return ret def decodeTransform(self) -> Transform3d: """ * Returns a decoded Transform3d * * @return A decoded Tansform3d from the packet. """ x = self.decodeDouble() y = self.decodeDouble() z = self.decodeDouble() translation = Translation3d(x, y, z) w = self.decodeDouble() x = self.decodeDouble() y = self.decodeDouble() z = self.decodeDouble() rotation = Rotation3d(Quaternion(w, x, y, z)) return Transform3d(translation, rotation) def decodeList(self, serde: Serde[T]) -> list[T]: retList = [] arr_len = self.decode8() for _ in range(arr_len): retList.append(serde.unpack(self)) return retList def decodeOptional(self, serde: Serde[T]) -> Optional[T]: if self.decodeBoolean(): return serde.unpack(self) else: return None def _encodeGeneric(self, packFormat, value): """ Append bytes to the packet data buffer. """ self.packetData = self.packetData + struct.pack(packFormat, value) self.size = len(self.packetData) def encode8(self, value: int): """ Encodes a single byte and appends it to the packet. """ self._encodeGeneric("