mirror of
https://github.com/PhotonVision/photonvision
synced 2026-07-04 03:11:40 +00:00
Merge branch 'main' into py-docs
This commit is contained in:
@@ -1,7 +1,13 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
set -euo pipefail
|
||||
cd -- "$(dirname -- "$0")"
|
||||
|
||||
# Uninstall if it already was installed
|
||||
python3 -m pip uninstall -y photonlibpy
|
||||
|
||||
# Build wheel
|
||||
python3 -m pip install wheel
|
||||
python3 setup.py bdist_wheel
|
||||
|
||||
# Install whatever wheel was made
|
||||
|
||||
@@ -18,6 +18,7 @@
|
||||
from enum import Enum
|
||||
from typing import List
|
||||
|
||||
import hal
|
||||
import ntcore
|
||||
|
||||
# magical import to make serde stuff work
|
||||
@@ -48,6 +49,8 @@ def setVersionCheckEnabled(enabled: bool):
|
||||
|
||||
|
||||
class PhotonCamera:
|
||||
instance_count = 1
|
||||
|
||||
def __init__(self, cameraName: str):
|
||||
"""Constructs a PhotonCamera from the name of the camera.
|
||||
|
||||
@@ -108,6 +111,13 @@ class PhotonCamera:
|
||||
# Start the time sync server
|
||||
inst.start()
|
||||
|
||||
# Usage reporting
|
||||
hal.report(
|
||||
hal.tResourceType.kResourceType_PhotonCamera.value,
|
||||
PhotonCamera.instance_count,
|
||||
)
|
||||
PhotonCamera.instance_count += 1
|
||||
|
||||
def getAllUnreadResults(self) -> List[PhotonPipelineResult]:
|
||||
"""
|
||||
The list of pipeline results sent by PhotonVision since the last call to getAllUnreadResults().
|
||||
|
||||
@@ -18,9 +18,20 @@
|
||||
import enum
|
||||
from typing import Optional
|
||||
|
||||
import hal
|
||||
import wpilib
|
||||
import wpimath.units
|
||||
from robotpy_apriltag import AprilTagFieldLayout
|
||||
from wpimath.geometry import Pose2d, Pose3d, Transform3d
|
||||
from wpimath.geometry import (
|
||||
Pose2d,
|
||||
Pose3d,
|
||||
Rotation2d,
|
||||
Rotation3d,
|
||||
Transform3d,
|
||||
Translation2d,
|
||||
Translation3d,
|
||||
)
|
||||
from wpimath.interpolation import TimeInterpolatableRotation2dBuffer
|
||||
|
||||
from .estimatedRobotPose import EstimatedRobotPose
|
||||
from .photonCamera import PhotonCamera
|
||||
@@ -59,8 +70,21 @@ class PoseStrategy(enum.Enum):
|
||||
This runs on the RoboRIO, and can take a lot of time.
|
||||
"""
|
||||
|
||||
PNP_DISTANCE_TRIG_SOLVE = enum.auto()
|
||||
"""
|
||||
Use distance data from best visible tag to compute a Pose. This runs on
|
||||
the RoboRIO in order to access the robot's yaw heading, and MUST have
|
||||
addHeadingData called every frame so heading data is up-to-date.
|
||||
|
||||
Produces a Pose2d in estimatedRobotPose (0 for z, roll, pitch).
|
||||
|
||||
See https://www.chiefdelphi.com/t/frc-6328-mechanical-advantage-2025-build-thread/477314/98
|
||||
"""
|
||||
|
||||
|
||||
class PhotonPoseEstimator:
|
||||
instance_count = 1
|
||||
|
||||
"""
|
||||
The PhotonPoseEstimator class filters or combines readings from all the AprilTags visible at a
|
||||
given timestamp on the field to produce a single robot in field pose, using the strategy set
|
||||
@@ -95,8 +119,14 @@ class PhotonPoseEstimator:
|
||||
self._poseCacheTimestampSeconds = -1.0
|
||||
self._lastPose: Optional[Pose3d] = None
|
||||
self._referencePose: Optional[Pose3d] = None
|
||||
self._headingBuffer = TimeInterpolatableRotation2dBuffer(1)
|
||||
|
||||
# TODO: Implement HAL reporting
|
||||
# Usage reporting
|
||||
hal.report(
|
||||
hal.tResourceType.kResourceType_PhotonPoseEstimator.value,
|
||||
PhotonPoseEstimator.instance_count,
|
||||
)
|
||||
PhotonPoseEstimator.instance_count += 1
|
||||
|
||||
@property
|
||||
def fieldTags(self) -> AprilTagFieldLayout:
|
||||
@@ -199,9 +229,35 @@ class PhotonPoseEstimator:
|
||||
self._poseCacheTimestampSeconds = -1.0
|
||||
|
||||
def _checkUpdate(self, oldObj, newObj) -> None:
|
||||
if oldObj != newObj and oldObj is not None and oldObj is not newObj:
|
||||
if oldObj != newObj:
|
||||
self._invalidatePoseCache()
|
||||
|
||||
def addHeadingData(
|
||||
self, timestampSeconds: wpimath.units.seconds, heading: Rotation2d | Rotation3d
|
||||
) -> None:
|
||||
"""
|
||||
Add robot heading data to buffer. Must be called periodically for the **PNP_DISTANCE_TRIG_SOLVE** strategy.
|
||||
|
||||
:param timestampSeconds :timestamp of the robot heading data
|
||||
:param heading: field-relative robot heading at given timestamp
|
||||
"""
|
||||
if isinstance(heading, Rotation3d):
|
||||
heading = heading.toRotation2d()
|
||||
self._headingBuffer.addSample(timestampSeconds, heading)
|
||||
|
||||
def resetHeadingData(
|
||||
self, timestampSeconds: wpimath.units.seconds, heading: Rotation2d | Rotation3d
|
||||
) -> None:
|
||||
"""
|
||||
Clears all heading data in the buffer, and adds a new seed. Useful for preventing estimates
|
||||
from utilizing heading data provided prior to a pose or rotation reset.
|
||||
|
||||
:param timestampSeconds: timestamp of the robot heading data
|
||||
:param heading: field-relative robot heading at given timestamp
|
||||
"""
|
||||
self._headingBuffer.clear()
|
||||
self.addHeadingData(timestampSeconds, heading)
|
||||
|
||||
def update(
|
||||
self, cameraResult: Optional[PhotonPipelineResult] = None
|
||||
) -> Optional[EstimatedRobotPose]:
|
||||
@@ -255,6 +311,8 @@ class PhotonPoseEstimator:
|
||||
estimatedPose = self._lowestAmbiguityStrategy(cameraResult)
|
||||
elif strat is PoseStrategy.MULTI_TAG_PNP_ON_COPROCESSOR:
|
||||
estimatedPose = self._multiTagOnCoprocStrategy(cameraResult)
|
||||
elif strat is PoseStrategy.PNP_DISTANCE_TRIG_SOLVE:
|
||||
estimatedPose = self._pnpDistanceTrigSolveStrategy(cameraResult)
|
||||
else:
|
||||
wpilib.reportError(
|
||||
"[PhotonPoseEstimator] Unknown Position Estimation Strategy!", False
|
||||
@@ -266,6 +324,52 @@ class PhotonPoseEstimator:
|
||||
|
||||
return estimatedPose
|
||||
|
||||
def _pnpDistanceTrigSolveStrategy(
|
||||
self, result: PhotonPipelineResult
|
||||
) -> Optional[EstimatedRobotPose]:
|
||||
if (bestTarget := result.getBestTarget()) is None:
|
||||
return None
|
||||
|
||||
if (
|
||||
headingSample := self._headingBuffer.sample(result.getTimestampSeconds())
|
||||
) is None:
|
||||
return None
|
||||
|
||||
if (tagPose := self._fieldTags.getTagPose(bestTarget.fiducialId)) is None:
|
||||
return None
|
||||
|
||||
camToTagTranslation = (
|
||||
Translation3d(
|
||||
bestTarget.getBestCameraToTarget().translation().norm(),
|
||||
Rotation3d(
|
||||
0,
|
||||
-wpimath.units.degreesToRadians(bestTarget.getPitch()),
|
||||
-wpimath.units.degreesToRadians(bestTarget.getYaw()),
|
||||
),
|
||||
)
|
||||
.rotateBy(self.robotToCamera.rotation())
|
||||
.toTranslation2d()
|
||||
.rotateBy(headingSample)
|
||||
)
|
||||
|
||||
fieldToCameraTranslation = (
|
||||
tagPose.toPose2d().translation() - camToTagTranslation
|
||||
)
|
||||
camToRobotTranslation: Translation2d = -(
|
||||
self.robotToCamera.translation().toTranslation2d()
|
||||
)
|
||||
camToRobotTranslation = camToRobotTranslation.rotateBy(headingSample)
|
||||
robotPose = Pose2d(
|
||||
fieldToCameraTranslation + camToRobotTranslation, headingSample
|
||||
)
|
||||
|
||||
return EstimatedRobotPose(
|
||||
Pose3d(robotPose),
|
||||
result.getTimestampSeconds(),
|
||||
result.getTargets(),
|
||||
PoseStrategy.PNP_DISTANCE_TRIG_SOLVE,
|
||||
)
|
||||
|
||||
def _multiTagOnCoprocStrategy(
|
||||
self, result: PhotonPipelineResult
|
||||
) -> Optional[EstimatedRobotPose]:
|
||||
|
||||
@@ -420,14 +420,15 @@ class PhotonCameraSim:
|
||||
|
||||
# put this simulated data to NT
|
||||
self.heartbeatCounter += 1
|
||||
now_micros = wpilib.Timer.getFPGATimestamp() * 1e6
|
||||
publishTimestampMicros = wpilib.Timer.getFPGATimestamp() * 1e6
|
||||
return PhotonPipelineResult(
|
||||
ntReceiveTimestampMicros=int(publishTimestampMicros + 10),
|
||||
metadata=PhotonPipelineMetadata(
|
||||
int(now_micros - latency * 1e6),
|
||||
int(now_micros),
|
||||
self.heartbeatCounter,
|
||||
captureTimestampMicros=int(publishTimestampMicros - latency * 1e6),
|
||||
publishTimestampMicros=int(publishTimestampMicros),
|
||||
sequenceID=self.heartbeatCounter,
|
||||
# Pretend like we heard a pong recently
|
||||
int(np.random.uniform(950, 1050)),
|
||||
timeSinceLastPong=int(np.random.uniform(950, 1050)),
|
||||
),
|
||||
targets=detectableTgts,
|
||||
multitagResult=multiTagResults,
|
||||
|
||||
@@ -47,13 +47,10 @@ class PhotonPipelineResult:
|
||||
timestamp, coproc timebase))
|
||||
"""
|
||||
# TODO - we don't trust NT4 to correctly latency-compensate ntReceiveTimestampMicros
|
||||
return (
|
||||
self.ntReceiveTimestampMicros
|
||||
- (
|
||||
self.metadata.publishTimestampMicros
|
||||
- self.metadata.captureTimestampMicros
|
||||
)
|
||||
) / 1e6
|
||||
latency = (
|
||||
self.metadata.publishTimestampMicros - self.metadata.captureTimestampMicros
|
||||
)
|
||||
return (self.ntReceiveTimestampMicros - latency) / 1e6
|
||||
|
||||
def getTargets(self) -> list[PhotonTrackedTarget]:
|
||||
return self.targets
|
||||
|
||||
0
photon-lib/py/test/__init__.py
Normal file
0
photon-lib/py/test/__init__.py
Normal file
@@ -15,7 +15,12 @@
|
||||
## along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
###############################################################################
|
||||
|
||||
from photonlibpy import PhotonPoseEstimator, PoseStrategy
|
||||
from test import testUtil
|
||||
|
||||
import wpimath.units
|
||||
from photonlibpy import PhotonCamera, PhotonPoseEstimator, PoseStrategy
|
||||
from photonlibpy.estimation import TargetModel
|
||||
from photonlibpy.simulation import PhotonCameraSim, SimCameraProperties, VisionTargetSim
|
||||
from photonlibpy.targeting import (
|
||||
PhotonPipelineMetadata,
|
||||
PhotonTrackedTarget,
|
||||
@@ -27,14 +32,17 @@ from robotpy_apriltag import AprilTag, AprilTagFieldLayout
|
||||
from wpimath.geometry import Pose3d, Rotation3d, Transform3d, Translation3d
|
||||
|
||||
|
||||
class PhotonCameraInjector:
|
||||
class PhotonCameraInjector(PhotonCamera):
|
||||
result: PhotonPipelineResult
|
||||
|
||||
def __init__(self, cameraName="camera"):
|
||||
super().__init__(cameraName)
|
||||
|
||||
def getLatestResult(self) -> PhotonPipelineResult:
|
||||
return self.result
|
||||
|
||||
|
||||
def setupCommon() -> AprilTagFieldLayout:
|
||||
def fakeAprilTagFieldLayout() -> AprilTagFieldLayout:
|
||||
tagList = []
|
||||
tagPoses = (
|
||||
Pose3d(3, 3, 3, Rotation3d()),
|
||||
@@ -53,8 +61,7 @@ def setupCommon() -> AprilTagFieldLayout:
|
||||
|
||||
|
||||
def test_lowestAmbiguityStrategy():
|
||||
aprilTags = setupCommon()
|
||||
|
||||
aprilTags = fakeAprilTagFieldLayout()
|
||||
cameraOne = PhotonCameraInjector()
|
||||
cameraOne.result = PhotonPipelineResult(
|
||||
int(11 * 1e6),
|
||||
@@ -146,6 +153,86 @@ def test_lowestAmbiguityStrategy():
|
||||
assertEquals(2, pose.z, 0.01)
|
||||
|
||||
|
||||
def test_pnpDistanceTrigSolve():
|
||||
aprilTags = fakeAprilTagFieldLayout()
|
||||
cameraOne = PhotonCameraInjector()
|
||||
latencySecs: wpimath.units.seconds = 1
|
||||
fakeTimestampSecs: wpimath.units.seconds = 9 + latencySecs
|
||||
|
||||
cameraOneSim = PhotonCameraSim(cameraOne, SimCameraProperties.PERFECT_90DEG())
|
||||
simTargets = [
|
||||
VisionTargetSim(tag.pose, TargetModel.AprilTag36h11(), tag.ID)
|
||||
for tag in aprilTags.getTags()
|
||||
]
|
||||
|
||||
# Compound Rolled + Pitched + Yaw
|
||||
compoundTestTransform = Transform3d(
|
||||
-wpimath.units.inchesToMeters(12),
|
||||
-wpimath.units.inchesToMeters(11),
|
||||
3,
|
||||
Rotation3d(
|
||||
wpimath.units.degreesToRadians(37),
|
||||
wpimath.units.degreesToRadians(6),
|
||||
wpimath.units.degreesToRadians(60),
|
||||
),
|
||||
)
|
||||
|
||||
estimator = PhotonPoseEstimator(
|
||||
aprilTags,
|
||||
PoseStrategy.PNP_DISTANCE_TRIG_SOLVE,
|
||||
cameraOne,
|
||||
compoundTestTransform,
|
||||
)
|
||||
|
||||
realPose = Pose3d(7.3, 4.42, 0, Rotation3d(0, 0, 2.197)) # Pose to compare with
|
||||
result = cameraOneSim.process(
|
||||
latencySecs, realPose.transformBy(estimator.robotToCamera), simTargets
|
||||
)
|
||||
bestTarget = result.getBestTarget()
|
||||
assert bestTarget is not None
|
||||
assert bestTarget.fiducialId == 0
|
||||
assert result.ntReceiveTimestampMicros > 0
|
||||
# Make test independent of the FPGA time.
|
||||
result.ntReceiveTimestampMicros = int(fakeTimestampSecs * 1e6)
|
||||
|
||||
estimator.addHeadingData(
|
||||
result.getTimestampSeconds(), realPose.rotation().toRotation2d()
|
||||
)
|
||||
estimatedRobotPose = estimator.update(result)
|
||||
|
||||
assert estimatedRobotPose is not None
|
||||
pose = estimatedRobotPose.estimatedPose
|
||||
assertEquals(realPose.x, pose.x, 0.01)
|
||||
assertEquals(realPose.y, pose.y, 0.01)
|
||||
assertEquals(0.0, pose.z, 0.01)
|
||||
|
||||
# Straight on
|
||||
fakeTimestampSecs += 60
|
||||
straightOnTestTransform = Transform3d(0, 0, 3, Rotation3d())
|
||||
estimator.robotToCamera = straightOnTestTransform
|
||||
realPose = Pose3d(4.81, 2.38, 0, Rotation3d(0, 0, 2.818)) # Pose to compare with
|
||||
result = cameraOneSim.process(
|
||||
latencySecs, realPose.transformBy(estimator.robotToCamera), simTargets
|
||||
)
|
||||
bestTarget = result.getBestTarget()
|
||||
assert bestTarget is not None
|
||||
assert bestTarget.fiducialId == 0
|
||||
assert result.ntReceiveTimestampMicros > 0
|
||||
# Make test independent of the FPGA time.
|
||||
result.ntReceiveTimestampMicros = int(fakeTimestampSecs * 1e6)
|
||||
|
||||
estimator.addHeadingData(
|
||||
result.getTimestampSeconds(), realPose.rotation().toRotation2d()
|
||||
)
|
||||
estimatedRobotPose = estimator.update(result)
|
||||
|
||||
assert estimatedRobotPose is not None
|
||||
pose = estimatedRobotPose.estimatedPose
|
||||
assertEquals(realPose.x, pose.x, 0.01)
|
||||
assertEquals(realPose.y, pose.y, 0.01)
|
||||
assertEquals(0.0, pose.z, 0.01)
|
||||
|
||||
|
||||
def test_multiTagOnCoprocStrategy():
|
||||
cameraOne = PhotonCameraInjector()
|
||||
cameraOne.result = PhotonPipelineResult(
|
||||
@@ -202,11 +289,38 @@ def test_multiTagOnCoprocStrategy():
|
||||
|
||||
|
||||
def test_cacheIsInvalidated():
|
||||
aprilTags = setupCommon()
|
||||
|
||||
aprilTags = fakeAprilTagFieldLayout()
|
||||
cameraOne = PhotonCameraInjector()
|
||||
|
||||
estimator = PhotonPoseEstimator(
|
||||
aprilTags, PoseStrategy.LOWEST_AMBIGUITY, cameraOne, Transform3d()
|
||||
)
|
||||
|
||||
# Initial state, expect no timestamp.
|
||||
assertEquals(-1, estimator._poseCacheTimestampSeconds)
|
||||
|
||||
# First result is 17s after epoch start.
|
||||
timestamps = testUtil.PipelineTimestamps(captureTimestampMicros=17_000_000)
|
||||
latencySecs = timestamps.pipelineLatencySecs()
|
||||
|
||||
# No targets, expect empty result
|
||||
cameraOne.result = PhotonPipelineResult(
|
||||
timestamps.receiveTimestampMicros(),
|
||||
metadata=timestamps.toPhotonPipelineMetadata(),
|
||||
)
|
||||
estimatedPose = estimator.update()
|
||||
|
||||
assert estimatedPose is None
|
||||
assertEquals(
|
||||
timestamps.receiveTimestampMicros() * 1e-6 - latencySecs,
|
||||
estimator._poseCacheTimestampSeconds,
|
||||
1e-3,
|
||||
)
|
||||
|
||||
# Set actual result
|
||||
timestamps.incrementTimeMicros(2_500_000)
|
||||
result = PhotonPipelineResult(
|
||||
int(20 * 1e6),
|
||||
timestamps.receiveTimestampMicros(),
|
||||
[
|
||||
PhotonTrackedTarget(
|
||||
3.0,
|
||||
@@ -231,31 +345,21 @@ def test_cacheIsInvalidated():
|
||||
0.7,
|
||||
)
|
||||
],
|
||||
metadata=PhotonPipelineMetadata(0, int(2 * 1e3), 0),
|
||||
metadata=timestamps.toPhotonPipelineMetadata(),
|
||||
)
|
||||
|
||||
estimator = PhotonPoseEstimator(
|
||||
aprilTags, PoseStrategy.LOWEST_AMBIGUITY, cameraOne, Transform3d()
|
||||
)
|
||||
|
||||
# Empty result, expect empty result
|
||||
cameraOne.result = PhotonPipelineResult(0)
|
||||
estimatedPose = estimator.update()
|
||||
assert estimatedPose is None
|
||||
|
||||
# Set actual result
|
||||
cameraOne.result = result
|
||||
estimatedPose = estimator.update()
|
||||
assert estimatedPose is not None
|
||||
assertEquals(20, estimatedPose.timestampSeconds, 0.01)
|
||||
assertEquals(20 - 2e-3, estimator._poseCacheTimestampSeconds, 1e-3)
|
||||
expectedTimestamp = timestamps.receiveTimestampMicros() * 1e-6 - latencySecs
|
||||
assertEquals(expectedTimestamp, estimatedPose.timestampSeconds, 1e-3)
|
||||
assertEquals(expectedTimestamp, estimator._poseCacheTimestampSeconds, 1e-3)
|
||||
|
||||
# And again -- pose cache should mean this is empty
|
||||
cameraOne.result = result
|
||||
estimatedPose = estimator.update()
|
||||
assert estimatedPose is None
|
||||
# Expect the old timestamp to still be here
|
||||
assertEquals(20 - 2e-3, estimator._poseCacheTimestampSeconds, 1e-3)
|
||||
assertEquals(expectedTimestamp, estimator._poseCacheTimestampSeconds, 1e-3)
|
||||
|
||||
# Set new field layout -- right after, the pose cache timestamp should be -1
|
||||
estimator.fieldTags = AprilTagFieldLayout([AprilTag()], 0, 0)
|
||||
@@ -266,8 +370,14 @@ def test_cacheIsInvalidated():
|
||||
|
||||
assert estimatedPose is not None
|
||||
|
||||
assertEquals(20, estimatedPose.timestampSeconds, 0.01)
|
||||
assertEquals(20 - 2e-3, estimator._poseCacheTimestampSeconds, 1e-3)
|
||||
assertEquals(expectedTimestamp, estimatedPose.timestampSeconds, 1e-3)
|
||||
assertEquals(expectedTimestamp, estimator._poseCacheTimestampSeconds, 1e-3)
|
||||
|
||||
# Setting a value from None to a non-None should invalidate the cache.
|
||||
assert estimator.referencePose is None
|
||||
estimator.referencePose = Pose3d(3, 3, 3, Rotation3d())
|
||||
|
||||
assertEquals(-1, estimator._poseCacheTimestampSeconds)
|
||||
|
||||
|
||||
def assertEquals(expected, actual, epsilon=0.0):
|
||||
|
||||
65
photon-lib/py/test/testUtil.py
Normal file
65
photon-lib/py/test/testUtil.py
Normal file
@@ -0,0 +1,65 @@
|
||||
"""Test utilities."""
|
||||
|
||||
from photonlibpy.targeting import PhotonPipelineMetadata
|
||||
|
||||
|
||||
class InvalidTestDataException(ValueError):
|
||||
pass
|
||||
|
||||
|
||||
class PipelineTimestamps:
|
||||
"""Helper class to ensure timestamps are positive."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
captureTimestampMicros: int,
|
||||
pipelineLatencyMicros=2_000,
|
||||
receiveLatencyMicros=1_000,
|
||||
):
|
||||
if captureTimestampMicros < 0:
|
||||
raise InvalidTestDataException("captureTimestampMicros cannot be negative")
|
||||
if pipelineLatencyMicros <= 0:
|
||||
raise InvalidTestDataException("pipelineLatencyMicros must be positive")
|
||||
if receiveLatencyMicros < 0:
|
||||
raise InvalidTestDataException("receiveLatencyMicros cannot be negative")
|
||||
self._captureTimestampMicros = captureTimestampMicros
|
||||
self._pipelineLatencyMicros = pipelineLatencyMicros
|
||||
self._receiveLatencyMicros = receiveLatencyMicros
|
||||
self._sequenceID = 0
|
||||
|
||||
@property
|
||||
def captureTimestampMicros(self) -> int:
|
||||
return self._captureTimestampMicros
|
||||
|
||||
@captureTimestampMicros.setter
|
||||
def captureTimestampMicros(self, micros: int) -> None:
|
||||
if micros < 0:
|
||||
raise InvalidTestDataException("captureTimestampMicros cannot be negative")
|
||||
if micros < self._captureTimestampMicros:
|
||||
raise InvalidTestDataException("time cannot go backwards")
|
||||
self._captureTimestampMicros = micros
|
||||
self._sequenceID += 1
|
||||
|
||||
@property
|
||||
def pipelineLatencyMicros(self) -> int:
|
||||
return self._pipelineLatencyMicros
|
||||
|
||||
def pipelineLatencySecs(self) -> float:
|
||||
return self.pipelineLatencyMicros * 1e-6
|
||||
|
||||
def incrementTimeMicros(self, micros: int) -> None:
|
||||
self.captureTimestampMicros += micros
|
||||
|
||||
def publishTimestampMicros(self) -> int:
|
||||
return self._captureTimestampMicros + self.pipelineLatencyMicros
|
||||
|
||||
def receiveTimestampMicros(self) -> int:
|
||||
return self.publishTimestampMicros() + self._receiveLatencyMicros
|
||||
|
||||
def toPhotonPipelineMetadata(self) -> PhotonPipelineMetadata:
|
||||
return PhotonPipelineMetadata(
|
||||
captureTimestampMicros=self.captureTimestampMicros,
|
||||
publishTimestampMicros=self.publishTimestampMicros(),
|
||||
sequenceID=self._sequenceID,
|
||||
)
|
||||
Reference in New Issue
Block a user