mirror of
https://github.com/wpilibsuite/allwpilib
synced 2026-06-19 00:41:43 +00:00
[copybara] Resync commandsv2 (#8535)
This will be the last sync from the commandsv2 [repo](https://github.com/robotpy/robotpy-commands-v2), as it has now been merged into [mostrobotpy](https://github.com/robotpy/mostrobotpy/pull/226). Doing this on its own to make the diffs smaller when doing a full `mostrobotpy` sync, which will now have a lot of other new goodies coming along for the ride. --------- Co-authored-by: Default email <default@default.com>
This commit is contained in:
@@ -14,11 +14,7 @@ from .notifiercommand import NotifierCommand
|
||||
from .parallelcommandgroup import ParallelCommandGroup
|
||||
from .paralleldeadlinegroup import ParallelDeadlineGroup
|
||||
from .parallelracegroup import ParallelRaceGroup
|
||||
from .pidcommand import PIDCommand
|
||||
from .pidsubsystem import PIDSubsystem
|
||||
from .printcommand import PrintCommand
|
||||
from .profiledpidcommand import ProfiledPIDCommand
|
||||
from .profiledpidsubsystem import ProfiledPIDSubsystem
|
||||
from .proxycommand import ProxyCommand
|
||||
from .repeatcommand import RepeatCommand
|
||||
from .runcommand import RunCommand
|
||||
@@ -28,8 +24,6 @@ from .sequentialcommandgroup import SequentialCommandGroup
|
||||
from .startendcommand import StartEndCommand
|
||||
from .subsystem import Subsystem
|
||||
from .timedcommandrobot import TimedCommandRobot
|
||||
from .trapezoidprofilecommand import TrapezoidProfileCommand
|
||||
from .trapezoidprofilesubsystem import TrapezoidProfileSubsystem
|
||||
from .waitcommand import WaitCommand
|
||||
from .waituntilcommand import WaitUntilCommand
|
||||
from .wrappercommand import WrapperCommand
|
||||
@@ -51,11 +45,7 @@ __all__ = [
|
||||
"ParallelCommandGroup",
|
||||
"ParallelDeadlineGroup",
|
||||
"ParallelRaceGroup",
|
||||
"PIDCommand",
|
||||
"PIDSubsystem",
|
||||
"PrintCommand",
|
||||
"ProfiledPIDCommand",
|
||||
"ProfiledPIDSubsystem",
|
||||
"ProxyCommand",
|
||||
"RepeatCommand",
|
||||
"RunCommand",
|
||||
@@ -65,8 +55,6 @@ __all__ = [
|
||||
"StartEndCommand",
|
||||
"Subsystem",
|
||||
"TimedCommandRobot",
|
||||
"TrapezoidProfileCommand",
|
||||
"TrapezoidProfileSubsystem",
|
||||
"WaitCommand",
|
||||
"WaitUntilCommand",
|
||||
"WrapperCommand",
|
||||
|
||||
@@ -1,78 +0,0 @@
|
||||
# validated: 2024-01-19 DS f29a7d2e501b PIDCommand.java
|
||||
# Copyright (c) FIRST and other WPILib contributors.
|
||||
# Open Source Software; you can modify and/or share it under the terms of
|
||||
# the WPILib BSD license file in the root directory of this project.
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any, Callable, Union
|
||||
|
||||
from .command import Command
|
||||
from .subsystem import Subsystem
|
||||
|
||||
from wpimath import PIDController
|
||||
|
||||
|
||||
class PIDCommand(Command):
|
||||
"""
|
||||
A command that controls an output with a :class:`wpimath.controller.PIDController`. Runs forever by default - to add
|
||||
exit conditions and/or other behavior, subclass this class. The controller calculation and output
|
||||
are performed synchronously in the command's execute() method.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
controller: PIDController,
|
||||
measurementSource: Callable[[], float],
|
||||
setpoint: Union[Callable[[], float], float, int],
|
||||
useOutput: Callable[[float], Any],
|
||||
*requirements: Subsystem,
|
||||
):
|
||||
"""
|
||||
Creates a new PIDCommand, which controls the given output with a :class:`wpimath.controller.PIDController`.
|
||||
|
||||
:param controller: the controller that controls the output.
|
||||
:param measurementSource: the measurement of the process variable
|
||||
:param setpoint: the controller's setpoint (either a function that returns a)
|
||||
number or a number
|
||||
:param useOutput: the controller's output
|
||||
:param requirements: the subsystems required by this command
|
||||
"""
|
||||
super().__init__()
|
||||
|
||||
assert callable(measurementSource)
|
||||
assert callable(useOutput)
|
||||
|
||||
self._controller = controller
|
||||
self._useOutput = useOutput
|
||||
self._measurement = measurementSource
|
||||
|
||||
if isinstance(setpoint, (float, int)):
|
||||
setpoint = float(setpoint)
|
||||
self._setpoint = lambda: setpoint
|
||||
elif callable(setpoint):
|
||||
self._setpoint = setpoint
|
||||
else:
|
||||
raise ValueError(
|
||||
f"invalid setpoint (must be callable or number; got {type(setpoint)})"
|
||||
)
|
||||
|
||||
self.addRequirements(*requirements)
|
||||
|
||||
def initialize(self):
|
||||
self._controller.reset()
|
||||
|
||||
def execute(self):
|
||||
self._useOutput(
|
||||
self._controller.calculate(self._measurement(), self._setpoint())
|
||||
)
|
||||
|
||||
def end(self, interrupted):
|
||||
self._useOutput(0)
|
||||
|
||||
def getController(self):
|
||||
"""
|
||||
Returns the PIDController used by the command.
|
||||
|
||||
:return: The PIDController
|
||||
"""
|
||||
return self._controller
|
||||
@@ -1,100 +0,0 @@
|
||||
# validated: 2024-01-19 DS f29a7d2e501b PIDSubsystem.java
|
||||
# Copyright (c) FIRST and other WPILib contributors.
|
||||
# Open Source Software; you can modify and/or share it under the terms of
|
||||
# the WPILib BSD license file in the root directory of this project.
|
||||
from __future__ import annotations
|
||||
|
||||
from wpimath import PIDController
|
||||
|
||||
from .subsystem import Subsystem
|
||||
|
||||
|
||||
class PIDSubsystem(Subsystem):
|
||||
"""
|
||||
A subsystem that uses a :class:`wpimath.controller.PIDController` to control an output. The
|
||||
controller is run synchronously from the subsystem's periodic() method.
|
||||
"""
|
||||
|
||||
def __init__(self, controller: PIDController, initial_position: float = 0.0):
|
||||
"""
|
||||
Creates a new PIDSubsystem.
|
||||
|
||||
:param controller: The PIDController to use.
|
||||
:param initial_position: The initial setpoint of the subsystem.
|
||||
"""
|
||||
super().__init__()
|
||||
|
||||
self._controller = controller
|
||||
self.setSetpoint(initial_position)
|
||||
self.addChild("PID Controller", self._controller)
|
||||
self._enabled = False
|
||||
|
||||
def periodic(self):
|
||||
"""
|
||||
Executes the PID control logic during each periodic update.
|
||||
|
||||
This method is called synchronously from the subsystem's periodic() method.
|
||||
"""
|
||||
if self._enabled:
|
||||
self.useOutput(
|
||||
self._controller.calculate(self.getMeasurement()), self.getSetpoint()
|
||||
)
|
||||
|
||||
def getController(self) -> PIDController:
|
||||
"""
|
||||
Returns the PIDController used by the subsystem.
|
||||
|
||||
:return: The PIDController.
|
||||
"""
|
||||
return self._controller
|
||||
|
||||
def setSetpoint(self, setpoint: float):
|
||||
"""
|
||||
Sets the setpoint for the subsystem.
|
||||
|
||||
:param setpoint: The setpoint for the subsystem.
|
||||
"""
|
||||
self._controller.setSetpoint(setpoint)
|
||||
|
||||
def getSetpoint(self) -> float:
|
||||
"""
|
||||
Returns the current setpoint of the subsystem.
|
||||
|
||||
:return: The current setpoint.
|
||||
"""
|
||||
return self._controller.getSetpoint()
|
||||
|
||||
def useOutput(self, output: float, setpoint: float):
|
||||
"""
|
||||
Uses the output from the PIDController.
|
||||
|
||||
:param output: The output of the PIDController.
|
||||
:param setpoint: The setpoint of the PIDController (for feedforward).
|
||||
"""
|
||||
raise NotImplementedError(f"{self.__class__} must implement useOutput")
|
||||
|
||||
def getMeasurement(self) -> float:
|
||||
"""
|
||||
Returns the measurement of the process variable used by the PIDController.
|
||||
|
||||
:return: The measurement of the process variable.
|
||||
"""
|
||||
raise NotImplementedError(f"{self.__class__} must implement getMeasurement")
|
||||
|
||||
def enable(self):
|
||||
"""Enables the PID control. Resets the controller."""
|
||||
self._enabled = True
|
||||
self._controller.reset()
|
||||
|
||||
def disable(self):
|
||||
"""Disables the PID control. Sets output to zero."""
|
||||
self._enabled = False
|
||||
self.useOutput(0, 0)
|
||||
|
||||
def isEnabled(self) -> bool:
|
||||
"""
|
||||
Returns whether the controller is enabled.
|
||||
|
||||
:return: Whether the controller is enabled.
|
||||
"""
|
||||
return self._enabled
|
||||
@@ -1,80 +0,0 @@
|
||||
# validated: 2024-01-24 DS f29a7d2e501b ProfiledPIDCommand.java
|
||||
#
|
||||
# Copyright (c) FIRST and other WPILib contributors.
|
||||
# Open Source Software; you can modify and/or share it under the terms of
|
||||
# the WPILib BSD license file in the root directory of this project.
|
||||
#
|
||||
|
||||
from typing import Any, Generic
|
||||
|
||||
from wpimath import ProfiledPIDController, ProfiledPIDControllerRadians, TrapezoidProfile, TrapezoidProfileRadians
|
||||
|
||||
from .command import Command
|
||||
from .subsystem import Subsystem
|
||||
from .typing import (
|
||||
FloatOrFloatSupplier,
|
||||
FloatSupplier,
|
||||
TProfiledPIDController,
|
||||
UseOutputFunction,
|
||||
)
|
||||
|
||||
|
||||
class ProfiledPIDCommand(Command, Generic[TProfiledPIDController]):
|
||||
"""A command that controls an output with a :class:`.ProfiledPIDController`. Runs forever by default -
|
||||
to add exit conditions and/or other behavior, subclass this class. The controller calculation and
|
||||
output are performed synchronously in the command's execute() method.
|
||||
"""
|
||||
|
||||
_stateCls: Any
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
controller: TProfiledPIDController,
|
||||
measurementSource: FloatSupplier,
|
||||
goalSource: FloatOrFloatSupplier,
|
||||
useOutput: UseOutputFunction,
|
||||
*requirements: Subsystem,
|
||||
):
|
||||
"""Creates a new ProfiledPIDCommand, which controls the given output with a ProfiledPIDController. Goal
|
||||
velocity is specified.
|
||||
|
||||
:param controller: the controller that controls the output.
|
||||
:param measurementSource: the measurement of the process variable
|
||||
:param goalSource: the controller's goal
|
||||
:param useOutput: the controller's output
|
||||
:param requirements: the subsystems required by this command
|
||||
"""
|
||||
|
||||
super().__init__()
|
||||
if isinstance(controller, ProfiledPIDController):
|
||||
self._stateCls = TrapezoidProfile.State
|
||||
elif isinstance(controller, ProfiledPIDControllerRadians):
|
||||
self._stateCls = TrapezoidProfileRadians.State
|
||||
else:
|
||||
raise ValueError(f"unknown controller type {controller!r}")
|
||||
|
||||
self._controller: TProfiledPIDController = controller
|
||||
self._useOutput = useOutput
|
||||
self._measurement = measurementSource
|
||||
if isinstance(goalSource, (float, int)):
|
||||
self._goal = lambda: float(goalSource)
|
||||
else:
|
||||
self._goal = goalSource
|
||||
|
||||
self.addRequirements(*requirements)
|
||||
|
||||
def initialize(self):
|
||||
self._controller.reset(self._measurement())
|
||||
|
||||
def execute(self):
|
||||
self._useOutput.accept(
|
||||
self._controller.calculate(self._measurement(), self._goal()),
|
||||
self._controller.getSetpoint(),
|
||||
)
|
||||
|
||||
def end(self, interrupted: bool):
|
||||
self._useOutput(0.0, self._stateCls())
|
||||
|
||||
def getController(self):
|
||||
"""Gets the controller used by the command"""
|
||||
return self._controller
|
||||
@@ -1,81 +0,0 @@
|
||||
# Copyright (c) FIRST and other WPILib contributors.
|
||||
# Open Source Software; you can modify and/or share it under the terms of
|
||||
# the WPILib BSD license file in the root directory of this project.
|
||||
|
||||
from typing import Generic
|
||||
|
||||
from wpimath import TrapezoidProfile
|
||||
|
||||
from .subsystem import Subsystem
|
||||
from .typing import TProfiledPIDController, TTrapezoidProfileState
|
||||
|
||||
|
||||
class ProfiledPIDSubsystem(
|
||||
Subsystem, Generic[TProfiledPIDController, TTrapezoidProfileState]
|
||||
):
|
||||
"""
|
||||
A subsystem that uses a :class:`wpimath.controller.ProfiledPIDController`
|
||||
or :class:`wpimath.controller.ProfiledPIDControllerRadians` to
|
||||
control an output. The controller is run synchronously from the subsystem's
|
||||
:meth:`.periodic` method.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
controller: TProfiledPIDController,
|
||||
initial_position: float = 0,
|
||||
):
|
||||
"""
|
||||
Creates a new Profiled PID Subsystem using the provided PID Controller
|
||||
|
||||
:param controller: the controller that controls the output
|
||||
:param initial_position: the initial value of the process variable
|
||||
|
||||
"""
|
||||
super().__init__()
|
||||
self._controller: TProfiledPIDController = controller
|
||||
self._enabled = False
|
||||
self.setGoal(initial_position)
|
||||
|
||||
def periodic(self):
|
||||
"""Updates the output of the controller."""
|
||||
if self._enabled:
|
||||
self.useOutput(
|
||||
self._controller.calculate(self.getMeasurement()),
|
||||
self._controller.getSetpoint(),
|
||||
)
|
||||
|
||||
def getController(
|
||||
self,
|
||||
) -> TProfiledPIDController:
|
||||
"""Returns the controller"""
|
||||
return self._controller
|
||||
|
||||
def setGoal(self, goal):
|
||||
"""Sets the goal state for the subsystem."""
|
||||
self._controller.setGoal(goal)
|
||||
|
||||
def useOutput(self, output: float, setpoint: TTrapezoidProfileState):
|
||||
"""Uses the output from the controller object."""
|
||||
raise NotImplementedError(f"{self.__class__} must implement useOutput")
|
||||
|
||||
def getMeasurement(self) -> float:
|
||||
"""
|
||||
Returns the measurement of the process variable used by the
|
||||
controller object.
|
||||
"""
|
||||
raise NotImplementedError(f"{self.__class__} must implement getMeasurement")
|
||||
|
||||
def enable(self):
|
||||
"""Enables the PID control. Resets the controller."""
|
||||
self._enabled = True
|
||||
self._controller.reset(self.getMeasurement())
|
||||
|
||||
def disable(self):
|
||||
"""Disables the PID control. Sets output to zero."""
|
||||
self._enabled = False
|
||||
self.useOutput(0, TrapezoidProfile.State())
|
||||
|
||||
def isEnabled(self) -> bool:
|
||||
"""Returns whether the controller is enabled."""
|
||||
return self._enabled
|
||||
@@ -1,64 +0,0 @@
|
||||
# validated: 2024-01-24 DS 192a28af4731 TrapezoidProfileCommand.java
|
||||
#
|
||||
# Copyright (c) FIRST and other WPILib contributors.
|
||||
# Open Source Software; you can modify and/or share it under the terms of
|
||||
# the WPILib BSD license file in the root directory of this project.
|
||||
#
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Callable, Any
|
||||
|
||||
from wpilib import Timer
|
||||
|
||||
from .command import Command
|
||||
from .subsystem import Subsystem
|
||||
|
||||
|
||||
class TrapezoidProfileCommand(Command):
|
||||
"""
|
||||
A command that runs a :class:`wpimath.trajectory.TrapezoidProfile`. Useful for smoothly controlling mechanism motion.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
profile,
|
||||
output: Callable[[Any], Any],
|
||||
goal: Callable[[], Any],
|
||||
currentState: Callable[[], Any],
|
||||
*requirements: Subsystem,
|
||||
):
|
||||
"""Creates a new TrapezoidProfileCommand that will execute the given :class:`wpimath.trajectory.TrapezoidProfile`.
|
||||
Output will be piped to the provided consumer function.
|
||||
|
||||
:param profile: The motion profile to execute.
|
||||
:param output: The consumer for the profile output.
|
||||
:param goal: The supplier for the desired state
|
||||
:param currentState: The supplier for the current state
|
||||
:param requirements: The subsystems required by this command.
|
||||
"""
|
||||
super().__init__()
|
||||
self._profile = profile
|
||||
self._output = output
|
||||
self._goal = goal
|
||||
self._currentState = currentState
|
||||
self._timer = Timer()
|
||||
|
||||
self.addRequirements(*requirements)
|
||||
|
||||
def initialize(self) -> None:
|
||||
self._timer.restart()
|
||||
|
||||
def execute(self) -> None:
|
||||
self._output(
|
||||
self._profile.calculate(
|
||||
self._timer.get(),
|
||||
self._currentState(),
|
||||
self._goal(),
|
||||
)
|
||||
)
|
||||
|
||||
def end(self, interrupted) -> None:
|
||||
self._timer.stop()
|
||||
|
||||
def isFinished(self) -> bool:
|
||||
return self._timer.hasElapsed(self._profile.totalTime())
|
||||
@@ -1,93 +0,0 @@
|
||||
# validated: 2024-02-20 DV 6cc7e52de74a TrapezoidProfileSubsystem.java
|
||||
#
|
||||
# Copyright (c) FIRST and other WPILib contributors.
|
||||
# Open Source Software; you can modify and/or share it under the terms of
|
||||
# the WPILib BSD license file in the root directory of this project.
|
||||
#
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any, Union
|
||||
|
||||
from wpimath import TrapezoidProfile, TrapezoidProfileRadians, units
|
||||
|
||||
|
||||
from .subsystem import Subsystem
|
||||
|
||||
|
||||
class TrapezoidProfileSubsystem(Subsystem):
|
||||
"""
|
||||
A subsystem that generates and runs trapezoidal motion profiles automatically. The user specifies
|
||||
how to use the current state of the motion profile by overriding the `useState` method.
|
||||
"""
|
||||
|
||||
_profile: Any
|
||||
_stateCls: Any
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
constraints: Union[
|
||||
TrapezoidProfile.Constraints, TrapezoidProfileRadians.Constraints
|
||||
],
|
||||
initial_position: float = 0.0,
|
||||
period: units.seconds = 0.02,
|
||||
):
|
||||
"""
|
||||
Creates a new TrapezoidProfileSubsystem.
|
||||
|
||||
:param constraints: The constraints (maximum velocity and acceleration) for the profiles.
|
||||
:param initial_position: The initial position of the controlled mechanism when the subsystem is constructed.
|
||||
:param period: The period of the main robot loop, in seconds.
|
||||
"""
|
||||
if isinstance(constraints, TrapezoidProfile.Constraints):
|
||||
self._profile = TrapezoidProfile(constraints)
|
||||
self._stateCls = TrapezoidProfile.State
|
||||
elif isinstance(constraints, TrapezoidProfileRadians.Constraints):
|
||||
self._profile = TrapezoidProfileRadians(constraints)
|
||||
self._stateCls = TrapezoidProfileRadians.State
|
||||
else:
|
||||
raise ValueError(f"Invalid constraints {constraints}")
|
||||
|
||||
self._state = self._stateCls(initial_position, 0)
|
||||
self.setGoal(initial_position)
|
||||
self._period = period
|
||||
self._enabled = True
|
||||
|
||||
def periodic(self):
|
||||
"""
|
||||
Executes the TrapezoidProfileSubsystem logic during each periodic update.
|
||||
|
||||
This method is called synchronously from the subsystem's periodic() method.
|
||||
"""
|
||||
self._state = self._profile.calculate(self._period, self._state, self._goal)
|
||||
if self._enabled:
|
||||
self.useState(self._state)
|
||||
|
||||
def setGoal(self, goal):
|
||||
"""
|
||||
Sets the goal state for the subsystem. Goal velocity assumed to be zero.
|
||||
|
||||
:param goal: The goal position for the subsystem's motion profile. The goal
|
||||
can either be a `TrapezoidProfile.State` or `float`. If float is provided,
|
||||
the assumed velocity for the goal will be 0.
|
||||
"""
|
||||
# If we got a float, instantiate the state
|
||||
if isinstance(goal, (float, int)):
|
||||
goal = self._stateCls(goal, 0)
|
||||
|
||||
self._goal = goal
|
||||
|
||||
def enable(self):
|
||||
"""Enable the TrapezoidProfileSubsystem's output."""
|
||||
self._enabled = True
|
||||
|
||||
def disable(self):
|
||||
"""Disable the TrapezoidProfileSubsystem's output."""
|
||||
self._enabled = False
|
||||
|
||||
def useState(self, state):
|
||||
"""
|
||||
Users should override this to consume the current state of the motion profile.
|
||||
|
||||
:param state: The current state of the motion profile.
|
||||
"""
|
||||
raise NotImplementedError(f"{self.__class__} must implement useState")
|
||||
@@ -1,28 +1,6 @@
|
||||
from typing import Callable, Protocol, TypeVar, Union
|
||||
from typing import Callable, Union
|
||||
|
||||
from typing_extensions import TypeAlias
|
||||
from wpimath import ProfiledPIDController, ProfiledPIDControllerRadians, TrapezoidProfile, TrapezoidProfileRadians
|
||||
|
||||
# Generic Types
|
||||
TProfiledPIDController = TypeVar(
|
||||
"TProfiledPIDController", ProfiledPIDControllerRadians, ProfiledPIDController
|
||||
)
|
||||
TTrapezoidProfileState = TypeVar(
|
||||
"TTrapezoidProfileState",
|
||||
TrapezoidProfileRadians.State,
|
||||
TrapezoidProfile.State,
|
||||
)
|
||||
|
||||
|
||||
# Protocols - Structural Typing
|
||||
class UseOutputFunction(Protocol):
|
||||
|
||||
def __init__(self, *args, **kwargs) -> None: ...
|
||||
|
||||
def __call__(self, t: float, u: TTrapezoidProfileState) -> None: ...
|
||||
|
||||
def accept(self, t: float, u: TTrapezoidProfileState) -> None: ...
|
||||
|
||||
|
||||
# Type Aliases
|
||||
FloatSupplier: TypeAlias = Callable[[], float]
|
||||
|
||||
@@ -1,115 +0,0 @@
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from util import * # type: ignore
|
||||
import commands2
|
||||
|
||||
import wpimath
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .util import *
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
def test_pidCommandSupplier(scheduler: commands2.CommandScheduler):
|
||||
with ManualSimTime() as sim:
|
||||
output_float = OOFloat(0.0)
|
||||
measurement_source = OOFloat(5.0)
|
||||
setpoint_source = OOFloat(2.0)
|
||||
pid_controller = wpimath.PIDController(0.1, 0.01, 0.001)
|
||||
system = commands2.Subsystem()
|
||||
pidCommand = commands2.PIDCommand(
|
||||
pid_controller,
|
||||
measurement_source,
|
||||
setpoint_source,
|
||||
output_float.set,
|
||||
system,
|
||||
)
|
||||
start_spying_on(pidCommand)
|
||||
scheduler.schedule(pidCommand)
|
||||
scheduler.run()
|
||||
sim.step(1)
|
||||
scheduler.run()
|
||||
|
||||
assert scheduler.isScheduled(pidCommand)
|
||||
|
||||
assert not pidCommand._controller.atSetpoint()
|
||||
|
||||
# Tell the pid command we're at our setpoint through the controller
|
||||
measurement_source.set(setpoint_source())
|
||||
|
||||
sim.step(2)
|
||||
|
||||
scheduler.run()
|
||||
|
||||
# Should be measuring error of 0 now
|
||||
assert pidCommand._controller.atSetpoint()
|
||||
|
||||
|
||||
def test_pidCommandScalar(scheduler: commands2.CommandScheduler):
|
||||
with ManualSimTime() as sim:
|
||||
output_float = OOFloat(0.0)
|
||||
measurement_source = OOFloat(5.0)
|
||||
setpoint_source = 2.0
|
||||
pid_controller = wpimath.PIDController(0.1, 0.01, 0.001)
|
||||
system = commands2.Subsystem()
|
||||
pidCommand = commands2.PIDCommand(
|
||||
pid_controller,
|
||||
measurement_source,
|
||||
setpoint_source,
|
||||
output_float.set,
|
||||
system,
|
||||
)
|
||||
start_spying_on(pidCommand)
|
||||
scheduler.schedule(pidCommand)
|
||||
scheduler.run()
|
||||
sim.step(1)
|
||||
scheduler.run()
|
||||
|
||||
assert scheduler.isScheduled(pidCommand)
|
||||
|
||||
assert not pidCommand._controller.atSetpoint()
|
||||
|
||||
# Tell the pid command we're at our setpoint through the controller
|
||||
measurement_source.set(setpoint_source)
|
||||
|
||||
sim.step(2)
|
||||
|
||||
scheduler.run()
|
||||
|
||||
# Should be measuring error of 0 now
|
||||
assert pidCommand._controller.atSetpoint()
|
||||
|
||||
|
||||
def test_withTimeout(scheduler: commands2.CommandScheduler):
|
||||
with ManualSimTime() as sim:
|
||||
output_float = OOFloat(0.0)
|
||||
measurement_source = OOFloat(5.0)
|
||||
setpoint_source = OOFloat(2.0)
|
||||
pid_controller = wpimath.PIDController(0.1, 0.01, 0.001)
|
||||
system = commands2.Subsystem()
|
||||
command1 = commands2.PIDCommand(
|
||||
pid_controller,
|
||||
measurement_source,
|
||||
setpoint_source,
|
||||
output_float.set,
|
||||
system,
|
||||
)
|
||||
start_spying_on(command1)
|
||||
|
||||
timeout = command1.withTimeout(2)
|
||||
|
||||
scheduler.schedule(timeout)
|
||||
scheduler.run()
|
||||
|
||||
verify(command1).initialize()
|
||||
verify(command1).execute()
|
||||
assert not scheduler.isScheduled(command1)
|
||||
assert scheduler.isScheduled(timeout)
|
||||
|
||||
sim.step(3)
|
||||
scheduler.run()
|
||||
|
||||
verify(command1).end(True)
|
||||
verify(command1, never()).end(False)
|
||||
assert not scheduler.isScheduled(timeout)
|
||||
@@ -1,118 +0,0 @@
|
||||
from types import MethodType
|
||||
from typing import Any
|
||||
|
||||
import pytest
|
||||
from wpimath import ProfiledPIDController, ProfiledPIDControllerRadians, TrapezoidProfile, TrapezoidProfileRadians
|
||||
|
||||
from commands2 import ProfiledPIDSubsystem
|
||||
|
||||
MAX_VELOCITY = 30 # Radians per second
|
||||
MAX_ACCELERATION = 500 # Radians per sec squared
|
||||
PID_KP = 50
|
||||
|
||||
|
||||
class EvalSubsystem(ProfiledPIDSubsystem):
|
||||
def __init__(self, controller, state_factory):
|
||||
self._state_factory = state_factory
|
||||
super().__init__(controller, 0)
|
||||
|
||||
|
||||
def simple_use_output(self, output: float, setpoint: Any):
|
||||
"""A simple useOutput method that saves the current state of the controller."""
|
||||
self._output = output
|
||||
self._setpoint = setpoint
|
||||
|
||||
|
||||
def simple_get_measurement(self) -> float:
|
||||
"""A simple getMeasurement method that returns zero (frozen or stuck plant)."""
|
||||
return 0.0
|
||||
|
||||
|
||||
controller_types = [
|
||||
(
|
||||
ProfiledPIDControllerRadians,
|
||||
TrapezoidProfileRadians.Constraints,
|
||||
TrapezoidProfileRadians.State,
|
||||
),
|
||||
(ProfiledPIDController, TrapezoidProfile.Constraints, TrapezoidProfile.State),
|
||||
]
|
||||
controller_ids = ["radians", "dimensionless"]
|
||||
|
||||
|
||||
@pytest.fixture(params=controller_types, ids=controller_ids)
|
||||
def subsystem(request):
|
||||
"""
|
||||
Fixture that returns an EvalSubsystem object for each type of controller.
|
||||
"""
|
||||
controller, profile_factory, state_factory = request.param
|
||||
profile = profile_factory(MAX_VELOCITY, MAX_ACCELERATION)
|
||||
pid = controller(PID_KP, 0, 0, profile)
|
||||
return EvalSubsystem(pid, state_factory)
|
||||
|
||||
|
||||
def test_profiled_pid_subsystem_init(subsystem):
|
||||
"""
|
||||
Verify that the ProfiledPIDSubsystem can be initialized using
|
||||
all supported profiled PID controller / trapezoid profile types.
|
||||
"""
|
||||
assert isinstance(subsystem, EvalSubsystem)
|
||||
|
||||
|
||||
def test_profiled_pid_subsystem_not_implemented_get_measurement(subsystem):
|
||||
"""
|
||||
Verify that the ProfiledPIDSubsystem.getMeasurement method
|
||||
raises NotImplementedError.
|
||||
"""
|
||||
with pytest.raises(NotImplementedError):
|
||||
subsystem.getMeasurement()
|
||||
|
||||
|
||||
def test_profiled_pid_subsystem_not_implemented_use_output(subsystem):
|
||||
"""
|
||||
Verify that the ProfiledPIDSubsystem.useOutput method raises
|
||||
NotImplementedError.
|
||||
"""
|
||||
with pytest.raises(NotImplementedError):
|
||||
subsystem.useOutput(0, subsystem._state_factory())
|
||||
|
||||
|
||||
@pytest.mark.parametrize("use_float", [True, False])
|
||||
def test_profiled_pid_subsystem_set_goal(subsystem, use_float):
|
||||
"""
|
||||
Verify that the ProfiledPIDSubsystem.setGoal method sets the goal.
|
||||
"""
|
||||
if use_float:
|
||||
subsystem.setGoal(1.0)
|
||||
assert subsystem.getController().getGoal().position == 1.0
|
||||
assert subsystem.getController().getGoal().velocity == 0.0
|
||||
else:
|
||||
subsystem.setGoal(subsystem._state_factory(1.0, 2.0))
|
||||
assert subsystem.getController().getGoal().position == 1.0
|
||||
assert subsystem.getController().getGoal().velocity == 2.0
|
||||
|
||||
|
||||
def test_profiled_pid_subsystem_enable_subsystem(subsystem):
|
||||
"""
|
||||
Verify the subsystem can be enabled.
|
||||
"""
|
||||
# Dynamically add useOutput and getMeasurement methods so the
|
||||
# system can be enabled
|
||||
setattr(subsystem, "useOutput", MethodType(simple_use_output, subsystem))
|
||||
setattr(subsystem, "getMeasurement", MethodType(simple_get_measurement, subsystem))
|
||||
# Enable the subsystem
|
||||
subsystem.enable()
|
||||
assert subsystem.isEnabled()
|
||||
|
||||
|
||||
def test_profiled_pid_subsystem_disable_subsystem(subsystem):
|
||||
"""
|
||||
Verify the subsystem can be disabled.
|
||||
"""
|
||||
# Dynamically add useOutput and getMeasurement methods so the
|
||||
# system can be enabled
|
||||
setattr(subsystem, "useOutput", MethodType(simple_use_output, subsystem))
|
||||
setattr(subsystem, "getMeasurement", MethodType(simple_get_measurement, subsystem))
|
||||
# Enable and then disable the subsystem
|
||||
subsystem.enable()
|
||||
subsystem.disable()
|
||||
assert not subsystem.isEnabled()
|
||||
@@ -1,137 +0,0 @@
|
||||
# Copyright (c) FIRST and other WPILib contributors.
|
||||
# Open Source Software; you can modify and/or share it under the terms of
|
||||
# the WPILib BSD license file in the root directory of this project.
|
||||
|
||||
from typing import TYPE_CHECKING, List, Tuple
|
||||
import math
|
||||
|
||||
from wpimath import TrapezoidProfile as DimensionlessProfile, TrapezoidProfileRadians as RadiansProfile
|
||||
|
||||
from wpilib import Timer
|
||||
|
||||
from util import * # type: ignore
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .util import *
|
||||
|
||||
import pytest
|
||||
|
||||
import commands2
|
||||
|
||||
|
||||
class TrapezoidProfileRadiansFixture:
|
||||
def __init__(self):
|
||||
constraints: RadiansProfile.Constraints = RadiansProfile.Constraints(
|
||||
3 * math.pi, math.pi
|
||||
)
|
||||
self._profile: RadiansProfile = RadiansProfile(constraints)
|
||||
self._goal_state = RadiansProfile.State(3, 0)
|
||||
|
||||
self._state = self._profile.calculate(
|
||||
0, self._goal_state, RadiansProfile.State(0, 0)
|
||||
)
|
||||
|
||||
self._timer = Timer()
|
||||
|
||||
def profileOutput(self, state: RadiansProfile.State) -> None:
|
||||
self._state = state
|
||||
|
||||
def currentState(self) -> RadiansProfile.State:
|
||||
return self._state
|
||||
|
||||
def getGoal(self) -> RadiansProfile.State:
|
||||
return self._goal_state
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def get_trapezoid_profile_radians() -> TrapezoidProfileRadiansFixture:
|
||||
return TrapezoidProfileRadiansFixture()
|
||||
|
||||
|
||||
class TrapezoidProfileFixture:
|
||||
def __init__(self):
|
||||
constraints: DimensionlessProfile.Constraints = (
|
||||
DimensionlessProfile.Constraints(3 * math.pi, math.pi)
|
||||
)
|
||||
self._profile: DimensionlessProfile = DimensionlessProfile(constraints)
|
||||
self._goal_state = DimensionlessProfile.State(3, 0)
|
||||
|
||||
self._state = self._profile.calculate(
|
||||
0, self._goal_state, DimensionlessProfile.State(0, 0)
|
||||
)
|
||||
|
||||
self._timer = Timer()
|
||||
|
||||
def profileOutput(self, state: DimensionlessProfile.State) -> None:
|
||||
self._state = state
|
||||
|
||||
def currentState(self) -> DimensionlessProfile.State:
|
||||
return self._state
|
||||
|
||||
def getGoal(self) -> DimensionlessProfile.State:
|
||||
return self._goal_state
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def get_trapezoid_profile_dimensionless() -> TrapezoidProfileFixture:
|
||||
return TrapezoidProfileFixture()
|
||||
|
||||
|
||||
def test_trapezoidProfileDimensionless(
|
||||
scheduler: commands2.CommandScheduler, get_trapezoid_profile_dimensionless
|
||||
):
|
||||
with ManualSimTime() as sim:
|
||||
subsystem = commands2.Subsystem()
|
||||
|
||||
fixture_data = get_trapezoid_profile_dimensionless
|
||||
|
||||
command = commands2.TrapezoidProfileCommand(
|
||||
fixture_data._profile,
|
||||
fixture_data.profileOutput,
|
||||
fixture_data.getGoal,
|
||||
fixture_data.currentState,
|
||||
subsystem,
|
||||
)
|
||||
|
||||
fixture_data._timer.restart()
|
||||
|
||||
command.initialize()
|
||||
|
||||
count = 0
|
||||
while not command.isFinished():
|
||||
command.execute()
|
||||
count += 1
|
||||
sim.step(0.005)
|
||||
|
||||
fixture_data._timer.stop()
|
||||
command.end(True)
|
||||
|
||||
|
||||
def test_trapezoidProfileRadians(
|
||||
scheduler: commands2.CommandScheduler, get_trapezoid_profile_radians
|
||||
):
|
||||
with ManualSimTime() as sim:
|
||||
subsystem = commands2.Subsystem()
|
||||
|
||||
fixture_data = get_trapezoid_profile_radians
|
||||
|
||||
command = commands2.TrapezoidProfileCommand(
|
||||
fixture_data._profile,
|
||||
fixture_data.profileOutput,
|
||||
fixture_data.getGoal,
|
||||
fixture_data.currentState,
|
||||
subsystem,
|
||||
)
|
||||
|
||||
fixture_data._timer.restart()
|
||||
|
||||
command.initialize()
|
||||
|
||||
count = 0
|
||||
while not command.isFinished():
|
||||
command.execute()
|
||||
count += 1
|
||||
sim.step(0.005)
|
||||
|
||||
fixture_data._timer.stop()
|
||||
command.end(True)
|
||||
Reference in New Issue
Block a user