Add photonlibpy (#1040)

* Added a pure-python implementation of photonlib, named photonlibpy and hosted on pypi

---------

Co-authored-by: Matt <matthew.morley.ca@gmail.com>
This commit is contained in:
Chris Gerth
2023-12-16 12:32:49 -06:00
committed by GitHub
parent 2e39549771
commit 47aea29b6b
12 changed files with 899 additions and 0 deletions

View File

@@ -0,0 +1 @@
# No one here but us chickens

View File

@@ -0,0 +1,45 @@
from dataclasses import dataclass, field
from wpimath.geometry import Transform3d
from photonlibpy.packet import Packet
@dataclass
class PNPResult:
_NUM_BYTES_IN_FLOAT = 8
PACK_SIZE_BYTES = 1 + (_NUM_BYTES_IN_FLOAT * 7 * 2) + (_NUM_BYTES_IN_FLOAT * 3)
isPresent: bool = False
best: Transform3d = field(default_factory=Transform3d)
alt: Transform3d = field(default_factory=Transform3d)
ambiguity: float = 0.0
bestReprojError: float = 0.0
altReprojError: float = 0.0
def createFromPacket(self, packet: Packet) -> Packet:
self.isPresent = packet.decodeBoolean()
self.best = packet.decodeTransform()
self.alt = packet.decodeTransform()
self.bestReprojError = packet.decodeDouble()
self.altReprojError = packet.decodeDouble()
self.ambiguity = packet.decodeDouble()
return packet
@dataclass
class MultiTargetPNPResult:
_MAX_IDS = 32
# pnpresult + MAX_IDS possible targets (arbitrary upper limit that should never be hit, ideally)
_PACK_SIZE_BYTES = PNPResult.PACK_SIZE_BYTES + (1 * _MAX_IDS)
estimatedPose: PNPResult = field(default_factory=PNPResult)
fiducialIDsUsed: list[int] = field(default_factory=list)
def createFromPacket(self, packet: Packet) -> Packet:
self.estimatedPose = PNPResult()
self.estimatedPose.createFromPacket(packet)
self.fiducialIDsUsed = []
for _ in range(MultiTargetPNPResult._MAX_IDS):
fidId = packet.decode16()
if fidId >= 0:
self.fiducialIDsUsed.append(fidId)
return packet

View File

@@ -0,0 +1,143 @@
import struct
from wpimath.geometry import Transform3d, Translation3d, Rotation3d, Quaternion
import wpilib
class Packet:
def __init__(self, data: list[int]):
"""
* Constructs an empty packet.
*
* @param self.size The self.size of the packet buffer.
"""
self.packetData = data
self.size = len(data)
self.readPos = 0
self.outOfBytes = False
def clear(self):
"""Clears the packet and resets the read and write positions."""
self.packetData = [0] * self.size
self.readPos = 0
self.outOfBytes = False
def getSize(self):
return self.size
_NO_MORE_BYTES_MESSAGE = """
Photonlib - Ran out of bytes while decoding.
Make sure the version of photonvision on the coprocessor
matches the version of photonlib running in the robot code.
"""
def _getNextByte(self) -> int:
retVal = 0x00
if not self.outOfBytes:
try:
retVal = 0x00FF & self.packetData[self.readPos]
self.readPos += 1
except IndexError:
wpilib.reportError(Packet._NO_MORE_BYTES_MESSAGE, True)
self.outOfBytes = True
return retVal
def getData(self) -> list[int]:
"""
* Returns the packet data.
*
* @return The packet data.
"""
return self.packetData
def setData(self, data: list[int]):
"""
* Sets the packet data.
*
* @param data The packet data.
"""
self.clear()
self.packetData = data
self.size = len(self.packetData)
def _decodeGeneric(self, unpackFormat, numBytes):
# Read ints in from the data buffer
intList = []
for _ in range(numBytes):
intList.append(self._getNextByte())
# Interpret the bytes as a floating point number
value = struct.unpack(unpackFormat, bytes(intList))[0]
return value
def decode8(self) -> int:
"""
* Returns a single decoded byte from the packet.
*
* @return A decoded byte from the packet.
"""
return self._decodeGeneric(">b", 1)
def decode16(self) -> int:
"""
* Returns a single decoded byte from the packet.
*
* @return A decoded byte from the packet.
"""
return self._decodeGeneric(">h", 2)
def decode32(self) -> int:
"""
* Returns a decoded int (32 bytes) from the packet.
*
* @return A decoded int from the packet.
"""
return self._decodeGeneric(">l", 4)
def decodeDouble(self) -> float:
"""
* Returns a decoded double from the packet.
*
* @return A decoded double from the packet.
"""
return self._decodeGeneric(">d", 8)
def decodeBoolean(self) -> bool:
"""
* Returns a decoded boolean from the packet.
*
* @return A decoded boolean from the packet.
"""
return self.decode8() == 1
def decodeDoubleArray(self, length: int) -> list[float]:
"""
* Returns a decoded array of floats from the packet.
*
* @return A decoded array of floats from the packet.
"""
ret = []
for _ in range(length):
ret.append(self.decodeDouble())
return ret
def decodeTransform(self) -> Transform3d:
"""
* Returns a decoded Transform3d
*
* @return A decoded Tansform3d from the packet.
"""
x = self.decodeDouble()
y = self.decodeDouble()
z = self.decodeDouble()
translation = Translation3d(x, y, z)
w = self.decodeDouble()
x = self.decodeDouble()
y = self.decodeDouble()
z = self.decodeDouble()
rotation = Rotation3d(Quaternion(w, x, y, z))
return Transform3d(translation, rotation)

View File

@@ -0,0 +1,170 @@
from enum import Enum
import ntcore
from wpilib import Timer
import wpilib
from photonlibpy.packet import Packet
from photonlibpy.photonPipelineResult import PhotonPipelineResult
from photonlibpy.version import PHOTONVISION_VERSION, PHOTONLIB_VERSION
class VisionLEDMode(Enum):
kDefault = -1
kOff = 0
kOn = 1
kBlink = 2
lastVersionTimeCheck = 0.0
_VERSION_CHECK_ENABLED = True
def setVersionCheckEnabled(enabled: bool):
_VERSION_CHECK_ENABLED = enabled
class PhotonCamera:
def __init__(self, cameraName: str):
instance = ntcore.NetworkTableInstance.getDefault()
self.name = cameraName
self._tableName = "photonvision"
photonvision_root_table = instance.getTable(self._tableName)
self.cameraTable = photonvision_root_table.getSubTable(cameraName)
self.path = self.cameraTable.getPath()
self.rawBytesEntry = self.cameraTable.getRawTopic("rawBytes").subscribe(
"rawBytes", bytes([]), ntcore.PubSubOptions(periodic=0.01, sendAll=True)
)
self.driverModePublisher = self.cameraTable.getBooleanTopic(
"driverModeRequest"
).publish()
self.driverModeSubscriber = self.cameraTable.getBooleanTopic(
"driverMode"
).subscribe(False)
self.inputSaveImgEntry = self.cameraTable.getIntegerTopic(
"inputSaveImgCmd"
).getEntry(0)
self.outputSaveImgEntry = self.cameraTable.getIntegerTopic(
"outputSaveImgCmd"
).getEntry(0)
self.pipelineIndexRequest = self.cameraTable.getIntegerTopic(
"pipelineIndexRequest"
).publish()
self.pipelineIndexState = self.cameraTable.getIntegerTopic(
"pipelineIndexState"
).subscribe(0)
self.heartbeatEntry = self.cameraTable.getIntegerTopic("heartbeat").subscribe(
-1
)
self.ledModeRequest = photonvision_root_table.getIntegerTopic(
"ledModeRequest"
).publish()
self.ledModeState = photonvision_root_table.getIntegerTopic(
"ledModeState"
).subscribe(-1)
self.versionEntry = photonvision_root_table.getStringTopic("version").subscribe(
""
)
# Existing is enough to make this multisubscriber do its thing
self.topicNameSubscriber = ntcore.MultiSubscriber(
instance, ["/photonvision/"], ntcore.PubSubOptions(topicsOnly=True)
)
self.prevHeartbeat = 0
self.prevHeartbeatChangeTime = Timer.getFPGATimestamp()
def getLatestResult(self) -> PhotonPipelineResult:
self._versionCheck()
retVal = PhotonPipelineResult()
packetWithTimestamp = self.rawBytesEntry.getAtomic()
byteList = packetWithTimestamp.value
timestamp = packetWithTimestamp.time
if len(byteList) < 1:
return retVal
else:
retVal.populateFromPacket(Packet(byteList))
# NT4 allows us to correct the timestamp based on when the message was sent
retVal.setTimestampSeconds(
timestamp / 1e-6 - retVal.getLatencyMillis() / 1e-3
)
return retVal
def getDriverMode(self) -> bool:
return self.driverModeSubscriber.get()
def setDriverMode(self, driverMode: bool) -> None:
self.driverModePublisher.set(driverMode)
def takeInputSnapshot(self) -> None:
self.inputSaveImgEntry.set(self.inputSaveImgEntry.get() + 1)
def takeOutputSnapshot(self) -> None:
self.outputSaveImgEntry.set(self.outputSaveImgEntry.get() + 1)
def getPipelineIndex(self) -> int:
return self.pipelineIndexState.get(0)
def setPipelineIndex(self, index: int) -> None:
self.pipelineIndexRequest.set(index)
def getLEDMode(self) -> VisionLEDMode:
mode = self.ledModeState.get()
return VisionLEDMode(mode)
def setLEDMode(self, led: VisionLEDMode) -> None:
self.ledModeRequest.set(led.value)
def getName(self) -> str:
return self.name
def isConnected(self) -> bool:
curHeartbeat = self.heartbeatEntry.get()
now = Timer.getFPGATimestamp()
if curHeartbeat != self.prevHeartbeat:
self.prevHeartbeat = curHeartbeat
self.prevHeartbeatChangeTime = now
return (now - self.prevHeartbeatChangeTime) < 0.5
def _versionCheck(self) -> None:
if not _VERSION_CHECK_ENABLED:
return
if (Timer.getFPGATimestamp() - lastVersionTimeCheck) < 5.0:
return
if not self.heartbeatEntry.exists():
cameraNames = (
self.cameraTable.getInstance().getTable(self._tableName).getSubTables()
)
if len(cameraNames) == 0:
wpilib.reportError(
"Could not find any PhotonVision coprocessors on NetworkTables. Double check that PhotonVision is running, and that your camera is connected!",
False,
)
else:
wpilib.reportError(
f"PhotonVision coprocessor at path {self.path} not found in Network Tables. Double check that your camera names match! Only the following camera names were found: { ''.join(cameraNames)}",
True,
)
elif not self.isConnected():
wpilib.reportWarning(
f"PhotonVision coprocessor at path {self.path} is not sending new data.",
True,
)
versionString = self.versionEntry.get(defaultValue="")
if len(versionString) > 0 and versionString != PHOTONVISION_VERSION:
wpilib.reportWarning(
"Photon version "
+ PHOTONVISION_VERSION
+ " does not match coprocessor version "
+ versionString
+ f"! Please install photonlibpy version {PHOTONLIB_VERSION}",
True,
)

View File

@@ -0,0 +1,38 @@
from dataclasses import dataclass, field
from photonlibpy.multiTargetPNPResult import MultiTargetPNPResult
from photonlibpy.packet import Packet
from photonlibpy.photonTrackedTarget import PhotonTrackedTarget
@dataclass
class PhotonPipelineResult:
latencyMillis: float = -1.0
timestampSec: float = -1.0
targets: list[PhotonTrackedTarget] = field(default_factory=list)
multiTagResult: MultiTargetPNPResult = field(default_factory=MultiTargetPNPResult)
def populateFromPacket(self, packet: Packet) -> Packet:
self.targets = []
self.latencyMillis = packet.decodeDouble()
self.multiTagResult = MultiTargetPNPResult()
self.multiTagResult.createFromPacket(packet)
targetCount = packet.decode8()
for _ in range(targetCount):
target = PhotonTrackedTarget()
target.createFromPacket(packet)
self.targets.append(target)
return packet
def setTimestampSeconds(self, timestampSec: float) -> None:
self.timestampSec = timestampSec
def getLatencyMillis(self) -> float:
return self.latencyMillis
def getTimestamp(self) -> float:
return self.timestampSec
def getTargets(self) -> list[PhotonTrackedTarget]:
return self.targets

View File

@@ -0,0 +1,82 @@
from dataclasses import dataclass, field
from wpimath.geometry import Transform3d
from photonlibpy.packet import Packet
@dataclass
class TargetCorner:
x: float
y: float
@dataclass
class PhotonTrackedTarget:
_MAX_CORNERS = 8
_NUM_BYTES_IN_FLOAT = 8
_PACK_SIZE_BYTES = _NUM_BYTES_IN_FLOAT * (5 + 7 + 2 * 4 + 1 + 7 + 2 * _MAX_CORNERS)
yaw: float = 0.0
pitch: float = 0.0
area: float = 0.0
skew: float = 0.0
fiducialId: int = -1
bestCameraToTarget: Transform3d = field(default_factory=Transform3d)
altCameraToTarget: Transform3d = field(default_factory=Transform3d)
minAreaRectCorners: list[TargetCorner] | None = None
detectedCorners: list[TargetCorner] | None = None
poseAmbiguity: float = 0.0
def getYaw(self) -> float:
return self.yaw
def getPitch(self) -> float:
return self.pitch
def getArea(self) -> float:
return self.area
def getSkew(self) -> float:
return self.skew
def getFiducialId(self) -> int:
return self.fiducialId
def getPoseAmbiguity(self) -> float:
return self.poseAmbiguity
def getMinAreaRectCorners(self) -> list[TargetCorner] | None:
return self.minAreaRectCorners
def getDetectedCorners(self) -> list[TargetCorner] | None:
return self.detectedCorners
def getBestCameraToTarget(self) -> Transform3d:
return self.bestCameraToTarget
def getAlternateCameraToTarget(self) -> Transform3d:
return self.altCameraToTarget
def _decodeTargetList(self, packet: Packet, numTargets: int) -> list[TargetCorner]:
retList = []
for _ in range(numTargets):
cx = packet.decodeDouble()
cy = packet.decodeDouble()
retList.append(TargetCorner(cx, cy))
return retList
def createFromPacket(self, packet: Packet) -> Packet:
self.yaw = packet.decodeDouble()
self.pitch = packet.decodeDouble()
self.area = packet.decodeDouble()
self.skew = packet.decodeDouble()
self.fiducialId = packet.decode32()
self.bestCameraToTarget = packet.decodeTransform()
self.altCameraToTarget = packet.decodeTransform()
self.poseAmbiguity = packet.decodeDouble()
self.minAreaRectCorners = self._decodeTargetList(packet, 4) # always four
numCorners = packet.decode8()
self.detectedCorners = self._decodeTargetList(packet, numCorners)
return packet