[robotpy] Mirror robotpy's commands-v2 (#8369)

Project import generated by Copybara.

GitOrigin-RevId: 715c8e8372d936f447f2937aab6b1a22dc619126
This commit is contained in:
PJ Reiniger
2025-11-14 00:55:54 -05:00
committed by GitHub
parent 6e6f8dd7cc
commit 1a99a348cb
84 changed files with 8974 additions and 0 deletions

View File

@@ -0,0 +1,166 @@
from typing import Generic, TypeVar
import commands2
import pytest
# T = TypeVar("T", bound=commands2.Command)
# T = commands2.Command
from util import *
class SingleCompositionTestBase:
def composeSingle(self, member: commands2.Command):
raise NotImplementedError
@pytest.mark.parametrize(
"interruptionBehavior",
[
commands2.InterruptionBehavior.kCancelSelf,
commands2.InterruptionBehavior.kCancelIncoming,
],
)
def test_interruptible(self, interruptionBehavior: commands2.InterruptionBehavior):
command = self.composeSingle(
commands2.WaitUntilCommand(lambda: False).withInterruptBehavior(
interruptionBehavior
)
)
assert command.getInterruptionBehavior() == interruptionBehavior
@pytest.mark.parametrize("runsWhenDisabled", [True, False])
def test_runWhenDisabled(self, runsWhenDisabled: bool):
command = self.composeSingle(
commands2.WaitUntilCommand(lambda: False).ignoringDisable(runsWhenDisabled)
)
assert command.runsWhenDisabled() == runsWhenDisabled
def test_command_in_other_composition(self):
command = commands2.InstantCommand()
wrapped = commands2.WrapperCommand(command)
with pytest.raises(commands2.IllegalCommandUse):
self.composeSingle(command)
def test_command_in_multiple_compositions(self):
command = commands2.InstantCommand()
self.composeSingle(command)
with pytest.raises(commands2.IllegalCommandUse):
self.composeSingle(command)
def test_compose_then_schedule(self, scheduler: commands2.CommandScheduler):
command = commands2.InstantCommand()
self.composeSingle(command)
with pytest.raises(commands2.IllegalCommandUse):
scheduler.schedule(command)
def test_schedule_then_compose(self, scheduler: commands2.CommandScheduler):
command = commands2.RunCommand(lambda: None)
scheduler.schedule(command)
with pytest.raises(commands2.IllegalCommandUse):
self.composeSingle(command)
class MultiCompositionTestBase(SingleCompositionTestBase):
def compose(self, *members: commands2.Command):
raise NotImplementedError
def composeSingle(self, member: commands2.Command):
return self.compose(member)
@pytest.mark.parametrize(
"expected,command1,command2,command3",
[
pytest.param(
commands2.InterruptionBehavior.kCancelSelf,
commands2.WaitUntilCommand(lambda: False).withInterruptBehavior(
commands2.InterruptionBehavior.kCancelSelf
),
commands2.WaitUntilCommand(lambda: False).withInterruptBehavior(
commands2.InterruptionBehavior.kCancelSelf
),
commands2.WaitUntilCommand(lambda: False).withInterruptBehavior(
commands2.InterruptionBehavior.kCancelSelf
),
id="AllCancelSelf",
),
pytest.param(
commands2.InterruptionBehavior.kCancelIncoming,
commands2.WaitUntilCommand(lambda: False).withInterruptBehavior(
commands2.InterruptionBehavior.kCancelIncoming
),
commands2.WaitUntilCommand(lambda: False).withInterruptBehavior(
commands2.InterruptionBehavior.kCancelIncoming
),
commands2.WaitUntilCommand(lambda: False).withInterruptBehavior(
commands2.InterruptionBehavior.kCancelIncoming
),
id="AllCancelIncoming",
),
pytest.param(
commands2.InterruptionBehavior.kCancelSelf,
commands2.WaitUntilCommand(lambda: False).withInterruptBehavior(
commands2.InterruptionBehavior.kCancelSelf
),
commands2.WaitUntilCommand(lambda: False).withInterruptBehavior(
commands2.InterruptionBehavior.kCancelSelf
),
commands2.WaitUntilCommand(lambda: False).withInterruptBehavior(
commands2.InterruptionBehavior.kCancelIncoming
),
id="TwoCancelSelfOneIncoming",
),
pytest.param(
commands2.InterruptionBehavior.kCancelSelf,
commands2.WaitUntilCommand(lambda: False).withInterruptBehavior(
commands2.InterruptionBehavior.kCancelIncoming
),
commands2.WaitUntilCommand(lambda: False).withInterruptBehavior(
commands2.InterruptionBehavior.kCancelIncoming
),
commands2.WaitUntilCommand(lambda: False).withInterruptBehavior(
commands2.InterruptionBehavior.kCancelSelf
),
id="TwoCancelIncomingOneSelf",
),
],
)
def test_interruptible(self, expected, command1, command2, command3):
command = self.compose(command1, command2, command3)
assert command.getInterruptionBehavior() == expected
@pytest.mark.parametrize(
"expected,command1,command2,command3",
[
pytest.param(
False,
commands2.WaitUntilCommand(lambda: False).ignoringDisable(False),
commands2.WaitUntilCommand(lambda: False).ignoringDisable(False),
commands2.WaitUntilCommand(lambda: False).ignoringDisable(False),
id="AllFalse",
),
pytest.param(
True,
commands2.WaitUntilCommand(lambda: False).ignoringDisable(True),
commands2.WaitUntilCommand(lambda: False).ignoringDisable(True),
commands2.WaitUntilCommand(lambda: False).ignoringDisable(True),
id="AllTrue",
),
pytest.param(
False,
commands2.WaitUntilCommand(lambda: False).ignoringDisable(True),
commands2.WaitUntilCommand(lambda: False).ignoringDisable(True),
commands2.WaitUntilCommand(lambda: False).ignoringDisable(False),
id="TwoTrueOneFalse",
),
pytest.param(
False,
commands2.WaitUntilCommand(lambda: False).ignoringDisable(False),
commands2.WaitUntilCommand(lambda: False).ignoringDisable(False),
commands2.WaitUntilCommand(lambda: False).ignoringDisable(True),
id="TwoFalseOneTrue",
),
],
)
def test_runWhenDisabled(self, expected, command1, command2, command3):
command = self.compose(command1, command2, command3)
assert command.runsWhenDisabled() == expected

View File

@@ -0,0 +1,20 @@
import commands2
import pytest
from ntcore import NetworkTableInstance
from wpilib.simulation import DriverStationSim
@pytest.fixture(autouse=True)
def scheduler():
commands2.CommandScheduler.resetInstance()
DriverStationSim.setEnabled(True)
DriverStationSim.notifyNewData()
return commands2.CommandScheduler.getInstance()
@pytest.fixture()
def nt_instance():
inst = NetworkTableInstance.create()
inst.startLocal()
yield inst
inst.stopLocal()

View File

@@ -0,0 +1,222 @@
from typing import TYPE_CHECKING
from util import * # type: ignore
if TYPE_CHECKING:
from .util import *
import commands2
import pytest
def test_timeout(scheduler: commands2.CommandScheduler):
with ManualSimTime() as sim:
command1 = commands2.WaitCommand(1)
timeout = command1.withTimeout(2)
scheduler.schedule(timeout)
scheduler.run()
assert not command1.isScheduled()
assert timeout.isScheduled()
sim.step(3)
scheduler.run()
assert not timeout.isScheduled()
def test_until(scheduler: commands2.CommandScheduler):
condition = OOBoolean(False)
command = commands2.WaitCommand(10).until(condition)
scheduler.schedule(command)
scheduler.run()
assert command.isScheduled()
condition.set(True)
scheduler.run()
assert not command.isScheduled()
def test_only_while(scheduler: commands2.CommandScheduler):
condition = OOBoolean(True)
command = commands2.WaitCommand(10).onlyWhile(condition)
scheduler.schedule(command)
scheduler.run()
assert command.isScheduled()
condition.set(False)
scheduler.run()
assert not command.isScheduled()
def test_ignoringDisable(scheduler: commands2.CommandScheduler):
command = commands2.RunCommand(lambda: None).ignoringDisable(True)
DriverStationSim.setEnabled(False)
DriverStationSim.notifyNewData()
scheduler.schedule(command)
scheduler.run()
assert command.isScheduled()
def test_beforeStarting(scheduler: commands2.CommandScheduler):
condition = OOBoolean(False)
condition.set(False)
command = commands2.InstantCommand()
scheduler.schedule(
command.beforeStarting(commands2.InstantCommand(lambda: condition.set(True)))
)
assert condition == True
@pytest.mark.skip
def test_andThenLambda(scheduler: commands2.CommandScheduler): ...
def test_andThen(scheduler: commands2.CommandScheduler):
condition = OOBoolean(False)
condition.set(False)
command1 = commands2.InstantCommand()
command2 = commands2.InstantCommand(lambda: condition.set(True))
scheduler.schedule(command1.andThen(command2))
assert condition == False
scheduler.run()
assert condition == True
def test_deadlineWith(scheduler: commands2.CommandScheduler):
condition = OOBoolean(False)
condition.set(False)
dictator = commands2.WaitUntilCommand(condition)
endsBefore = commands2.InstantCommand()
endsAfter = commands2.WaitUntilCommand(lambda: False)
group = dictator.deadlineWith(endsBefore, endsAfter)
scheduler.schedule(group)
scheduler.run()
assert group.isScheduled()
condition.set(True)
scheduler.run()
assert not group.isScheduled()
def test_deadlineFor(scheduler: commands2.CommandScheduler):
condition = OOBoolean(False)
condition.set(False)
dictator = commands2.WaitUntilCommand(condition)
endsBefore = commands2.InstantCommand()
endsAfter = commands2.WaitUntilCommand(lambda: False)
group = dictator.deadlineFor(endsBefore, endsAfter)
scheduler.schedule(group)
scheduler.run()
assert group.isScheduled()
condition.set(True)
scheduler.run()
assert not group.isScheduled()
def test_alongWith(scheduler: commands2.CommandScheduler):
condition = OOBoolean()
condition.set(False)
command1 = commands2.WaitUntilCommand(condition)
command2 = commands2.InstantCommand()
group = command1.alongWith(command2)
scheduler.schedule(group)
scheduler.run()
assert group.isScheduled()
condition.set(True)
scheduler.run()
assert not group.isScheduled()
def test_raceWith(scheduler: commands2.CommandScheduler):
command1 = commands2.WaitUntilCommand(lambda: False)
command2 = commands2.InstantCommand()
group = command1.raceWith(command2)
scheduler.schedule(group)
scheduler.run()
assert not group.isScheduled()
def test_unless(scheduler: commands2.CommandScheduler):
unlessCondition = OOBoolean(True)
hasRunCondition = OOBoolean(False)
command = commands2.InstantCommand(lambda: hasRunCondition.set(True)).unless(
unlessCondition
)
scheduler.schedule(command)
scheduler.run()
assert hasRunCondition == False
unlessCondition.set(False)
scheduler.schedule(command)
scheduler.run()
assert hasRunCondition == True
def test_onlyIf(scheduler: commands2.CommandScheduler):
onlyIfCondition = OOBoolean(False)
hasRunCondition = OOBoolean(False)
command = commands2.InstantCommand(lambda: hasRunCondition.set(True)).onlyIf(
onlyIfCondition
)
scheduler.schedule(command)
scheduler.run()
assert hasRunCondition == False
onlyIfCondition.set(True)
scheduler.schedule(command)
scheduler.run()
assert hasRunCondition == True
def test_finallyDo(scheduler: commands2.CommandScheduler):
first = OOInteger(0)
second = OOInteger(0)
command = commands2.FunctionalCommand(
lambda: None,
lambda: None,
lambda interrupted: first.incrementAndGet() if not interrupted else None,
lambda: True,
).finallyDo(lambda interrupted: second.addAndGet(1 + first()))
scheduler.schedule(command)
assert first == 0
assert second == 0
scheduler.run()
assert first == 1
assert second == 2
def test_handleInterrupt(scheduler: commands2.CommandScheduler):
first = OOInteger(0)
second = OOInteger(0)
command = commands2.FunctionalCommand(
lambda: None,
lambda: None,
lambda interrupted: first.incrementAndGet() if interrupted else None,
lambda: False,
).handleInterrupt(lambda: second.addAndGet(1 + first()))
scheduler.schedule(command)
scheduler.run()
assert first == 0
assert second == 0
scheduler.cancel(command)
assert first == 1
assert second == 2
def test_withName(scheduler: commands2.CommandScheduler):
command = commands2.InstantCommand()
name = "Named"
named = command.withName(name)
assert named.getName() == name

