[copybara] mostrobotpy to allwpilib (#8545)

Project import generated by Copybara.
GitOrigin-RevId: f10284b37498bb6a088891ca41f160793ec7fd90
This commit is contained in:
PJ Reiniger
2026-01-12 22:11:02 -05:00
committed by GitHub
parent 7e1260b003
commit 762d1e8b93
64 changed files with 2665 additions and 66 deletions

View File

@@ -228,6 +228,4 @@ try:
except ImportError:
__version__ = "master"
from ._impl.main import run
__all__ += ["CameraServer", "run"]
__all__ += ["CameraServer"]

View File

@@ -0,0 +1,51 @@
import pathlib
import sys
builtin_tests = """'''
This test module imports tests that come with wpilib, and can be used
to test basic functionality of just about any robot.
'''
from wpilib.testing.robot_tests import *
"""
class AddTests:
"""
Adds default robot tests to your robot project directory
"""
def __init__(self, parser=None):
pass
def run(self, main_file: pathlib.Path, project_path: pathlib.Path):
if not main_file.exists():
print(
f"ERROR: is this a robot project? {main_file} does not exist",
file=sys.stderr,
)
return 1
try_dirs = [project_path / "tests", project_path / ".." / "tests"]
test_directory = try_dirs[0]
for d in try_dirs:
if d.exists():
test_directory = d
break
else:
test_directory.mkdir(parents=True)
print(f"Tests directory is {test_directory}")
print()
builtin_tests_file = test_directory / "robot_test.py"
if builtin_tests_file.exists():
print("- robot_test.py already exists")
else:
with open(builtin_tests_file, "w") as fp:
fp.write(builtin_tests)
print("- builtin tests created at", builtin_tests_file)
print()
print("Robot tests can be ran via 'python3 -m robotpy test'")

View File

@@ -0,0 +1,10 @@
class Main:
"""
Executes the robot code using the currently installed HAL (this is probably not what you want unless this is controlling the physical robot)
"""
def __init__(self, parser):
pass
def run(self, options, robot_class, **static_options):
return robot_class.main(robot_class)

View File

@@ -0,0 +1,78 @@
import os
import argparse
import importlib.metadata
import logging
import sys
import typing
import wpilib
logger = logging.getLogger("robot.sim")
class RobotSim:
"""
Runs the robot in simulation mode
"""
def __init__(self, parser: argparse.ArgumentParser):
parser.add_argument(
"--nogui",
default=False,
action="store_true",
help="Don't use the WPIlib simulation gui",
)
self.simexts = {}
for entry_point in importlib.metadata.entry_points(group="robotpy_sim.2027"):
try:
sim_ext_module = entry_point.load()
except ImportError:
print(f"WARNING: Error detected in {entry_point}", file=sys.stderr)
continue
self.simexts[entry_point.name] = sim_ext_module
try:
cmd_help = importlib.metadata.metadata(entry_point.dist.name)["summary"]
except AttributeError:
cmd_help = "Load specified simulation extension"
parser.add_argument(
f"--{entry_point.name}",
default=False,
action="store_true",
help=cmd_help,
)
def run(
self,
options: argparse.Namespace,
nogui: bool,
robot_class: typing.Type[wpilib.RobotBase],
):
if not nogui:
try:
import halsim_gui
except ImportError:
print("robotpy-halsim-gui is not installed!", file=sys.stderr)
exit(1)
else:
halsim_gui.loadExtension()
# Some extensions (gui) changes the current directory
cwd = os.getcwd()
for name, module in self.simexts.items():
if getattr(options, name.replace("-", "_"), False):
try:
module.loadExtension()
except:
print(f"Error loading {name}!", file=sys.stderr)
raise
os.chdir(cwd)
# run the robot
return robot_class.main(robot_class)

View File

@@ -0,0 +1,244 @@
import logging
import os
from os.path import abspath
import inspect
import pathlib
import sys
import tomllib
import typing
import wpilib
import pytest
from ..testing import pytest_plugin
# TODO: setting the plugins so that the end user can invoke pytest directly
# could be a useful thing. Will have to consider that later.
logger = logging.getLogger("test")
class _TryAgain(Exception):
pass
#
# main test class
#
class RobotTest:
"""
Executes unit tests on the robot code using a special pytest plugin
"""
def __init__(self, parser=None):
if parser:
parser.add_argument(
"--builtin",
default=False,
action="store_true",
help="Use builtin tests if no tests are specified",
)
isolation_group = parser.add_mutually_exclusive_group()
isolation_group.add_argument(
"--isolated",
default=None,
dest="isolated",
action="store_true",
help="Run each test in a separate robot process (default). Set `tool.robotpy.testing.isolated` in your pyproject.toml to control the default",
)
isolation_group.add_argument(
"--no-isolation",
dest="isolated",
action="store_false",
help="Disable isolated test mode and run tests in-process",
)
parser.add_argument(
"--coverage-mode",
default=False,
action="store_true",
help="This flag is passed when trying to determine coverage",
)
parser.add_argument(
"pytest_args",
nargs="*",
help="To pass args to pytest, specify --<space>, then the args",
)
parser.add_argument(
"-j",
"--jobs",
type=int,
default=-1,
help="Maximum isolated robot processes (default: max CPUs - 1)",
)
def run(
self,
main_file: pathlib.Path,
project_path: pathlib.Path,
robot_class: typing.Type[wpilib.RobotBase],
builtin: bool,
isolated: typing.Optional[bool],
coverage_mode: bool,
verbose: bool,
pytest_args: typing.List[str],
jobs: int,
):
if isolated is None:
pyproject_path = project_path / "pyproject.toml"
if pyproject_path.exists():
with open(pyproject_path, "rb") as fp:
d = tomllib.load(fp)
try:
v = d["tool"]["robotpy"]["testing"]["isolated"]
except KeyError:
pass
else:
if not isinstance(v, bool):
raise ValueError(
f"tool.robotpy.testing.isolated must be a boolean value (got {v})"
)
isolated = v
if isolated is None:
isolated = True
try:
return self._run_test(
main_file,
project_path,
robot_class,
builtin,
isolated,
coverage_mode,
verbose,
pytest_args,
jobs,
)
except _TryAgain:
return self._run_test(
main_file,
project_path,
robot_class,
builtin,
isolated,
coverage_mode,
verbose,
pytest_args,
jobs,
)
def _run_test(
self,
main_file: pathlib.Path,
project_path: pathlib.Path,
robot_class: typing.Type[wpilib.RobotBase],
builtin: bool,
isolated: bool,
coverage_mode: bool,
verbose: bool,
pytest_args: typing.List[str],
jobs: int,
):
# find test directory, change current directory so pytest can find the tests
# -> assume that tests reside in tests or ../tests
curdir = pathlib.Path.cwd().absolute()
self.try_dirs = [
((project_path / "tests").absolute(), False),
((project_path / ".." / "tests").absolute(), True),
]
for d, chdir in self.try_dirs:
if d.exists():
builtin = False
if chdir:
os.chdir(d)
break
else:
if not builtin:
print("ERROR: Cannot run robot tests, as test directory was not found!")
retv = self._no_tests(main_file, project_path)
return 1
from ..testing import robot_tests
pytest_args.insert(0, abspath(inspect.getfile(robot_tests)))
try:
if isolated:
from ..testing import pytest_isolated_tests_plugin
retv = pytest.main(
pytest_args,
plugins=[
pytest_isolated_tests_plugin.IsolatedTestsPlugin(
robot_class, main_file, builtin, verbose, jobs
)
],
)
else:
retv = pytest.main(
pytest_args,
plugins=[
pytest_plugin.RobotTestingPlugin(robot_class, main_file, False)
],
)
finally:
os.chdir(curdir)
# requires pytest 2.8.x
if retv == 5:
print()
print("ERROR: a tests directory was found, but no tests were defined")
retv = self._no_tests(main_file, project_path, retv)
return retv
def _no_tests(
self, main_file: pathlib.Path, project_path: pathlib.Path, r: int = 1
):
print()
print("Looked for tests at:")
for d, _ in self.try_dirs:
print("-", d)
print()
print(
"If you don't want to write your own tests, wpilib comes with generic tests"
)
print("that can test basic functionality of most robots. You can run them by")
print("specifying the --builtin option.")
print()
if not sys.stdin.isatty():
print(
"Alternatively, to create a tests directory with the builtin tests, you can run:"
)
print()
print(" python3 -m robotpy add-tests")
print()
else:
if yesno("Create a tests directory with builtin tests now?"):
from .cli_add_tests import AddTests
add_tests = AddTests()
add_tests.run(main_file, project_path)
raise _TryAgain()
return r
def yesno(prompt):
"""Returns True if user answers 'y'"""
prompt += " [y/n]"
a = ""
while a not in ["y", "n"]:
a = input(prompt).lower()
return a == "y"

View File

@@ -1,18 +0,0 @@
import inspect
import sys
def run(robot_class, **kwargs):
"""
``wpilib.run`` is no longer used. You should run your robot code via one of
the following methods instead:
* Windows: ``py -m robotpy [arguments]``
* Linux/macOS: ``python -m robotpy [arguments]``
In a virtualenv the ``robotpy`` command can be used directly.
"""
msg = inspect.cleandoc(inspect.getdoc(run) or "`wpilib.run` is no longer used")
print(msg, file=sys.stderr)
sys.exit(1)

View File

@@ -77,18 +77,6 @@ def _log_versions(robotpy_version: typing.Optional[str]):
logger.debug("%s version %s", k, v)
class Main:
"""
Executes the robot code using the currently installed HAL (this is probably not what you want unless you're on the roboRIO)
"""
def __init__(self, parser):
pass
def run(self, options, robot_class, **static_options):
return robot_class.main(robot_class)
class RobotStarter:
def __init__(self):
self.logger = logging.getLogger("robotpy")

View File

@@ -6,6 +6,7 @@ __all__ = ["OpModeRobot"]
from ._wpilib import OpModeRobotBase, OpMode
class OpModeRobot(OpModeRobotBase):
"""
OpModeRobot implements the opmode-based robot program framework.
@@ -23,14 +24,16 @@ class OpModeRobot(OpModeRobotBase):
def __init__(self):
super().__init__()
def addOpMode(self,
opmodeCls: type,
mode: RobotMode,
name: str,
group: Optional[str] = None,
description: Optional[str] = None,
textColor: Optional[Color] = None,
backgroundColor: Optional[Color] = None) -> None:
def addOpMode(
self,
opmodeCls: type,
mode: RobotMode,
name: str,
group: Optional[str] = None,
description: Optional[str] = None,
textColor: Optional[Color] = None,
backgroundColor: Optional[Color] = None,
) -> None:
"""
Adds an operating mode option. It's necessary to call PublishOpModes() to
make the added modes visible to the driver station.
@@ -48,6 +51,7 @@ class OpModeRobot(OpModeRobotBase):
:param textColor: text color
:param backgroundColor: background color
"""
def makeOpModeInstance() -> OpMode:
# Try to instantiate with robot argument first
try:
@@ -55,7 +59,18 @@ class OpModeRobot(OpModeRobotBase):
except TypeError:
# Fallback to no-argument constructor
return opmodeCls() # type: ignore
if textColor is None or backgroundColor is None:
self.addOpModeFactory(makeOpModeInstance, mode, name, group or "", description or "")
self.addOpModeFactory(
makeOpModeInstance, mode, name, group or "", description or ""
)
else:
self.addOpModeFactory(makeOpModeInstance, mode, name, group or "", description or "", textColor, backgroundColor)
self.addOpModeFactory(
makeOpModeInstance,
mode,
name,
group or "",
description or "",
textColor,
backgroundColor,
)

View File

@@ -0,0 +1,163 @@
import contextlib
import time
import threading
import typing as T
import pytest
from .. import RobotBase
from ..simulation import (
DriverStationSim,
stepTiming,
stepTimingAsync,
getProgramStarted,
)
from hal._wpiHal import _RobotMode as RobotMode
class RobotTestController:
"""
Use this object to control the robot's state during tests
"""
def __init__(self, reraise, robot: RobotBase):
self._reraise = reraise
self._thread: T.Optional[threading.Thread] = None
self._robot = robot
self._cond = threading.Condition()
self._robot_started = False
self._robot_finished = False
def _robot_thread(self, robot: RobotBase):
with self._cond:
self._robot_started = True
self._cond.notify_all()
with self._reraise(catch=True):
assert robot is not None # shouldn't happen...
try:
robot.startCompetition()
assert self._robot_finished
finally:
# always call endCompetition or python hangs
robot.endCompetition()
del robot
@contextlib.contextmanager
def run_robot(self):
"""
Use this in a "with" statement to start your robot code at the
beginning of the with block, and end your robot code at the end
of the with block.
"""
# remove robot reference so it gets cleaned up when gc.collect() is called
robot = self._robot
assert robot is not None
self._robot = None
self._thread = th = threading.Thread(
target=self._robot_thread, args=(robot,), daemon=True
)
th.start()
with self._cond:
# make sure the thread didn't die
assert self._cond.wait_for(lambda: self._robot_started, timeout=1)
# This is the same thing that waitForProgramStart does
for _ in range(1000):
if getProgramStarted():
break
time.sleep(0.001)
else:
assert False, "robot never started"
try:
# in this block you should tell the sim to do sim things
yield
finally:
self._robot_finished = True
robot.endCompetition()
if isinstance(self._reraise.exception, RuntimeError):
if str(self._reraise.exception).startswith(
"HAL: A handle parameter was passed incorrectly"
):
msg = (
"Do not reuse HAL objects in tests! This error may occur if you"
" stored a motor/sensor as a global or as a class variable"
" outside of a method."
)
if hasattr(Exception, "add_note"):
self._reraise.exception.add_note(f"*** {msg}")
else:
e = self._reraise.exception
self._reraise.reset()
raise RuntimeError(msg) from e
# Increment time by 1 second to ensure that any notifiers fire
stepTimingAsync(1.0)
# the robot thread should exit quickly
th.join(timeout=1)
if th.is_alive():
pytest.fail("robot did not exit within 2 seconds")
self._robot = None
self._thread = None
@property
def robot_is_alive(self) -> bool:
"""
True if the robot code is alive
"""
th = self._thread
if not th:
return False
return th.is_alive()
def step_timing(
self,
*,
seconds: float,
autonomous: bool,
enabled: bool,
assert_alive: bool = True,
) -> float:
"""
This utility will increment simulated time, while pretending that
there's a driver station connected and delivering new packets
every 0.2 seconds.
:param seconds: Number of seconds to run (will step in increments of 0.2)
:param autonomous: Tell the robot that it is in autonomous mode
:param enabled: Tell the robot that it is enabled
:returns: Number of seconds time was incremented
"""
assert self.robot_is_alive, "did you call control.run_robot()?"
assert seconds > 0
DriverStationSim.setDsAttached(True)
DriverStationSim.setRobotMode(
RobotMode.AUTONOMOUS if autonomous else RobotMode.TELEOPERATED
)
DriverStationSim.setEnabled(enabled)
tm = 0.0
while tm < seconds + 0.01:
DriverStationSim.notifyNewData()
stepTiming(0.2)
if assert_alive:
assert self.robot_is_alive
tm += 0.2
return tm

View File

@@ -0,0 +1,461 @@
import dataclasses
import logging
import multiprocessing
import multiprocessing.connection
import os
import pathlib
import signal
import sys
import time
import typing as T
import pytest
import robotpy.main
import wpilib
from .pytest_plugin import RobotTestingPlugin
def _enable_faulthandler():
#
# In the event of a segfault, faulthandler will dump the currently
# active stack so you can figure out what went wrong.
#
# Additionally, on non-Windows platforms we register a SIGUSR2
# handler -- if you send the robot process a SIGUSR2, then
# faulthandler will dump all of your current stacks. This can
# be really useful for figuring out things like deadlocks.
#
import logging
logger = logging.getLogger("faulthandler")
try:
# These should work on all platforms
import faulthandler
faulthandler.enable()
except Exception as e:
logger.warning("Could not enable faulthandler: %s", e)
return
try:
faulthandler.register(signal.SIGUSR2)
logger.info("registered SIGUSR2 for PID %s", os.getpid())
except Exception:
return
class WorkerPlugin:
"""
This pytest plugin runs in the isolated process that runs a test that uses the
robot fixture.
Heavily borrowed from pytest-xdist WorkerInteractor
"""
def __init__(self, channel: multiprocessing.connection.Connection):
self.channel = channel
def sendevent(self, name: str, **kwargs: object):
self.channel.send((name, kwargs))
@pytest.hookimpl(wrapper=True)
def pytest_sessionstart(self, session: pytest.Session):
self.config = session.config
return (yield)
@pytest.hookimpl
def pytest_internalerror(self, excrepr: object):
formatted_error = str(excrepr)
for line in formatted_error.split("\n"):
print("IERROR>", line, file=sys.stderr)
self.sendevent("internal_error", formatted_error=formatted_error)
@pytest.hookimpl
def pytest_runtest_logstart(
self,
nodeid: str,
location: tuple[str, int | None, str],
):
self.sendevent("logstart", nodeid=nodeid, location=location)
@pytest.hookimpl
def pytest_runtest_logfinish(
self,
nodeid: str,
location: tuple[str, int | None, str],
):
self.sendevent("logfinish", nodeid=nodeid, location=location)
@pytest.hookimpl
def pytest_runtest_logreport(self, report: pytest.TestReport):
data = self.config.hook.pytest_report_to_serializable(
config=self.config, report=report
)
self.sendevent("testreport", data=data)
def _run_test(
item_nodeid, config_args, robot_class, robot_file, verbose, pipe, root_path
):
"""This function runs in a subprocess"""
logging.root.addHandler(logging.NullHandler())
logging.root.setLevel(logging.DEBUG if verbose else logging.INFO)
_enable_faulthandler()
# This is used by getDeployDirectory, so make sure it gets fixed
robotpy.main.robot_py_path = robot_file
os.chdir(root_path)
# keep the plugins around because it has a reference to the robot
# and we don't want it to die and deadlock
plugin = RobotTestingPlugin(robot_class, robot_file, True)
worker_plugin = WorkerPlugin(pipe)
ec = pytest.main(
[item_nodeid, "--no-header", "-p", "no:terminalreporter", *config_args],
plugins=[plugin, worker_plugin],
)
# ensure output is printed out
sys.stdout.flush()
# Don't let the process die, let the parent kill us to avoid
# python interpreter badness
worker_plugin.sendevent("finished", exit_code=ec)
pipe.close()
# ensure that the gc doesn't collect the plugin..
while plugin:
time.sleep(100)
@dataclasses.dataclass
class IsolatedTestJob:
item: pytest.Function
conn: multiprocessing.connection.Connection
process: multiprocessing.Process
start_time: float
exit_code: int | None = None
finished: bool = False
# set when the worker indicates it has finished
worker_completed: bool = False
def set_exit_code(self, ec: int):
if self.exit_code is None:
self.exit_code = ec
class IsolatedTestsPlugin:
"""
This pytest plugin runs any test that uses the 'robot' fixture in an
isolated subprocess
"""
def __init__(
self,
robot_class: T.Type[wpilib.RobotBase],
robot_file: pathlib.Path,
builtin_tests: bool,
verbose: bool,
parallelism: int,
):
self._robot_class = robot_class
self._robot_file = robot_file
self._builtin_tests = builtin_tests
self._verbose = verbose
if parallelism < 1:
try:
parallelism = multiprocessing.cpu_count() - 1
except NotImplementedError:
parallelism = 1
self._parallelism = max(1, parallelism)
self._shouldstop = False
@pytest.hookimpl(wrapper=True)
def pytest_sessionstart(self, session: pytest.Session):
self._config = session.config
self._maxfail: int = self._config.getvalue("maxfail")
self._countfailures = 0
self._shouldstop = False
multiprocessing.set_start_method("spawn")
return (yield)
@pytest.hookimpl
def pytest_runtestloop(self, session: pytest.Session) -> bool:
if (
session.testsfailed
and not session.config.option.continue_on_collection_errors
):
raise session.Interrupted(
f"{session.testsfailed} error{'s' if session.testsfailed != 1 else ''} during collection"
)
if session.config.option.collectonly:
return True
running: list[IsolatedTestJob] = []
deferred: list[pytest.Function] = []
try:
# Start any tests that use the robot fixture first. Tests that don't
# use the robot fixture will be ran later
for item in session.items:
assert isinstance(item, pytest.Function)
if "robot" not in item.fixturenames:
deferred.append(item)
continue
while len(running) >= self._parallelism:
self._wait_for_jobs(running, session)
running.append(self._start_isolated_test(item))
self._maybe_raise(session)
# Run the in-process tests now while the robot tests are finishing
for idx, item in enumerate(deferred):
nextitem = deferred[idx + 1] if idx + 1 < len(deferred) else None
session.config.hook.pytest_runtest_protocol(
item=item, nextitem=nextitem
)
self._maybe_raise(session)
while running:
self._wait_for_jobs(running, session)
finally:
for job in running:
self._cleanup_job(job)
return True
def _start_isolated_test(self, item: pytest.Function) -> IsolatedTestJob:
config_args = self._config.invocation_params.args
if self._builtin_tests:
nodeid = f"{config_args[0]}::{item.name}"
config_args = config_args[1:]
else:
nodeid = item.nodeid
pconn, cconn = multiprocessing.Pipe()
process = multiprocessing.Process(
target=_run_test,
args=(
nodeid,
config_args,
self._robot_class,
self._robot_file,
self._verbose,
cconn,
self._config.rootpath,
),
)
process.start()
cconn.close()
return IsolatedTestJob(
item=item,
conn=pconn,
process=process,
start_time=time.time(),
)
def _wait_for_jobs(self, running: list[IsolatedTestJob], session: pytest.Session):
if not running:
return
ready = multiprocessing.connection.wait([job.conn for job in running])
for conn in ready:
job = next(job for job in running if job.conn == conn)
self._process_job_messages(job, session)
if job.finished:
running.remove(job)
self._finalize_job(job, session)
def _process_job_messages(self, job: IsolatedTestJob, session: pytest.Session):
while not job.finished:
try:
if not job.conn.poll():
break
callname, kwargs = job.conn.recv()
except (IOError, EOFError) as e:
job.finished = True
break
method = "worker_" + callname
call = getattr(self, method)
call(job, **kwargs)
self._maybe_raise(session)
if not job.process.is_alive():
job.finished = True
def _finalize_job(self, job: IsolatedTestJob, session: pytest.Session):
self._cleanup_job(job)
if job.worker_completed:
return
stop = time.time()
duration = stop - job.start_time
ec = job.exit_code
longrepr = None
if ec is None:
longrepr = "subprocess failed for unknown reason"
else:
if ec < 0:
try:
signal_name = signal.strsignal(-ec)
longrepr = f"subprocess exited due to signal {-ec}: {signal_name}"
except ValueError:
pass
if longrepr is None:
longrepr = f"subprocess exited with exit code {ec}"
report = pytest.TestReport(
nodeid=job.item.nodeid,
location=job.item.location,
keywords=job.item.keywords,
outcome="failed",
longrepr=longrepr,
when="call",
duration=duration,
start=job.start_time,
stop=stop,
)
self._config.hook.pytest_runtest_logstart(
nodeid=job.item.nodeid, location=job.item.location
)
self._config.hook.pytest_runtest_logreport(report=report)
self._config.hook.pytest_runtest_logfinish(
nodeid=job.item.nodeid, location=job.item.location
)
self._maybe_raise(session)
def _cleanup_job(self, job: IsolatedTestJob):
try:
job.conn.close()
except Exception:
pass
if job.process.is_alive():
job.process.kill()
try:
job.process.join(timeout=1)
except TimeoutError:
pass
ec = job.process.exitcode
if ec is not None:
job.set_exit_code(ec)
job.process.close()
def _maybe_raise(self, session: pytest.Session):
if self._shouldstop:
raise session.Interrupted(self._shouldstop)
if session.shouldfail:
raise session.Failed(session.shouldfail)
if session.shouldstop:
raise session.Interrupted(session.shouldstop)
#
# Worker dispatch functions (copied from pytest-xdist)
#
def worker_logstart(
self,
job: IsolatedTestJob,
nodeid: str,
location: tuple[str, int | None, str],
):
"""Emitted when a node calls the pytest_runtest_logstart hook."""
if self._config.option.verbose > 0:
return
self._config.hook.pytest_runtest_logstart(nodeid=nodeid, location=location)
def worker_logfinish(
self,
job: IsolatedTestJob,
nodeid: str,
location: tuple[str, int | None, str],
):
"""Emitted when a node calls the pytest_runtest_logfinish hook."""
if self._config.option.verbose > 0:
return
self._config.hook.pytest_runtest_logfinish(nodeid=nodeid, location=location)
def worker_testreport(self, job: IsolatedTestJob, data: object):
"""Emitted when a node calls the pytest_runtest_logreport hook."""
report = self._config.hook.pytest_report_from_serializable(
config=self._config, data=data
)
self._config.hook.pytest_runtest_logreport(report=report)
self._handlefailures(report)
def worker_internal_error(self, job: IsolatedTestJob, formatted_error: str):
"""Emitted when a node calls the pytest_internalerror hook."""
for line in formatted_error.split("\n"):
print("IERROR>", line, file=sys.stderr)
job.finished = True
if not self._shouldstop:
self._shouldstop = "internal error in worker"
def worker_finished(self, job: IsolatedTestJob, exit_code: object | None = None):
"""Emitted when a node finishes running."""
if exit_code is not None:
job.exit_code = int(exit_code)
job.worker_completed = True
job.finished = True
def _handlefailures(self, rep: pytest.TestReport):
if rep.failed:
self._countfailures += 1
if (
self._maxfail
and self._countfailures >= self._maxfail
and not self._shouldstop
):
self._shouldstop = f"stopping after {self._countfailures} failures"
#
# These fixtures match the ones in RobotTestingPlugin but these have no effect
#
@pytest.fixture(scope="function")
def robot(self):
pass
@pytest.fixture(scope="function")
def control(self, reraise, robot):
pass
@pytest.fixture()
def robot_file(self) -> pathlib.Path:
"""The absolute filename your robot code is started from"""
return self._robot_file
@pytest.fixture()
def robot_path(self) -> pathlib.Path:
"""The absolute directory that your robot code is located at"""
return self._robot_file.parent

View File

@@ -0,0 +1,145 @@
import gc
import pathlib
import typing as T
import weakref
import pytest
import hal
import hal.simulation
from hal._wpiHal import _RobotMode as RobotMode
import ntcore
import wpilib
from wpilib.simulation import DriverStationSim, pauseTiming, restartTiming
import wpilib.simulation
# TODO: get rid of special-casing.. maybe should register a HAL shutdown hook or something
try:
import commands2
except ImportError:
commands2 = None
from .controller import RobotTestController
class RobotTestingPlugin:
"""
Pytest plugin. Each documented member function name can be an argument
to your test functions, and the data that these functions return will
be passed to your test function.
"""
def __init__(
self,
robot_class: T.Type[wpilib.RobotBase],
robot_file: pathlib.Path,
isolated: bool,
):
self.isolated = isolated
self._robot_file = robot_file
self._robot_class = robot_class
#
# Fixtures
#
# Each one of these can be arguments to your test, and the result of the
# corresponding function will be passed to your test as that argument.
#
@pytest.fixture(scope="function")
def robot(self):
"""
Your robot instance
.. note:: RobotPy/WPILib testing infrastructure is really sensitive
to ensuring that things get cleaned up properly. Make sure
that you don't store references to your robot or other
WPILib objects in a global or static context.
"""
#
# This function needs to do the same things that RobotBase.main does
# plus some extra things needed for testing
#
# Previously this was separate from robot fixture, but we need to
# ensure that the robot cleanup happens deterministically relative to
# when handle cleanup/etc happens, otherwise unnecessary HAL errors will
# bubble up to the user
#
nt_inst = ntcore.NetworkTableInstance.getDefault()
nt_inst.startLocal()
pauseTiming()
restartTiming()
wpilib.DriverStation.silenceJoystickConnectionWarning(True)
DriverStationSim.setRobotMode(RobotMode.AUTONOMOUS)
DriverStationSim.setEnabled(False)
DriverStationSim.notifyNewData()
# Create the user's robot instance
robot = self._robot_class()
# Tests only get a proxy to ensure cleanup is more reliable
yield weakref.proxy(robot)
# If running in separate processes, no need to do cleanup
if self.isolated:
# .. and funny enough, in isolated mode we *don't* want the
# robot to be cleaned up, as that can deadlock
self._saved_robot = robot
return
# HACK: avoid motor safety deadlock
wpilib.simulation._simulation._resetMotorSafety()
del robot
if commands2 is not None:
commands2.CommandScheduler.resetInstance()
# Double-check all objects are destroyed so that HAL handles are released
gc.collect()
# shutdown networktables before other kinds of global cleanup
# -> some reset functions will re-register listeners, so it's important
# to do this before so that the listeners are active on the current
# NetworkTables instance
nt_inst.stopLocal()
nt_inst._reset()
# Cleanup WPILib globals
# -> preferences, SmartDashboard, MotorSafety
wpilib.simulation._simulation._resetWpilibSimulationData()
wpilib._wpilib._clearSmartDashboardData()
# Cancel all periodic callbacks
hal.simulation.cancelAllSimPeriodicCallbacks()
# Reset the HAL handles
hal.simulation.resetGlobalHandles()
# Reset the HAL data
hal.simulation.resetAllSimData()
# Don't call HAL shutdown! This is only used to cleanup HAL extensions,
# and functions will only be called the first time (unless re-registered)
# hal.shutdown()
@pytest.fixture(scope="function")
def control(self, reraise, robot: wpilib.RobotBase) -> RobotTestController:
"""
A pytest fixture that provides control over your robot
"""
return RobotTestController(reraise, robot)
@pytest.fixture()
def robot_file(self) -> pathlib.Path:
"""The absolute filename your robot code is started from"""
return self._robot_file
@pytest.fixture()
def robot_path(self) -> pathlib.Path:
"""The absolute directory that your robot code is located at"""
return self._robot_file.parent

View File

@@ -0,0 +1,74 @@
"""
The primary purpose of these tests is to run through your code
and make sure that it doesn't crash. If you actually want to test
your code, you need to write your own custom tests to tease out
the edge cases.
To use these, add the following to a python file in your tests directory::
from wpilib.testing.robot_tests import *
"""
import pytest
from .controller import RobotTestController
def test_autonomous(control: RobotTestController):
"""Runs autonomous mode by itself"""
with control.run_robot():
# Run disabled for a short period
control.step_timing(seconds=0.5, autonomous=True, enabled=False)
# Run enabled for 15 seconds
control.step_timing(seconds=15, autonomous=True, enabled=True)
# Disabled for another short period
control.step_timing(seconds=0.5, autonomous=True, enabled=False)
@pytest.mark.filterwarnings("ignore")
def test_disabled(control: RobotTestController, robot):
"""Runs disabled mode by itself"""
with control.run_robot():
# Run disabled + autonomous for a short period
control.step_timing(seconds=5, autonomous=True, enabled=False)
# Run disabled + !autonomous for a short period
control.step_timing(seconds=5, autonomous=False, enabled=False)
@pytest.mark.filterwarnings("ignore")
def test_operator_control(control: RobotTestController):
"""Runs operator control mode by itself"""
with control.run_robot():
# Run disabled for a short period
control.step_timing(seconds=0.5, autonomous=False, enabled=False)
# Run enabled for 15 seconds
control.step_timing(seconds=15, autonomous=False, enabled=True)
# Disabled for another short period
control.step_timing(seconds=0.5, autonomous=False, enabled=False)
@pytest.mark.filterwarnings("ignore")
def test_practice(control: RobotTestController):
"""Runs through the entire span of a practice match"""
with control.run_robot():
# Run disabled for a short period
control.step_timing(seconds=0.5, autonomous=True, enabled=False)
# Run autonomous + enabled for 15 seconds
control.step_timing(seconds=15, autonomous=True, enabled=True)
# Disabled for another short period
control.step_timing(seconds=0.5, autonomous=False, enabled=False)
# Run teleop + enabled for 2 minutes
control.step_timing(seconds=120, autonomous=False, enabled=True)