View File

@@ -0,0 +1,58 @@
from typing import TYPE_CHECKING
import commands2
from util import * # type: ignore
if TYPE_CHECKING:
from .util import *
import pytest
def test_requirementInterrupt(scheduler: commands2.CommandScheduler):
requirement = commands2.Subsystem()
interrupted = commands2.Command()
interrupted.addRequirements(requirement)
interrupter = commands2.Command()
interrupter.addRequirements(requirement)
start_spying_on(interrupted)
start_spying_on(interrupter)
scheduler.schedule(interrupted)
scheduler.run()
scheduler.schedule(interrupter)
scheduler.run()
verify(interrupted).initialize()
verify(interrupted).execute()
verify(interrupted).end(True)
verify(interrupter).initialize()
verify(interrupter).execute()
assert not interrupted.isScheduled()
assert interrupter.isScheduled()
def test_requirementUninterruptible(scheduler: commands2.CommandScheduler):
requirement = commands2.Subsystem()
notInterrupted = commands2.RunCommand(
lambda: None, requirement
).withInterruptBehavior(commands2.InterruptionBehavior.kCancelIncoming)
interrupter = commands2.Command()
interrupter.addRequirements(requirement)
start_spying_on(notInterrupted)
scheduler.schedule(notInterrupted)
scheduler.schedule(interrupter)
assert scheduler.isScheduled(notInterrupted)
assert not scheduler.isScheduled(interrupter)
def test_defaultCommandRequirementError(scheduler: commands2.CommandScheduler):
system = commands2.Subsystem()
missingRequirement = commands2.WaitUntilCommand(lambda: False)
with pytest.raises(commands2.IllegalCommandUse):
scheduler.setDefaultCommand(system, missingRequirement)

View File

@@ -0,0 +1,90 @@
from typing import TYPE_CHECKING
import commands2
from util import * # type: ignore
if TYPE_CHECKING:
from .util import *
import pytest
def test_instantSchedule(scheduler: commands2.CommandScheduler):
command = commands2.Command()
command.isFinished = lambda: True
start_spying_on(command)
scheduler.schedule(command)
assert scheduler.isScheduled(command)
verify(command).initialize()
scheduler.run()
verify(command).execute()
verify(command).end(False)
assert not scheduler.isScheduled(command)
def test_singleIterationSchedule(scheduler: commands2.CommandScheduler):
command = commands2.Command()
start_spying_on(command)
scheduler.schedule(command)
assert scheduler.isScheduled(command)
scheduler.run()
command.isFinished = lambda: True
scheduler.run()
verify(command).initialize()
verify(command, times(2)).execute()
verify(command).end(False)
assert not scheduler.isScheduled(command)
def test_multiSchedule(scheduler: commands2.CommandScheduler):
command1 = commands2.Command()
command2 = commands2.Command()
command3 = commands2.Command()
scheduler.schedule(command1, command2, command3)
assert scheduler.isScheduled(command1, command2, command3)
scheduler.run()
assert scheduler.isScheduled(command1, command2, command3)
command1.isFinished = lambda: True
scheduler.run()
assert scheduler.isScheduled(command2, command3)
assert not scheduler.isScheduled(command1)
command2.isFinished = lambda: True
scheduler.run()
assert scheduler.isScheduled(command3)
assert not scheduler.isScheduled(command1, command2)
command3.isFinished = lambda: True
scheduler.run()
assert not scheduler.isScheduled(command1, command2, command3)
def test_schedulerCancel(scheduler: commands2.CommandScheduler):
command = commands2.Command()
start_spying_on(command)
scheduler.schedule(command)
scheduler.run()
scheduler.cancel(command)
scheduler.run()
verify(command).execute()
verify(command).end(True)
verify(command, never()).end(False)
assert not scheduler.isScheduled(command)
def test_notScheduledCancel(scheduler: commands2.CommandScheduler):
command = commands2.Command()
scheduler.cancel(command)

View File

@@ -0,0 +1,37 @@
from typing import TYPE_CHECKING
import commands2
from util import * # type: ignore
if TYPE_CHECKING:
from .util import *
import pytest
def test_commandInMultipleGroups():
command1 = commands2.Command()
command2 = commands2.Command()
commands2.ParallelCommandGroup(command1, command2)
with pytest.raises(commands2.IllegalCommandUse):
commands2.ParallelCommandGroup(command1, command2)
def test_commandInGroupExternallyScheduled(scheduler: commands2.CommandScheduler):
command1 = commands2.Command()
command2 = commands2.Command()
commands2.ParallelCommandGroup(command1, command2)
with pytest.raises(commands2.IllegalCommandUse):
scheduler.schedule(command1)
def test_redecoratedCommandError(scheduler: commands2.CommandScheduler):
command = commands2.InstantCommand()
command.withTimeout(10).until(lambda: False)
with pytest.raises(commands2.IllegalCommandUse):
command.withTimeout(10)
scheduler.removeComposedCommand(command)
command.withTimeout(10)

View File

@@ -0,0 +1,55 @@
from typing import TYPE_CHECKING
import commands2
from util import * # type: ignore
if TYPE_CHECKING:
from .util import *
import pytest
def test_conditionalCommand(scheduler: commands2.CommandScheduler):
command1 = commands2.Command()
command1.isFinished = lambda: True
command2 = commands2.Command()
start_spying_on(command1)
start_spying_on(command2)
conditionalCommand = commands2.ConditionalCommand(command1, command2, lambda: True)
scheduler.schedule(conditionalCommand)
scheduler.run()
verify(command1).initialize()
verify(command1).execute()
verify(command1).end(False)
verify(command2, never()).initialize()
verify(command2, never()).execute()
verify(command2, never()).end(False)
def test_conditionalCommandRequirement(scheduler: commands2.CommandScheduler):
system1 = commands2.Subsystem()
system2 = commands2.Subsystem()
system3 = commands2.Subsystem()
command1 = commands2.Command()
command1.addRequirements(system1, system2)
command2 = commands2.Command()
command2.addRequirements(system3)
start_spying_on(command1)
start_spying_on(command2)
conditionalCommand = commands2.ConditionalCommand(command1, command2, lambda: True)
scheduler.schedule(conditionalCommand)
scheduler.schedule(commands2.InstantCommand(lambda: None, system3))
assert not scheduler.isScheduled(conditionalCommand)
assert command1.end.called_with(True)
assert not command2.end.called_with(True)

View File

@@ -0,0 +1,73 @@
from typing import TYPE_CHECKING
import commands2
from util import * # type: ignore
if TYPE_CHECKING:
from .util import *
import pytest
def test_defaultCommandSchedule(scheduler: commands2.CommandScheduler):
hasDefaultCommand = commands2.Subsystem()
defaultCommand = commands2.Command()
defaultCommand.addRequirements(hasDefaultCommand)
scheduler.setDefaultCommand(hasDefaultCommand, defaultCommand)
scheduler.run()
assert scheduler.isScheduled(defaultCommand)
def test_defaultCommandInterruptResume(scheduler: commands2.CommandScheduler):
hasDefaultCommand = commands2.Subsystem()
defaultCommand = commands2.Command()
defaultCommand.addRequirements(hasDefaultCommand)
interrupter = commands2.Command()
interrupter.addRequirements(hasDefaultCommand)
scheduler.setDefaultCommand(hasDefaultCommand, defaultCommand)
scheduler.run()
scheduler.schedule(interrupter)
assert not scheduler.isScheduled(defaultCommand)
assert scheduler.isScheduled(interrupter)
scheduler.cancel(interrupter)
scheduler.run()
assert scheduler.isScheduled(defaultCommand)
assert not scheduler.isScheduled(interrupter)
def test_defaultCommandDisableResume(scheduler: commands2.CommandScheduler):
hasDefaultCommand = commands2.Subsystem()
defaultCommand = commands2.Command()
defaultCommand.addRequirements(hasDefaultCommand)
defaultCommand.runsWhenDisabled = lambda: False
start_spying_on(defaultCommand)
scheduler.setDefaultCommand(hasDefaultCommand, defaultCommand)
scheduler.run()
assert scheduler.isScheduled(defaultCommand)
DriverStationSim.setEnabled(False)
DriverStationSim.notifyNewData()
scheduler.run()
assert not scheduler.isScheduled(defaultCommand)
DriverStationSim.setEnabled(True)
DriverStationSim.notifyNewData()
scheduler.run()
assert scheduler.isScheduled(defaultCommand)
assert defaultCommand.end.called_with(True)

View File

@@ -0,0 +1,63 @@
import commands2
from util import * # type: ignore
def test_deferred_functions(scheduler: commands2.CommandScheduler):
inner_command = commands2.Command()
command = commands2.DeferredCommand(lambda: inner_command)
start_spying_on(inner_command)
start_spying_on(command)
command.initialize()
verify(inner_command).initialize()
command.execute()
verify(inner_command).execute()
assert not command.isFinished()
verify(inner_command).isFinished()
inner_command.isFinished = lambda: True
assert command.isFinished()
verify(inner_command, times=times(2)).isFinished()
command.end(False)
verify(inner_command).end(False)
def test_deferred_supplier_only_called_during_init(
scheduler: commands2.CommandScheduler,
):
supplier_called = 0
def supplier() -> commands2.Command:
nonlocal supplier_called
supplier_called += 1
return commands2.Command()
command = commands2.DeferredCommand(supplier)
assert supplier_called == 0
scheduler.schedule(command)
assert supplier_called == 1
scheduler.run()
scheduler.schedule(command)
assert supplier_called == 1
def test_deferred_requirements(scheduler: commands2.CommandScheduler):
subsystem = commands2.Subsystem()
command = commands2.DeferredCommand(lambda: commands2.Command(), subsystem)
assert subsystem in command.getRequirements()
def test_deferred_null_command(scheduler: commands2.CommandScheduler):
command = commands2.DeferredCommand(lambda: None) # type: ignore
command.initialize()
command.execute()
command.isFinished()
command.end(False)

View File

@@ -0,0 +1,36 @@
from typing import TYPE_CHECKING
import commands2
from util import * # type: ignore
if TYPE_CHECKING:
from .util import *
import pytest
def test_functionalCommandSchedule(scheduler: commands2.CommandScheduler):
cond1 = OOBoolean()
cond2 = OOBoolean()
cond3 = OOBoolean()
cond4 = OOBoolean()
command = commands2.FunctionalCommand(
lambda: cond1.set(True),
lambda: cond2.set(True),
lambda _: cond3.set(True),
lambda: cond4.get(),
)
scheduler.schedule(command)
scheduler.run()
assert scheduler.isScheduled(command)
cond4.set(True)
scheduler.run()
assert not scheduler.isScheduled(command)
assert cond1
assert cond2
assert cond3

View File

@@ -0,0 +1,21 @@
from typing import TYPE_CHECKING
import commands2
from util import * # type: ignore
if TYPE_CHECKING:
from .util import *
import pytest
def test_instantCommandSchedule(scheduler: commands2.CommandScheduler):
cond = OOBoolean()
command = commands2.InstantCommand(lambda: cond.set(True))
scheduler.schedule(command)
scheduler.run()
assert cond
assert not scheduler.isScheduled(command)

View File

@@ -0,0 +1,29 @@
from typing import TYPE_CHECKING
import commands2
from ntcore import NetworkTableInstance
from util import * # type: ignore
if TYPE_CHECKING:
from .util import *
def test_networkbutton(
scheduler: commands2.CommandScheduler, nt_instance: NetworkTableInstance
):
# command = commands2.Command()
command = commands2.Command()
start_spying_on(command)
pub = nt_instance.getTable("TestTable").getBooleanTopic("Test").publish()
button = commands2.button.NetworkButton(nt_instance, "TestTable", "Test")
pub.set(False)
button.onTrue(command)
scheduler.run()
assert command.schedule.times_called == 0
pub.set(True)
scheduler.run()
scheduler.run()
verify(command).schedule()

View File

@@ -0,0 +1,22 @@
from typing import TYPE_CHECKING
import commands2
from util import * # type: ignore
if TYPE_CHECKING:
from .util import *
import pytest
def test_notifierCommandScheduler(scheduler: commands2.CommandScheduler):
with ManualSimTime() as sim:
counter = OOInteger(0)
command = commands2.NotifierCommand(counter.incrementAndGet, 0.01)
scheduler.schedule(command)
for i in range(5):
sim.step(0.005)
scheduler.cancel(command)
assert counter == 2

View File

@@ -0,0 +1,117 @@
from typing import TYPE_CHECKING
import commands2
from compositiontestbase import MultiCompositionTestBase # type: ignore
from util import * # type: ignore
# from tests.compositiontestbase import T
if TYPE_CHECKING:
from .util import *
from .compositiontestbase import MultiCompositionTestBase
import pytest
class TestParallelCommandGroupComposition(MultiCompositionTestBase):
def compose(self, *members: commands2.Command):
return commands2.ParallelCommandGroup(*members)
def test_parallelGroupSchedule(scheduler: commands2.CommandScheduler):
command1 = commands2.Command()
command2 = commands2.Command()
start_spying_on(command1)
start_spying_on(command2)
group = commands2.ParallelCommandGroup(command1, command2)
scheduler.schedule(group)
verify(command1).initialize()
verify(command2).initialize()
command1.isFinished = lambda: True
scheduler.run()
command2.isFinished = lambda: True
scheduler.run()
verify(command1).execute()
verify(command1).end(False)
verify(command2, times(2)).execute()
verify(command2).end(False)
assert not scheduler.isScheduled(group)
def test_parallelGroupInterrupt(scheduler: commands2.CommandScheduler):
command1 = commands2.Command()
command2 = commands2.Command()
start_spying_on(command1)
start_spying_on(command2)
group = commands2.ParallelCommandGroup(command1, command2)
scheduler.schedule(group)
command1.isFinished = lambda: True
scheduler.run()
scheduler.run()
scheduler.cancel(group)
verify(command1).execute()
verify(command1).end(False)
verify(command1, never()).end(True)
verify(command2, times(2)).execute()
verify(command2, never()).end(False)
verify(command2).end(True)
assert not scheduler.isScheduled(group)
def test_notScheduledCancel(scheduler: commands2.CommandScheduler):
command1 = commands2.Command()
command2 = commands2.Command()
group = commands2.ParallelCommandGroup(command1, command2)
scheduler.cancel(group)
def test_parallelGroupRequirement(scheduler: commands2.CommandScheduler):
system1 = commands2.Subsystem()
system2 = commands2.Subsystem()
system3 = commands2.Subsystem()
system4 = commands2.Subsystem()
command1 = commands2.Command()
command1.addRequirements(system1, system2)
command2 = commands2.Command()
command2.addRequirements(system3)
command3 = commands2.Command()
command3.addRequirements(system3, system4)
group = commands2.ParallelCommandGroup(command1, command2)
scheduler.schedule(group)
scheduler.schedule(command3)
assert not scheduler.isScheduled(group)
assert scheduler.isScheduled(command3)
def test_parallelGroupRequirementError():
system1 = commands2.Subsystem()
system2 = commands2.Subsystem()
system3 = commands2.Subsystem()
command1 = commands2.Command()
command1.addRequirements(system1, system2)
command2 = commands2.Command()
command2.addRequirements(system2, system3)
with pytest.raises(commands2.IllegalCommandUse):
commands2.ParallelCommandGroup(command1, command2)

View File

@@ -0,0 +1,119 @@
from typing import TYPE_CHECKING
import commands2
from compositiontestbase import MultiCompositionTestBase # type: ignore
from util import * # type: ignore
# from tests.compositiontestbase import T
if TYPE_CHECKING:
from .util import *
from .compositiontestbase import MultiCompositionTestBase
import pytest
class TestParallelDeadlineGroupComposition(MultiCompositionTestBase):
def compose(self, *members: commands2.Command):
return commands2.ParallelDeadlineGroup(members[0], *members[1:])
def test_parallelDeadlineSchedule(scheduler: commands2.CommandScheduler):
command1 = commands2.Command()
command2 = commands2.Command()
command2.isFinished = lambda: True
command3 = commands2.Command()
start_spying_on(command1)
start_spying_on(command2)
start_spying_on(command3)
group = commands2.ParallelDeadlineGroup(command1, command2, command3)
scheduler.schedule(group)
scheduler.run()
assert scheduler.isScheduled(group)
command1.isFinished = lambda: True
scheduler.run()
assert not scheduler.isScheduled(group)
verify(command2).initialize()
verify(command2).execute()
verify(command2).end(False)
verify(command2, never()).end(True)
verify(command1).initialize()
verify(command1, times(2)).execute()
verify(command1).end(False)
verify(command1, never()).end(True)
verify(command3).initialize()
verify(command3, times(2)).execute()
verify(command3, never()).end(False)
verify(command3).end(True)
def test_parallelDeadlineInterrupt(scheduler: commands2.CommandScheduler):
command1 = commands2.Command()
command2 = commands2.Command()
command2.isFinished = lambda: True
start_spying_on(command1)
start_spying_on(command2)
group = commands2.ParallelDeadlineGroup(command1, command2)
scheduler.schedule(group)
scheduler.run()
scheduler.run()
scheduler.cancel(group)
verify(command1, times(2)).execute()
verify(command1, never()).end(False)
verify(command1).end(True)
verify(command2).execute()
verify(command2).end(False)
verify(command2, never()).end(True)
assert not scheduler.isScheduled(group)
def test_parallelDeadlineRequirement(scheduler: commands2.CommandScheduler):
system1 = commands2.Subsystem()
system2 = commands2.Subsystem()
system3 = commands2.Subsystem()
system4 = commands2.Subsystem()
command1 = commands2.Command()
command1.addRequirements(system1, system2)
command2 = commands2.Command()
command2.addRequirements(system3)
command3 = commands2.Command()
command3.addRequirements(system3, system4)
group = commands2.ParallelDeadlineGroup(command1, command2)
scheduler.schedule(group)
scheduler.schedule(command3)
assert not scheduler.isScheduled(group)
assert scheduler.isScheduled(command3)
def test_parallelDeadlineRequirementError(scheduler: commands2.CommandScheduler):
system1 = commands2.Subsystem()
system2 = commands2.Subsystem()
system3 = commands2.Subsystem()
command1 = commands2.Command()
command1.addRequirements(system1, system2)
command2 = commands2.Command()
command2.addRequirements(system2, system3)
with pytest.raises(commands2.IllegalCommandUse):
commands2.ParallelDeadlineGroup(command1, command2)

View File

@@ -0,0 +1,183 @@
from typing import TYPE_CHECKING
import commands2
from compositiontestbase import MultiCompositionTestBase # type: ignore
from util import * # type: ignore
# from tests.compositiontestbase import T
if TYPE_CHECKING:
from .util import *
from .compositiontestbase import MultiCompositionTestBase
import pytest
class TestParallelRaceGroupComposition(MultiCompositionTestBase):
def compose(self, *members: commands2.Command):
return commands2.ParallelRaceGroup(*members)
def test_parallelRaceSchedule(scheduler: commands2.CommandScheduler):
command1 = commands2.Command()
command2 = commands2.Command()
start_spying_on(command1)
start_spying_on(command2)
group = commands2.ParallelRaceGroup(command1, command2)
scheduler.schedule(group)
verify(command1).initialize()
verify(command2).initialize()
command1.isFinished = lambda: True
scheduler.run()
command2.isFinished = lambda: True
scheduler.run()
verify(command1).execute()
verify(command1).end(False)
verify(command2).execute()
verify(command2).end(True)
verify(command2, never()).end(False)
assert not scheduler.isScheduled(group)
def test_parallelRaceInterrupt(scheduler: commands2.CommandScheduler):
command1 = commands2.Command()
command2 = commands2.Command()
start_spying_on(command1)
start_spying_on(command2)
group = commands2.ParallelRaceGroup(command1, command2)
scheduler.schedule(group)
scheduler.run()
scheduler.run()
scheduler.cancel(group)
verify(command1, times(2)).execute()
verify(command1, never()).end(False)
verify(command1).end(True)
verify(command2, times(2)).execute()
verify(command2, never()).end(False)
verify(command2).end(True)
assert not scheduler.isScheduled(group)
def test_notScheduledCancel(scheduler: commands2.CommandScheduler):
command1 = commands2.Command()
command2 = commands2.Command()
group = commands2.ParallelRaceGroup(command1, command2)
scheduler.cancel(group)
def test_parallelRaceRequirement(scheduler: commands2.CommandScheduler):
system1 = commands2.Subsystem()
system2 = commands2.Subsystem()
system3 = commands2.Subsystem()
system4 = commands2.Subsystem()
command1 = commands2.Command()
command1.addRequirements(system1, system2)
command2 = commands2.Command()
command2.addRequirements(system3)
command3 = commands2.Command()
command3.addRequirements(system3, system4)
group = commands2.ParallelRaceGroup(command1, command2)
scheduler.schedule(group)
scheduler.schedule(command3)
assert not scheduler.isScheduled(group)
assert scheduler.isScheduled(command3)
def test_parallelRaceRequirementError():
system1 = commands2.Subsystem()
system2 = commands2.Subsystem()
system3 = commands2.Subsystem()
command1 = commands2.Command()
command1.addRequirements(system1, system2)
command2 = commands2.Command()
command2.addRequirements(system2, system3)
with pytest.raises(commands2.IllegalCommandUse):
commands2.ParallelRaceGroup(command1, command2)
def test_parallelRaceOnlyCallsEndOnce(scheduler: commands2.CommandScheduler):
system1 = commands2.Subsystem()
system2 = commands2.Subsystem()
command1 = commands2.Command()
command1.addRequirements(system1)
command2 = commands2.Command()
command2.addRequirements(system2)
command3 = commands2.Command()
group1 = commands2.SequentialCommandGroup(command1, command2)
group2 = commands2.ParallelRaceGroup(group1, command3)
scheduler.schedule(group2)
scheduler.run()
command1.isFinished = lambda: True
scheduler.run()
command2.isFinished = lambda: True
scheduler.run()
assert not scheduler.isScheduled(group2)
def test_parallelRaceScheduleTwiceTest(scheduler: commands2.CommandScheduler):
command1 = commands2.Command()
command2 = commands2.Command()
start_spying_on(command1)
start_spying_on(command2)
group = commands2.ParallelRaceGroup(command1, command2)
scheduler.schedule(group)
verify(command1).initialize()
verify(command2).initialize()
command1.isFinished = lambda: True
scheduler.run()
command2.isFinished = lambda: True
scheduler.run()
verify(command1).execute()
verify(command1).end(False)
verify(command2).execute()
verify(command2).end(True)
verify(command2, never()).end(False)
assert not scheduler.isScheduled(group)
reset(command1)
reset(command2)
scheduler.schedule(group)
verify(command1).initialize()
verify(command2).initialize()
scheduler.run()
scheduler.run()
assert scheduler.isScheduled(group)
command2.isFinished = lambda: True
scheduler.run()
assert not scheduler.isScheduled(group)

View File

@@ -0,0 +1,114 @@
from typing import TYPE_CHECKING
from util import * # type: ignore
import wpimath.controller as controller
import commands2
if TYPE_CHECKING:
from .util import *
import pytest
def test_pidCommandSupplier(scheduler: commands2.CommandScheduler):
with ManualSimTime() as sim:
output_float = OOFloat(0.0)
measurement_source = OOFloat(5.0)
setpoint_source = OOFloat(2.0)
pid_controller = controller.PIDController(0.1, 0.01, 0.001)
system = commands2.Subsystem()
pidCommand = commands2.PIDCommand(
pid_controller,
measurement_source,
setpoint_source,
output_float.set,
system,
)
start_spying_on(pidCommand)
scheduler.schedule(pidCommand)
scheduler.run()
sim.step(1)
scheduler.run()
assert scheduler.isScheduled(pidCommand)
assert not pidCommand._controller.atSetpoint()
# Tell the pid command we're at our setpoint through the controller
measurement_source.set(setpoint_source())
sim.step(2)
scheduler.run()
# Should be measuring error of 0 now
assert pidCommand._controller.atSetpoint()
def test_pidCommandScalar(scheduler: commands2.CommandScheduler):
with ManualSimTime() as sim:
output_float = OOFloat(0.0)
measurement_source = OOFloat(5.0)
setpoint_source = 2.0
pid_controller = controller.PIDController(0.1, 0.01, 0.001)
system = commands2.Subsystem()
pidCommand = commands2.PIDCommand(
pid_controller,
measurement_source,
setpoint_source,
output_float.set,
system,
)
start_spying_on(pidCommand)
scheduler.schedule(pidCommand)
scheduler.run()
sim.step(1)
scheduler.run()
assert scheduler.isScheduled(pidCommand)
assert not pidCommand._controller.atSetpoint()
# Tell the pid command we're at our setpoint through the controller
measurement_source.set(setpoint_source)
sim.step(2)
scheduler.run()
# Should be measuring error of 0 now
assert pidCommand._controller.atSetpoint()
def test_withTimeout(scheduler: commands2.CommandScheduler):
with ManualSimTime() as sim:
output_float = OOFloat(0.0)
measurement_source = OOFloat(5.0)
setpoint_source = OOFloat(2.0)
pid_controller = controller.PIDController(0.1, 0.01, 0.001)
system = commands2.Subsystem()
command1 = commands2.PIDCommand(
pid_controller,
measurement_source,
setpoint_source,
output_float.set,
system,
)
start_spying_on(command1)
timeout = command1.withTimeout(2)
scheduler.schedule(timeout)
scheduler.run()
verify(command1).initialize()
verify(command1).execute()
assert not scheduler.isScheduled(command1)
assert scheduler.isScheduled(timeout)
sim.step(3)
scheduler.run()
verify(command1).end(True)
verify(command1, never()).end(False)
assert not scheduler.isScheduled(timeout)

View File

@@ -0,0 +1,17 @@
from typing import TYPE_CHECKING
import commands2
from util import * # type: ignore
if TYPE_CHECKING:
from .util import *
import pytest
def test_printCommandSchedule(capsys, scheduler: commands2.CommandScheduler):
command = commands2.PrintCommand("Test!")
scheduler.schedule(command)
scheduler.run()
assert not scheduler.isScheduled(command)
assert capsys.readouterr().out == "Test!\n"

View File

@@ -0,0 +1,119 @@
from types import MethodType
from typing import Any
import pytest
from wpimath.controller import ProfiledPIDController, ProfiledPIDControllerRadians
from wpimath.trajectory import TrapezoidProfile, TrapezoidProfileRadians
from commands2 import ProfiledPIDSubsystem
MAX_VELOCITY = 30 # Radians per second
MAX_ACCELERATION = 500 # Radians per sec squared
PID_KP = 50
class EvalSubsystem(ProfiledPIDSubsystem):
def __init__(self, controller, state_factory):
self._state_factory = state_factory
super().__init__(controller, 0)
def simple_use_output(self, output: float, setpoint: Any):
"""A simple useOutput method that saves the current state of the controller."""
self._output = output
self._setpoint = setpoint
def simple_get_measurement(self) -> float:
"""A simple getMeasurement method that returns zero (frozen or stuck plant)."""
return 0.0
controller_types = [
(
ProfiledPIDControllerRadians,
TrapezoidProfileRadians.Constraints,
TrapezoidProfileRadians.State,
),
(ProfiledPIDController, TrapezoidProfile.Constraints, TrapezoidProfile.State),
]
controller_ids = ["radians", "dimensionless"]
@pytest.fixture(params=controller_types, ids=controller_ids)
def subsystem(request):
"""
Fixture that returns an EvalSubsystem object for each type of controller.
"""
controller, profile_factory, state_factory = request.param
profile = profile_factory(MAX_VELOCITY, MAX_ACCELERATION)
pid = controller(PID_KP, 0, 0, profile)
return EvalSubsystem(pid, state_factory)
def test_profiled_pid_subsystem_init(subsystem):
"""
Verify that the ProfiledPIDSubsystem can be initialized using
all supported profiled PID controller / trapezoid profile types.
"""
assert isinstance(subsystem, EvalSubsystem)
def test_profiled_pid_subsystem_not_implemented_get_measurement(subsystem):
"""
Verify that the ProfiledPIDSubsystem.getMeasurement method
raises NotImplementedError.
"""
with pytest.raises(NotImplementedError):
subsystem.getMeasurement()
def test_profiled_pid_subsystem_not_implemented_use_output(subsystem):
"""
Verify that the ProfiledPIDSubsystem.useOutput method raises
NotImplementedError.
"""
with pytest.raises(NotImplementedError):
subsystem.useOutput(0, subsystem._state_factory())
@pytest.mark.parametrize("use_float", [True, False])
def test_profiled_pid_subsystem_set_goal(subsystem, use_float):
"""
Verify that the ProfiledPIDSubsystem.setGoal method sets the goal.
"""
if use_float:
subsystem.setGoal(1.0)
assert subsystem.getController().getGoal().position == 1.0
assert subsystem.getController().getGoal().velocity == 0.0
else:
subsystem.setGoal(subsystem._state_factory(1.0, 2.0))
assert subsystem.getController().getGoal().position == 1.0
assert subsystem.getController().getGoal().velocity == 2.0
def test_profiled_pid_subsystem_enable_subsystem(subsystem):
"""
Verify the subsystem can be enabled.
"""
# Dynamically add useOutput and getMeasurement methods so the
# system can be enabled
setattr(subsystem, "useOutput", MethodType(simple_use_output, subsystem))
setattr(subsystem, "getMeasurement", MethodType(simple_get_measurement, subsystem))
# Enable the subsystem
subsystem.enable()
assert subsystem.isEnabled()
def test_profiled_pid_subsystem_disable_subsystem(subsystem):
"""
Verify the subsystem can be disabled.
"""
# Dynamically add useOutput and getMeasurement methods so the
# system can be enabled
setattr(subsystem, "useOutput", MethodType(simple_use_output, subsystem))
setattr(subsystem, "getMeasurement", MethodType(simple_get_measurement, subsystem))
# Enable and then disable the subsystem
subsystem.enable()
subsystem.disable()
assert not subsystem.isEnabled()

View File

@@ -0,0 +1,38 @@
from typing import TYPE_CHECKING
import commands2
from util import * # type: ignore
if TYPE_CHECKING:
from .util import *
import pytest
def test_proxyCommandSchedule(scheduler: commands2.CommandScheduler):
command1 = commands2.Command()
start_spying_on(command1)
scheduleCommand = commands2.ProxyCommand(command1)
scheduler.schedule(scheduleCommand)
verify(command1).schedule()
def test_proxyCommandEnd(scheduler: commands2.CommandScheduler):
cond = OOBoolean()
command = commands2.WaitUntilCommand(cond)
scheduleCommand = commands2.ProxyCommand(command)
scheduler.schedule(scheduleCommand)
scheduler.run()
assert scheduler.isScheduled(scheduleCommand)
cond.set(True)
scheduler.run()
scheduler.run()
assert not scheduler.isScheduled(scheduleCommand)

View File

@@ -0,0 +1,69 @@
from typing import TYPE_CHECKING
import commands2
from compositiontestbase import SingleCompositionTestBase # type: ignore
from util import * # type: ignore
if TYPE_CHECKING:
from .util import *
from .compositiontestbase import SingleCompositionTestBase
import pytest
class RepeatCommandCompositionTest(SingleCompositionTestBase):
def composeSingle(self, member):
return member.repeatedly()
def test_callsMethodsCorrectly(scheduler: commands2.CommandScheduler):
command = commands2.Command()
repeated = command.repeatedly()
start_spying_on(command)
assert command.initialize.times_called == 0
assert command.execute.times_called == 0
assert command.isFinished.times_called == 0
assert command.end.times_called == 0
scheduler.schedule(repeated)
assert command.initialize.times_called == 1
assert command.execute.times_called == 0
assert command.isFinished.times_called == 0
assert command.end.times_called == 0
command.isFinished = lambda: False
scheduler.run()
assert command.initialize.times_called == 1
assert command.execute.times_called == 1
assert command.isFinished.times_called == 1
assert command.end.times_called == 0
command.isFinished = lambda: True
scheduler.run()
assert command.initialize.times_called == 1
assert command.execute.times_called == 2
assert command.isFinished.times_called == 2
assert command.end.times_called == 1
command.isFinished = lambda: False
scheduler.run()
assert command.initialize.times_called == 2
assert command.execute.times_called == 3
assert command.isFinished.times_called == 3
assert command.end.times_called == 1
command.isFinished = lambda: True
scheduler.run()
assert command.initialize.times_called == 2
assert command.execute.times_called == 4
assert command.isFinished.times_called == 4
assert command.end.times_called == 2
scheduler.cancel(repeated)
assert command.initialize.times_called == 2
assert command.execute.times_called == 4
assert command.isFinished.times_called == 4
assert command.end.times_called == 2

View File

@@ -0,0 +1,154 @@
from typing import TYPE_CHECKING
import commands2
from util import * # type: ignore
if TYPE_CHECKING:
from .util import *
import pytest
from wpilib import RobotState
def test_robotDisabledCommandCancel(scheduler: commands2.CommandScheduler):
command = commands2.Command()
scheduler.schedule(command)
assert scheduler.isScheduled(command)
DriverStationSim.setEnabled(False)
DriverStationSim.notifyNewData()
scheduler.run()
assert not scheduler.isScheduled(command)
DriverStationSim.setEnabled(True)
DriverStationSim.notifyNewData()
def test_runWhenDisabled(scheduler: commands2.CommandScheduler):
command = commands2.Command()
command.runsWhenDisabled = lambda: True
scheduler.schedule(command)
assert scheduler.isScheduled(command)
DriverStationSim.setEnabled(False)
DriverStationSim.notifyNewData()
scheduler.run()
assert scheduler.isScheduled(command)
def test_sequentialGroupRunWhenDisabled(scheduler: commands2.CommandScheduler):
command1 = commands2.Command()
command1.runsWhenDisabled = lambda: True
command2 = commands2.Command()
command2.runsWhenDisabled = lambda: True
command3 = commands2.Command()
command3.runsWhenDisabled = lambda: True
command4 = commands2.Command()
command4.runsWhenDisabled = lambda: False
runWhenDisabled = commands2.SequentialCommandGroup(command1, command2)
dontRunWhenDisabled = commands2.SequentialCommandGroup(command3, command4)
scheduler.schedule(runWhenDisabled)
scheduler.schedule(dontRunWhenDisabled)
DriverStationSim.setEnabled(False)
DriverStationSim.notifyNewData()
scheduler.run()
assert scheduler.isScheduled(runWhenDisabled)
assert not scheduler.isScheduled(dontRunWhenDisabled)
def test_parallelGroupRunWhenDisabled(scheduler: commands2.CommandScheduler):
command1 = commands2.Command()
command1.runsWhenDisabled = lambda: True
command2 = commands2.Command()
command2.runsWhenDisabled = lambda: True
command3 = commands2.Command()
command3.runsWhenDisabled = lambda: True
command4 = commands2.Command()
command4.runsWhenDisabled = lambda: False
runWhenDisabled = commands2.ParallelCommandGroup(command1, command2)
dontRunWhenDisabled = commands2.ParallelCommandGroup(command3, command4)
scheduler.schedule(runWhenDisabled)
scheduler.schedule(dontRunWhenDisabled)
DriverStationSim.setEnabled(False)
DriverStationSim.notifyNewData()
scheduler.run()
assert scheduler.isScheduled(runWhenDisabled)
assert not scheduler.isScheduled(dontRunWhenDisabled)
def test_conditionalRunWhenDisabled(scheduler: commands2.CommandScheduler):
DriverStationSim.setEnabled(False)
DriverStationSim.notifyNewData()
command1 = commands2.Command()
command1.runsWhenDisabled = lambda: True
command2 = commands2.Command()
command2.runsWhenDisabled = lambda: True
command3 = commands2.Command()
command3.runsWhenDisabled = lambda: True
command4 = commands2.Command()
command4.runsWhenDisabled = lambda: False
runWhenDisabled = commands2.ConditionalCommand(command1, command2, lambda: True)
dontRunWhenDisabled = commands2.ConditionalCommand(command3, command4, lambda: True)
scheduler.schedule(runWhenDisabled, dontRunWhenDisabled)
assert scheduler.isScheduled(runWhenDisabled)
assert not scheduler.isScheduled(dontRunWhenDisabled)
def test_selectRunWhenDisabled(scheduler: commands2.CommandScheduler):
DriverStationSim.setEnabled(False)
DriverStationSim.notifyNewData()
command1 = commands2.Command()
command1.runsWhenDisabled = lambda: True
command2 = commands2.Command()
command2.runsWhenDisabled = lambda: True
command3 = commands2.Command()
command3.runsWhenDisabled = lambda: True
command4 = commands2.Command()
command4.runsWhenDisabled = lambda: False
runWhenDisabled = commands2.SelectCommand({1: command1, 2: command2}, lambda: 1)
dontRunWhenDisabled = commands2.SelectCommand({1: command3, 2: command4}, lambda: 1)
scheduler.schedule(runWhenDisabled, dontRunWhenDisabled)
assert scheduler.isScheduled(runWhenDisabled)
assert not scheduler.isScheduled(dontRunWhenDisabled)
def test_parallelConditionalRunWhenDisabledTest(scheduler: commands2.CommandScheduler):
DriverStationSim.setEnabled(False)
DriverStationSim.notifyNewData()
command1 = commands2.Command()
command1.runsWhenDisabled = lambda: True
command2 = commands2.Command()
command2.runsWhenDisabled = lambda: True
command3 = commands2.Command()
command3.runsWhenDisabled = lambda: True
command4 = commands2.Command()
command4.runsWhenDisabled = lambda: False
runWhenDisabled = commands2.ConditionalCommand(command1, command2, lambda: True)
dontRunWhenDisabled = commands2.ConditionalCommand(command3, command4, lambda: True)
parallel = commands2.cmd.parallel(runWhenDisabled, dontRunWhenDisabled)
scheduler.schedule(parallel)
assert not scheduler.isScheduled(runWhenDisabled)

View File

@@ -0,0 +1,22 @@
from typing import TYPE_CHECKING
import commands2
from util import * # type: ignore
if TYPE_CHECKING:
from .util import *
import pytest
def test_runCommandSchedule(scheduler: commands2.CommandScheduler):
counter = OOInteger(0)
command = commands2.RunCommand(counter.incrementAndGet)
scheduler.schedule(command)
scheduler.run()
scheduler.run()
scheduler.run()
assert counter == 3

View File

@@ -0,0 +1,36 @@
from typing import TYPE_CHECKING
import commands2
from util import * # type: ignore
if TYPE_CHECKING:
from .util import *
import pytest
def test_scheduleCommandSchedule(scheduler: commands2.CommandScheduler):
command1 = commands2.Command()
command2 = commands2.Command()
start_spying_on(command1)
start_spying_on(command2)
scheduleCommand = commands2.ScheduleCommand(command1, command2)
scheduler.schedule(scheduleCommand)
verify(command1).schedule()
verify(command2).schedule()
def test_scheduleCommandDruingRun(scheduler: commands2.CommandScheduler):
toSchedule = commands2.InstantCommand()
scheduleCommand = commands2.ScheduleCommand(toSchedule)
group = commands2.SequentialCommandGroup(
commands2.InstantCommand(), scheduleCommand
)
scheduler.schedule(group)
scheduler.schedule(commands2.RunCommand(lambda: None))
scheduler.run()

View File

@@ -0,0 +1,134 @@
from typing import TYPE_CHECKING
import commands2
from util import * # type: ignore
if TYPE_CHECKING:
from .util import *
import pytest
def test_schedulerLambdaTestNoInterrupt(scheduler: commands2.CommandScheduler):
counter = OOInteger()
scheduler.onCommandInitialize(lambda _: counter.incrementAndGet())
scheduler.onCommandExecute(lambda _: counter.incrementAndGet())
scheduler.onCommandFinish(lambda _: counter.incrementAndGet())
scheduler.schedule(commands2.InstantCommand())
scheduler.run()
assert counter == 3
def test_schedulerInterruptLambda(scheduler: commands2.CommandScheduler):
counter = OOInteger()
scheduler.onCommandInterrupt(lambda _: counter.incrementAndGet())
command = commands2.WaitCommand(10)
scheduler.schedule(command)
scheduler.cancel(command)
assert counter == 1
def test_scheduler_interrupt_no_cause_lambda(scheduler: commands2.CommandScheduler):
counter = OOInteger()
def on_interrupt(interrupted, cause):
assert cause is None
counter.incrementAndGet()
scheduler.onCommandInterruptWithCause(on_interrupt)
command = commands2.cmd.run(lambda: {})
scheduler.schedule(command)
scheduler.cancel(command)
assert counter.get() == 1
def test_scheduler_interrupt_cause_lambda(scheduler: commands2.CommandScheduler):
counter = OOInteger()
subsystem = commands2.Subsystem()
command = subsystem.run(lambda: None)
interruptor = subsystem.runOnce(lambda: None)
def on_interrupt(interrupted, cause):
assert cause is interruptor
counter.incrementAndGet()
scheduler.onCommandInterruptWithCause(on_interrupt)
scheduler.schedule(command)
scheduler.schedule(interruptor)
assert counter.get() == 1
def test_scheduler_interrupt_cause_lambda_in_run_loop(
scheduler: commands2.CommandScheduler,
):
counter = OOInteger()
subsystem = commands2.Subsystem()
command = subsystem.run(lambda: None)
interruptor = subsystem.runOnce(lambda: None)
# This command will schedule interruptor in execute() inside the run loop
interruptor_scheduler = commands2.cmd.runOnce(
lambda: scheduler.schedule(interruptor)
)
def on_interrupt(interrupted, cause):
assert cause is interruptor
counter.incrementAndGet()
scheduler.onCommandInterruptWithCause(on_interrupt)
scheduler.schedule(command)
scheduler.schedule(interruptor_scheduler)
scheduler.run()
assert counter.get() == 1
def test_unregisterSubsystem(scheduler: commands2.CommandScheduler):
system = commands2.Subsystem()
scheduler.registerSubsystem(system)
scheduler.unregisterSubsystem(system)
def test_schedulerCancelAll(scheduler: commands2.CommandScheduler):
counter = OOInteger()
def on_interrupt(command, interruptor):
assert interruptor is None
scheduler.onCommandInterrupt(lambda _: counter.incrementAndGet())
scheduler.onCommandInterruptWithCause(on_interrupt)
command = commands2.WaitCommand(10)
command2 = commands2.WaitCommand(10)
scheduler.schedule(command)
scheduler.schedule(command2)
scheduler.cancelAll()
assert counter == 2
def test_scheduleScheduledNoOp(scheduler: commands2.CommandScheduler):
counter = OOInteger()
command = commands2.cmd.startEnd(counter.incrementAndGet, lambda: None)
scheduler.schedule(command)
scheduler.schedule(command)
assert counter == 1

View File

@@ -0,0 +1,165 @@
from typing import TYPE_CHECKING
import commands2
from util import * # type: ignore
if TYPE_CHECKING:
from .util import *
import pytest
@pytest.mark.parametrize(
"interruptionBehavior",
[
commands2.InterruptionBehavior.kCancelIncoming,
commands2.InterruptionBehavior.kCancelSelf,
],
)
def test_cancelFromInitialize(
interruptionBehavior: commands2.InterruptionBehavior,
scheduler: commands2.CommandScheduler,
):
hasOtherRun = OOBoolean()
requirement = commands2.Subsystem()
selfCancels = commands2.Command()
selfCancels.addRequirements(requirement)
selfCancels.getInterruptionBehavior = lambda: interruptionBehavior
selfCancels.initialize = lambda: scheduler.cancel(selfCancels)
other = commands2.RunCommand(lambda: hasOtherRun.set(True), requirement)
scheduler.schedule(selfCancels)
scheduler.run()
scheduler.schedule(other)
assert not scheduler.isScheduled(selfCancels)
assert scheduler.isScheduled(other)
scheduler.run()
assert hasOtherRun == True
@pytest.mark.parametrize(
"interruptionBehavior",
[
commands2.InterruptionBehavior.kCancelIncoming,
commands2.InterruptionBehavior.kCancelSelf,
],
)
def test_defaultCommandGetsRescheduledAfterSelfCanceling(
interruptionBehavior: commands2.InterruptionBehavior,
scheduler: commands2.CommandScheduler,
):
hasOtherRun = OOBoolean()
requirement = commands2.Subsystem()
selfCancels = commands2.Command()
selfCancels.addRequirements(requirement)
selfCancels.getInterruptionBehavior = lambda: interruptionBehavior
selfCancels.initialize = lambda: scheduler.cancel(selfCancels)
other = commands2.RunCommand(lambda: hasOtherRun.set(True), requirement)
scheduler.setDefaultCommand(requirement, other)
scheduler.schedule(selfCancels)
scheduler.run()
scheduler.run()
assert not scheduler.isScheduled(selfCancels)
assert scheduler.isScheduled(other)
scheduler.run()
assert hasOtherRun == True
def test_cancelFromEnd(scheduler: commands2.CommandScheduler):
counter = OOInteger()
selfCancels = commands2.Command()
@patch_via_decorator(selfCancels)
def end(self, interrupted):
counter.incrementAndGet()
scheduler.cancel(self)
scheduler.schedule(selfCancels)
scheduler.cancel(selfCancels)
assert counter == 1
assert not scheduler.isScheduled(selfCancels)
def test_scheduleFromEnd(scheduler: commands2.CommandScheduler):
counter = OOInteger()
requirement = commands2.Subsystem()
other = commands2.InstantCommand(lambda: None, requirement)
selfCancels = commands2.Command()
selfCancels.addRequirements(requirement)
@patch_via_decorator(selfCancels)
def end(self, interrupted):
counter.incrementAndGet()
scheduler.schedule(other)
scheduler.schedule(selfCancels)
scheduler.cancel(selfCancels)
assert counter == 1
assert not scheduler.isScheduled(selfCancels)
def test_scheduleFromEndInterrupt(scheduler: commands2.CommandScheduler):
counter = OOInteger()
requirement = commands2.Subsystem()
other = commands2.InstantCommand(lambda: None, requirement)
selfCancels = commands2.Command()
selfCancels.addRequirements(requirement)
@patch_via_decorator(selfCancels)
def end(self, interrupted):
counter.incrementAndGet()
scheduler.schedule(other)
scheduler.schedule(selfCancels)
scheduler.schedule(other)
assert counter == 1
assert not scheduler.isScheduled(selfCancels)
assert scheduler.isScheduled(other)
@pytest.mark.parametrize(
"interruptionBehavior",
[
commands2.InterruptionBehavior.kCancelIncoming,
commands2.InterruptionBehavior.kCancelSelf,
],
)
def test_scheduleInitializeFromDefaultCommand(
interruptionBehavior: commands2.InterruptionBehavior,
scheduler: commands2.CommandScheduler,
):
counter = OOInteger()
requirement = commands2.Subsystem()
other = commands2.InstantCommand(lambda: None, requirement).withInterruptBehavior(
interruptionBehavior
)
defaultCommand = commands2.Command()
defaultCommand.addRequirements(requirement)
@patch_via_decorator(defaultCommand)
def initialize(self):
counter.incrementAndGet()
scheduler.schedule(other)
scheduler.setDefaultCommand(requirement, defaultCommand)
scheduler.run()
scheduler.run()
scheduler.run()
assert counter == 3
assert not scheduler.isScheduled(defaultCommand)
assert scheduler.isScheduled(other)

View File

@@ -0,0 +1,94 @@
from typing import TYPE_CHECKING
import commands2
from compositiontestbase import MultiCompositionTestBase # type: ignore
from util import * # type: ignore
if TYPE_CHECKING:
from .util import *
from .compositiontestbase import MultiCompositionTestBase
import pytest
class TestSelectCommandComposition(MultiCompositionTestBase):
def compose(self, *members: commands2.Command):
return commands2.SelectCommand(dict(enumerate(members)), lambda: 0)
def test_selectCommand(scheduler: commands2.CommandScheduler):
command1 = commands2.Command()
command2 = commands2.Command()
command3 = commands2.Command()
start_spying_on(command1)
start_spying_on(command2)
start_spying_on(command3)
command1.isFinished = lambda: True
selectCommand = commands2.SelectCommand(
{"one": command1, "two": command2, "three": command3}, lambda: "one"
)
scheduler.schedule(selectCommand)
scheduler.run()
verify(command1).initialize()
verify(command1).execute()
verify(command1).end(False)
verify(command2, never()).initialize()
verify(command2, never()).execute()
verify(command2, never()).end(False)
verify(command3, never()).initialize()
verify(command3, never()).execute()
verify(command3, never()).end(False)
def test_selectCommandInvalidKey(scheduler: commands2.CommandScheduler):
command1 = commands2.Command()
command2 = commands2.Command()
command3 = commands2.Command()
start_spying_on(command1)
start_spying_on(command2)
start_spying_on(command3)
command1.isFinished = lambda: True
selectCommand = commands2.SelectCommand(
{"one": command1, "two": command2, "three": command3}, lambda: "four"
)
scheduler.schedule(selectCommand)
def test_selectCommandRequirement(scheduler: commands2.CommandScheduler):
system1 = commands2.Subsystem()
system2 = commands2.Subsystem()
system3 = commands2.Subsystem()
system4 = commands2.Subsystem()
command1 = commands2.Command()
command1.addRequirements(system1, system2)
command2 = commands2.Command()
command2.addRequirements(system3)
command3 = commands2.Command()
command3.addRequirements(system3, system4)
start_spying_on(command1)
start_spying_on(command2)
start_spying_on(command3)
selectCommand = commands2.SelectCommand(
{"one": command1, "two": command2, "three": command3}, lambda: "one"
)
scheduler.schedule(selectCommand)
scheduler.schedule(commands2.InstantCommand(lambda: None, system3))
verify(command1).end(interrupted=True)
verify(command2, never()).end(interrupted=True)
verify(command3, never()).end(interrupted=True)

View File

@@ -0,0 +1,114 @@
from typing import TYPE_CHECKING
import commands2
from compositiontestbase import MultiCompositionTestBase # type: ignore
from util import * # type: ignore
if TYPE_CHECKING:
from .util import *
from .compositiontestbase import MultiCompositionTestBase
import pytest
class TestSequentialCommandGroupComposition(MultiCompositionTestBase):
def compose(self, *members: commands2.Command):
return commands2.SequentialCommandGroup(*members)
def test_sequntialGroupSchedule(scheduler: commands2.CommandScheduler):
command1 = commands2.Command()
command2 = commands2.Command()
start_spying_on(command1)
start_spying_on(command2)
group = commands2.SequentialCommandGroup(command1, command2)
scheduler.schedule(group)
verify(command1).initialize()
verify(command2, never()).initialize()
command1.isFinished = lambda: True
scheduler.run()
verify(command1).execute()
verify(command1).end(False)
verify(command2).initialize()
verify(command2, never()).execute()
verify(command2, never()).end(False)
command2.isFinished = lambda: True
scheduler.run()
verify(command1).execute()
verify(command1).end(False)
verify(command2).execute()
verify(command2).end(False)
assert not scheduler.isScheduled(group)
def test_sequentialGroupInterrupt(scheduler: commands2.CommandScheduler):
command1 = commands2.Command()
command2 = commands2.Command()
command3 = commands2.Command()
start_spying_on(command1)
start_spying_on(command2)
start_spying_on(command3)
group = commands2.SequentialCommandGroup(command1, command2, command3)
scheduler.schedule(group)
command1.isFinished = lambda: True
scheduler.run()
scheduler.cancel(group)
scheduler.run()
verify(command1).execute()
verify(command1, never()).end(True)
verify(command1).end(False)
verify(command2, never()).execute()
verify(command2).end(True)
verify(command3, never()).initialize()
verify(command3, never()).execute()
# assert command3.end.times_called == 0
verify(command3, never()).end(True)
verify(command3, never()).end(False)
assert not scheduler.isScheduled(group)
def test_notScheduledCancel(scheduler: commands2.CommandScheduler):
command1 = commands2.Command()
command2 = commands2.Command()
group = commands2.SequentialCommandGroup(command1, command2)
scheduler.cancel(group)
def test_sequentialGroupRequirement(scheduler: commands2.CommandScheduler):
system1 = commands2.Subsystem()
system2 = commands2.Subsystem()
system3 = commands2.Subsystem()
system4 = commands2.Subsystem()
command1 = commands2.InstantCommand()
command1.addRequirements(system1, system2)
command2 = commands2.InstantCommand()
command2.addRequirements(system3)
command3 = commands2.InstantCommand()
command3.addRequirements(system3, system4)
group = commands2.SequentialCommandGroup(command1, command2)
scheduler.schedule(group)
scheduler.schedule(command3)
assert not scheduler.isScheduled(group)
assert scheduler.isScheduled(command3)

View File

@@ -0,0 +1,30 @@
from typing import TYPE_CHECKING
import commands2
from util import * # type: ignore
if TYPE_CHECKING:
from .util import *
import pytest
def test_startEndCommandSchedule(scheduler: commands2.CommandScheduler):
cond1 = OOBoolean(False)
cond2 = OOBoolean(False)
command = commands2.StartEndCommand(
lambda: cond1.set(True),
lambda: cond2.set(True),
)
scheduler.schedule(command)
scheduler.run()
assert scheduler.isScheduled(command)
scheduler.cancel(command)
assert not scheduler.isScheduled(command)
assert cond1 == True
assert cond2 == True

View File

@@ -0,0 +1,142 @@
# Copyright (c) FIRST and other WPILib contributors.
# Open Source Software; you can modify and/or share it under the terms of
# the WPILib BSD license file in the root directory of this project.
from typing import TYPE_CHECKING, List, Tuple
import math
from wpilib import Timer
from wpimath.geometry import Pose2d, Rotation2d, Translation2d
from wpimath.kinematics import (
SwerveModuleState,
SwerveModulePosition,
SwerveDrive4Kinematics,
SwerveDrive4Odometry,
)
from wpimath.controller import (
ProfiledPIDControllerRadians,
PIDController,
HolonomicDriveController,
)
from wpimath.trajectory import (
TrapezoidProfileRadians,
Trajectory,
TrajectoryConfig,
TrajectoryGenerator,
)
from util import * # type: ignore
if TYPE_CHECKING:
from .util import *
import pytest
import commands2
def test_swervecontrollercommand():
timer = Timer()
angle = Rotation2d(0)
swerve_module_states = (
SwerveModuleState(0, Rotation2d(0)),
SwerveModuleState(0, Rotation2d(0)),
SwerveModuleState(0, Rotation2d(0)),
SwerveModuleState(0, Rotation2d(0)),
)
swerve_module_positions = (
SwerveModulePosition(0, Rotation2d(0)),
SwerveModulePosition(0, Rotation2d(0)),
SwerveModulePosition(0, Rotation2d(0)),
SwerveModulePosition(0, Rotation2d(0)),
)
rot_controller = ProfiledPIDControllerRadians(
1,
0,
0,
TrapezoidProfileRadians.Constraints(3 * math.pi, math.pi),
)
x_tolerance = 1 / 12.0
y_tolerance = 1 / 12.0
angular_tolerance = 1 / 12.0
wheel_base = 0.5
track_width = 0.5
kinematics = SwerveDrive4Kinematics(
Translation2d(wheel_base / 2, track_width / 2),
Translation2d(wheel_base / 2, -track_width / 2),
Translation2d(-wheel_base / 2, track_width / 2),
Translation2d(-wheel_base / 2, -track_width / 2),
)
odometry = SwerveDrive4Odometry(
kinematics,
Rotation2d(0),
swerve_module_positions,
Pose2d(0, 0, Rotation2d(0)),
)
def set_module_states(states):
nonlocal swerve_module_states
swerve_module_states = states
def get_robot_pose() -> Pose2d:
odometry.update(angle, swerve_module_positions)
return odometry.getPose()
with ManualSimTime() as sim:
subsystem = commands2.Subsystem()
waypoints: List[Pose2d] = []
waypoints.append(Pose2d(0, 0, Rotation2d(0)))
waypoints.append(Pose2d(1, 5, Rotation2d(3)))
config = TrajectoryConfig(8.8, 0.1)
trajectory = TrajectoryGenerator.generateTrajectory(waypoints, config)
end_state = trajectory.sample(trajectory.totalTime())
command = commands2.SwerveControllerCommand(
trajectory=trajectory,
pose=get_robot_pose,
kinematics=kinematics,
controller=HolonomicDriveController(
PIDController(0.6, 0, 0),
PIDController(0.6, 0, 0),
rot_controller,
),
outputModuleStates=set_module_states,
requirements=(subsystem,),
)
timer.restart()
command.initialize()
while not command.isFinished():
command.execute()
angle = trajectory.sample(timer.get()).pose.rotation()
for i in range(0, len(swerve_module_positions)):
swerve_module_positions[i].distance += (
swerve_module_states[i].speed * 0.005
)
swerve_module_positions[i].angle = swerve_module_states[i].angle
sim.step(0.005)
timer.stop()
command.end(True)
assert end_state.pose.X() == pytest.approx(get_robot_pose().X(), x_tolerance)
assert end_state.pose.Y() == pytest.approx(get_robot_pose().Y(), y_tolerance)
assert end_state.pose.rotation().radians() == pytest.approx(
get_robot_pose().rotation().radians(),
angular_tolerance,
)

View File

@@ -0,0 +1,168 @@
import pytest
from unittest.mock import Mock, call, ANY
from wpilib.simulation import stepTiming, pauseTiming, resumeTiming
from wpimath.units import volts
from commands2 import Command, Subsystem
from commands2.sysid import SysIdRoutine
from wpilib.sysid import SysIdRoutineLog, State
class Mechanism(Subsystem):
def recordState(self, state: State):
pass
def drive(self, voltage: volts):
pass
def log(self, log: SysIdRoutineLog):
pass
@pytest.fixture
def mechanism():
return Mock(spec=Mechanism)
@pytest.fixture
def sysid_routine(mechanism):
return SysIdRoutine(
SysIdRoutine.Config(recordState=mechanism.recordState),
SysIdRoutine.Mechanism(mechanism.drive, mechanism.log, Subsystem()),
)
@pytest.fixture
def quasistatic_forward(sysid_routine):
return sysid_routine.quasistatic(SysIdRoutine.Direction.kForward)
@pytest.fixture
def quasistatic_reverse(sysid_routine):
return sysid_routine.quasistatic(SysIdRoutine.Direction.kReverse)
@pytest.fixture
def dynamic_forward(sysid_routine):
return sysid_routine.dynamic(SysIdRoutine.Direction.kForward)
@pytest.fixture
def dynamic_reverse(sysid_routine):
return sysid_routine.dynamic(SysIdRoutine.Direction.kReverse)
@pytest.fixture(autouse=True)
def timing():
pauseTiming()
yield
resumeTiming()
def run_command(command: Command):
command.initialize()
command.execute()
stepTiming(1)
command.execute()
command.end(True)
def test_record_state_bookends_motor_logging(
mechanism, quasistatic_forward, dynamic_forward
):
run_command(quasistatic_forward)
mechanism.assert_has_calls(
[
call.drive(ANY),
call.log(ANY),
call.recordState(State.kQuasistaticForward),
call.drive(ANY),
call.recordState(State.kNone),
],
any_order=False,
)
mechanism.reset_mock()
run_command(dynamic_forward)
mechanism.assert_has_calls(
[
call.drive(ANY),
call.log(ANY),
call.recordState(State.kDynamicForward),
call.drive(ANY),
call.recordState(State.kNone),
],
any_order=False,
)
def test_tests_declare_correct_state(
mechanism,
quasistatic_forward,
quasistatic_reverse,
dynamic_forward,
dynamic_reverse,
):
run_command(quasistatic_forward)
mechanism.recordState.assert_any_call(State.kQuasistaticForward)
run_command(quasistatic_reverse)
mechanism.recordState.assert_any_call(State.kQuasistaticReverse)
run_command(dynamic_forward)
mechanism.recordState.assert_any_call(State.kDynamicForward)
run_command(dynamic_reverse)
mechanism.recordState.assert_any_call(State.kDynamicReverse)
def test_tests_output_correct_voltage(
mechanism,
quasistatic_forward,
quasistatic_reverse,
dynamic_forward,
dynamic_reverse,
):
run_command(quasistatic_forward)
mechanism.drive.assert_has_calls(
[
call(pytest.approx(1.0)),
call(pytest.approx(0.0)),
],
any_order=False,
)
mechanism.reset_mock()
run_command(quasistatic_reverse)
mechanism.drive.assert_has_calls(
[
call(pytest.approx(-1.0)),
call(pytest.approx(0.0)),
],
any_order=False,
)
mechanism.reset_mock()
run_command(dynamic_forward)
mechanism.drive.assert_has_calls(
[
call(pytest.approx(7.0)),
call(pytest.approx(0.0)),
],
any_order=False,
)
mechanism.reset_mock()
run_command(dynamic_reverse)
mechanism.drive.assert_has_calls(
[
call(pytest.approx(-7.0)),
call(pytest.approx(0.0)),
],
any_order=False,
)

View File

@@ -0,0 +1,142 @@
# Copyright (c) FIRST and other WPILib contributors.
# Open Source Software; you can modify and/or share it under the terms of
# the WPILib BSD license file in the root directory of this project.
from typing import TYPE_CHECKING, List, Tuple
import math
import wpimath.controller as controller
import wpimath.trajectory as trajectory
import wpimath.geometry as geometry
import wpimath.kinematics as kinematics
from wpimath.trajectory import TrapezoidProfile as DimensionlessProfile
from wpimath.trajectory import TrapezoidProfileRadians as RadiansProfile
from wpilib import Timer
from util import * # type: ignore
if TYPE_CHECKING:
from .util import *
import pytest
import commands2
class TrapezoidProfileRadiansFixture:
def __init__(self):
constraints: RadiansProfile.Constraints = RadiansProfile.Constraints(
3 * math.pi, math.pi
)
self._profile: RadiansProfile = RadiansProfile(constraints)
self._goal_state = RadiansProfile.State(3, 0)
self._state = self._profile.calculate(
0, self._goal_state, RadiansProfile.State(0, 0)
)
self._timer = Timer()
def profileOutput(self, state: RadiansProfile.State) -> None:
self._state = state
def currentState(self) -> RadiansProfile.State:
return self._state
def getGoal(self) -> RadiansProfile.State:
return self._goal_state
@pytest.fixture()
def get_trapezoid_profile_radians() -> TrapezoidProfileRadiansFixture:
return TrapezoidProfileRadiansFixture()
class TrapezoidProfileFixture:
def __init__(self):
constraints: DimensionlessProfile.Constraints = (
DimensionlessProfile.Constraints(3 * math.pi, math.pi)
)
self._profile: DimensionlessProfile = DimensionlessProfile(constraints)
self._goal_state = DimensionlessProfile.State(3, 0)
self._state = self._profile.calculate(
0, self._goal_state, DimensionlessProfile.State(0, 0)
)
self._timer = Timer()
def profileOutput(self, state: DimensionlessProfile.State) -> None:
self._state = state
def currentState(self) -> DimensionlessProfile.State:
return self._state
def getGoal(self) -> DimensionlessProfile.State:
return self._goal_state
@pytest.fixture()
def get_trapezoid_profile_dimensionless() -> TrapezoidProfileFixture:
return TrapezoidProfileFixture()
def test_trapezoidProfileDimensionless(
scheduler: commands2.CommandScheduler, get_trapezoid_profile_dimensionless
):
with ManualSimTime() as sim:
subsystem = commands2.Subsystem()
fixture_data = get_trapezoid_profile_dimensionless
command = commands2.TrapezoidProfileCommand(
fixture_data._profile,
fixture_data.profileOutput,
fixture_data.getGoal,
fixture_data.currentState,
subsystem,
)
fixture_data._timer.restart()
command.initialize()
count = 0
while not command.isFinished():
command.execute()
count += 1
sim.step(0.005)
fixture_data._timer.stop()
command.end(True)
def test_trapezoidProfileRadians(
scheduler: commands2.CommandScheduler, get_trapezoid_profile_radians
):
with ManualSimTime() as sim:
subsystem = commands2.Subsystem()
fixture_data = get_trapezoid_profile_radians
command = commands2.TrapezoidProfileCommand(
fixture_data._profile,
fixture_data.profileOutput,
fixture_data.getGoal,
fixture_data.currentState,
subsystem,
)
fixture_data._timer.restart()
command.initialize()
count = 0
while not command.isFinished():
command.execute()
count += 1
sim.step(0.005)
fixture_data._timer.stop()
command.end(True)

View File

@@ -0,0 +1,236 @@
from typing import TYPE_CHECKING
import commands2
from util import * # type: ignore
from wpilib.simulation import stepTiming
if TYPE_CHECKING:
from .util import *
def test_onTrue(scheduler: commands2.CommandScheduler):
finished = OOBoolean(False)
command1 = commands2.WaitUntilCommand(finished)
button = InternalButton()
button.setPressed(False)
button.onTrue(command1)
scheduler.run()
assert not command1.isScheduled()
button.setPressed(True)
scheduler.run()
assert command1.isScheduled()
finished.set(True)
scheduler.run()
assert not command1.isScheduled()
def test_onFalse(scheduler: commands2.CommandScheduler):
finished = OOBoolean(False)
command1 = commands2.WaitUntilCommand(finished)
button = InternalButton()
button.setPressed(True)
button.onFalse(command1)
scheduler.run()
assert not command1.isScheduled()
button.setPressed(False)
scheduler.run()
assert command1.isScheduled()
finished.set(True)
scheduler.run()
assert not command1.isScheduled()
def test_onChange(scheduler: commands2.CommandScheduler):
finished = OOBoolean(False)
command1 = commands2.WaitUntilCommand(finished)
button = InternalButton()
button.setPressed(True)
button.onChange(command1)
scheduler.run()
assert not command1.isScheduled()
button.setPressed(False)
scheduler.run()
assert command1.isScheduled()
finished.set(True)
scheduler.run()
assert not command1.isScheduled()
def test_whileTrueRepeatedly(scheduler: commands2.CommandScheduler):
inits = OOInteger(0)
counter = OOInteger(0)
command1 = commands2.FunctionalCommand(
inits.incrementAndGet,
lambda: None,
lambda _: None,
lambda: counter.incrementAndGet() % 2 == 0,
).repeatedly()
button = InternalButton()
button.setPressed(False)
button.whileTrue(command1)
scheduler.run()
assert inits == 0
button.setPressed(True)
scheduler.run()
assert inits == 1
scheduler.run()
assert inits == 1
scheduler.run()
assert inits == 2
button.setPressed(False)
scheduler.run()
assert inits == 2
def test_whileTrueLambdaRunCommand(scheduler: commands2.CommandScheduler):
counter = OOInteger(0)
command1 = commands2.RunCommand(counter.incrementAndGet)
button = InternalButton()
button.setPressed(False)
button.whileTrue(command1)
scheduler.run()
assert counter == 0
button.setPressed(True)
scheduler.run()
assert counter == 1
scheduler.run()
assert counter == 2
button.setPressed(False)
scheduler.run()
assert counter == 2
def test_whileTrueOnce(scheduler: commands2.CommandScheduler):
startCounter = OOInteger(0)
endCounter = OOInteger(0)
command1 = commands2.StartEndCommand(
startCounter.incrementAndGet, endCounter.incrementAndGet
)
button = InternalButton()
button.setPressed(False)
button.whileTrue(command1)
scheduler.run()
assert startCounter == 0
assert endCounter == 0
button.setPressed(True)
scheduler.run()
scheduler.run()
assert startCounter == 1
assert endCounter == 0
button.setPressed(False)
scheduler.run()
assert startCounter == 1
assert endCounter == 1
def test_toggleOnTrue(scheduler: commands2.CommandScheduler):
startCounter = OOInteger(0)
endCounter = OOInteger(0)
command1 = commands2.StartEndCommand(
startCounter.incrementAndGet, endCounter.incrementAndGet
)
button = InternalButton()
button.setPressed(False)
button.toggleOnTrue(command1)
scheduler.run()
assert startCounter == 0
assert endCounter == 0
button.setPressed(True)
scheduler.run()
scheduler.run()
assert startCounter == 1
assert endCounter == 0
button.setPressed(False)
scheduler.run()
assert startCounter == 1
assert endCounter == 0
button.setPressed(True)
scheduler.run()
assert startCounter == 1
assert endCounter == 1
def test_cancelWhenActive(scheduler: commands2.CommandScheduler):
startCounter = OOInteger(0)
endCounter = OOInteger(0)
button = InternalButton()
command1 = commands2.StartEndCommand(
startCounter.incrementAndGet, endCounter.incrementAndGet
).until(button)
button.setPressed(False)
command1.schedule()
scheduler.run()
assert startCounter == 1
assert endCounter == 0
button.setPressed(True)
scheduler.run()
assert startCounter == 1
assert endCounter == 1
scheduler.run()
assert startCounter == 1
assert endCounter == 1
def test_triggerComposition():
button1 = InternalButton()
button2 = InternalButton()
button1.setPressed(True)
button2.setPressed(False)
assert button1.and_(button2).getAsBoolean() == False
assert button1.or_(button2)() == True
assert bool(button1.negate()) == False
assert (button1 & ~button2)() == True
def test_triggerCompositionSupplier():
button1 = InternalButton()
supplier = lambda: False
button1.setPressed(True)
assert button1.and_(supplier)() == False
assert button1.or_(supplier)() == True
def test_debounce(scheduler: commands2.CommandScheduler):
command = commands2.Command()
start_spying_on(command)
button = InternalButton()
debounced = button.debounce(0.1)
debounced.onTrue(command)
button.setPressed(True)
scheduler.run()
verify(command, never()).schedule()
stepTiming(0.3)
button.setPressed(True)
scheduler.run()
verify(command).schedule()
def test_booleanSupplier():
button = InternalButton()
assert button() == False
button.setPressed(True)
assert button() == True

View File

@@ -0,0 +1,50 @@
from typing import TYPE_CHECKING
import commands2
from util import * # type: ignore
if TYPE_CHECKING:
from .util import *
import pytest
def test_waitCommand(scheduler: commands2.CommandScheduler):
with ManualSimTime() as sim:
waitCommand = commands2.WaitCommand(2)
scheduler.schedule(waitCommand)
scheduler.run()
sim.step(1)
scheduler.run()
assert scheduler.isScheduled(waitCommand)
sim.step(2)
scheduler.run()
assert not scheduler.isScheduled(waitCommand)
def test_withTimeout(scheduler: commands2.CommandScheduler):
with ManualSimTime() as sim:
command1 = commands2.Command()
start_spying_on(command1)
timeout = command1.withTimeout(2)
scheduler.schedule(timeout)
scheduler.run()
verify(command1).initialize()
verify(command1).execute()
assert not scheduler.isScheduled(command1)
assert scheduler.isScheduled(timeout)
sim.step(3)
scheduler.run()
verify(command1).end(True)
verify(command1, never()).end(False)
assert not scheduler.isScheduled(timeout)

View File

@@ -0,0 +1,22 @@
from typing import TYPE_CHECKING
import commands2
from util import * # type: ignore
if TYPE_CHECKING:
from .util import *
import pytest
def test_waitUntil(scheduler: commands2.CommandScheduler):
condition = OOBoolean()
command = commands2.WaitUntilCommand(condition)
scheduler.schedule(command)
scheduler.run()
assert scheduler.isScheduled(command)
condition.set(True)
scheduler.run()
assert not scheduler.isScheduled(command)

View File

@@ -0,0 +1,294 @@
from typing import Any, Dict, TypeVar, Type
import inspect
import commands2
from wpilib.simulation import DriverStationSim, pauseTiming, resumeTiming, stepTiming
Y = TypeVar("Y")
def full_subclass_of(cls: Type[Y]) -> Type[Y]:
# Pybind classes can't be monkeypatched.
# This generates a subclass with every method filled out
# so that it can be monkeypatched.
retlist = []
clsname = cls.__name__ # + "_Subclass"
classdef = f"class {clsname}(cls):\n"
for name in dir(cls):
# for name in set(dir(cls)):
value = getattr(cls, name)
if callable(value) and not inspect.isclass(value) and not name.startswith("_"):
classdef += f" def {name}(self, *args, **kwargs):\n"
classdef += f" return super().{name}(*args, **kwargs)\n"
classdef += " ...\n"
classdef += f"retlist.append({clsname})\n"
print(classdef)
exec(classdef, globals(), locals())
return retlist[0]
class ManualSimTime:
def __enter__(self) -> "ManualSimTime":
pauseTiming()
return self
def __exit__(self, *args):
resumeTiming()
def step(self, delta: float):
stepTiming(delta)
class OOInteger:
def __init__(self, value: int = 0) -> None:
self.value = value
def get(self) -> int:
return self.value
def set(self, value: int):
self.value = value
def incrementAndGet(self) -> int:
self.value += 1
return self.value
def addAndGet(self, value: int) -> int:
self.value += value
return self.value
def __eq__(self, value: float) -> bool:
return self.value == value
def __lt__(self, value: float) -> bool:
return self.value < value
def __call__(self) -> int:
return self.value
class OOBoolean:
def __init__(self, value: bool = False) -> None:
self.value = value
def get(self) -> bool:
return self.value
def set(self, value: bool):
self.value = value
def __eq__(self, value: object) -> bool:
return self.value == value
def __bool__(self) -> bool:
return self.value
def __call__(self) -> bool:
return self.value
class InternalButton(commands2.button.Trigger):
def __init__(self):
super().__init__(self.isPressed)
self.pressed = False
def isPressed(self) -> bool:
return self.pressed
def setPressed(self, value: bool):
self.pressed = value
def __call__(self) -> bool:
return self.pressed
class OOFloat:
def __init__(self, value: float = 0.0) -> None:
self.value = value
def get(self) -> float:
return self.value
def set(self, value: float):
self.value = value
def incrementAndGet(self) -> float:
self.value += 1
return self.value
def addAndGet(self, value: float) -> float:
self.value += value
return self.value
def __eq__(self, value: float) -> bool:
return self.value == value
def __lt__(self, value: float) -> bool:
return self.value < value
def __call__(self) -> float:
return self.value
def __name__(self) -> str:
return "OOFloat"
##########################################
# Fakito Framework
def _get_all_args_as_kwargs(method, *args, **kwargs) -> Dict[str, Any]:
try:
import inspect
method_args = inspect.getcallargs(method, *args, **kwargs)
method_arg_names = list(inspect.signature(method).parameters.keys())
for idx, arg in enumerate(args):
method_args[method_arg_names[idx]] = arg
try:
del method_args["self"]
except KeyError:
pass
return method_args
except TypeError:
# Pybind methods can't be inspected
# The exact args/kwargs that are passed in are checked instead
r = {}
for idx, arg in enumerate(args):
r[idx] = arg
r.update(kwargs)
return r
class MethodWrapper:
def __init__(self, method):
self.method = method
self.og_method = method
self.times_called = 0
self.call_log = []
def __call__(self, *args, **kwargs):
self.times_called += 1
method_args = _get_all_args_as_kwargs(self.method, *args, **kwargs)
self.call_log.append(method_args)
return self.method(*args, **kwargs)
def called_with(self, *args, **kwargs):
return _get_all_args_as_kwargs(self.method, *args, **kwargs) in self.call_log
def times_called_with(self, *args, **kwargs):
return self.call_log.count(
_get_all_args_as_kwargs(self.method, *args, **kwargs)
)
def start_spying_on(obj: Any) -> None:
"""
Mocks all methods on an object, so that that call info can be used in asserts.
Example:
```
obj = SomeClass()
start_spying_on(obj)
obj.method()
obj.method = lambda: None # supports monkeypatching
assert obj.method.times_called == 2
assert obj.method.called_with(arg1=1, arg2=2)
assert obj.method.times_called_with(arg1=1, arg2=2) == 2
```
"""
for name in dir(obj):
value = getattr(obj, name)
if callable(value) and not inspect.isclass(value) and not name.startswith("_"):
setattr(obj, name, MethodWrapper(value))
if not hasattr(obj.__class__, "_is_being_spied_on"):
try:
old_setattr = obj.__class__.__setattr__
except AttributeError:
old_setattr = object.__setattr__
def _setattr(self, name, value):
if name in dir(self):
existing_value = getattr(self, name)
if isinstance(existing_value, MethodWrapper):
existing_value.method = value
return
old_setattr(self, name, value)
obj.__class__.__setattr__ = _setattr
obj.__class__._is_being_spied_on = True
# fakito verify
def reset(obj: Any) -> None:
"""
Resets the call log of all mocked methods on an object.
Also restores all monkeypatched methods.
"""
for name in dir(obj):
value = getattr(obj, name)
if isinstance(value, MethodWrapper):
value.method = value.og_method
value.times_called = 0
value.call_log = []
class times:
def __init__(self, times: int) -> None:
self.times = times
def never() -> times:
return times(0)
class _verify:
def __init__(self, obj: Any, times: times = times(1)):
self.obj = obj
self.times = times.times
def __getattribute__(self, name: str) -> Any:
def self_dot(name: str):
return super(_verify, self).__getattribute__(name)
def times_string(times: int) -> str:
if times == 1:
return "1 time"
else:
return f"{times} times"
def check(*args, **kwargs):
__tracebackhide__ = True
# import code
# code.interact(local={**globals(), **locals()})
method = getattr(self_dot("obj"), name)
# method = getattr(self1.obj, name)
assert method.times_called_with(*args, **kwargs) == self_dot(
"times"
), f"Expected {name} to be called {times_string(self_dot('times'))} with {args} {kwargs}, but was called {times_string(method.times_called_with(*args, **kwargs))}"
return check
T = TypeVar("T")
def verify(obj: T, times: times = times(1)) -> T:
# import code
# code.interact(local={**globals(), **locals()})
return _verify(obj, times) # type: ignore
def patch_via_decorator(obj: Any):
def decorator(method):
setattr(obj, method.__name__, method.__get__(obj, obj.__class__))
return method
return decorator