[bazel][robotpy] Add mirror for robotpy's wpiuil and wpinet libraries (#8062)

Project import generated by Copybara.

GitOrigin-RevId: 92ea93d1b47a82667044bd0af05f7fdb34d2c2c2
This commit is contained in:
PJ Reiniger
2025-08-30 14:55:11 -04:00
committed by GitHub
parent 96004f9bb5
commit bd1dcc4358
96 changed files with 7271 additions and 64 deletions

View File

@@ -20,6 +20,9 @@ modifiableFileExclude {
generatedFileExclude {
src/main/native/thirdparty/
src/main/python/
src/test/python/
src/main/native/include/wpi/fs\.h$
src/main/native/include/wpi/FastQueue\.h$
src/main/native/cpp/fs\.cpp$

View File

@@ -2,13 +2,18 @@ load("@allwpilib_pip_deps//:requirements.bzl", "requirement")
load("@aspect_bazel_lib//lib:write_source_files.bzl", "write_source_files")
load("@rules_cc//cc:defs.bzl", "cc_binary", "cc_library", "cc_test")
load("@rules_java//java:defs.bzl", "java_binary")
load("@rules_python//python:defs.bzl", "py_binary")
load("@rules_python//python:defs.bzl", "py_binary", "py_library")
load("//shared/bazel/rules:cc_rules.bzl", "third_party_cc_lib_helper", "wpilib_cc_library", "wpilib_cc_shared_library", "wpilib_cc_static_library")
load("//shared/bazel/rules:java_rules.bzl", "wpilib_java_junit5_test")
load("//shared/bazel/rules:jni_rules.bzl", "wpilib_jni_cc_library", "wpilib_jni_java_library")
load("//shared/bazel/rules:packaging.bzl", "package_default_jni_project")
load("//shared/bazel/rules/gen:gen-resources.bzl", "generate_resources")
load("//shared/bazel/rules/robotpy:build_info_gen.bzl", "generate_robotpy_native_wrapper_build_info", "generate_robotpy_pybind_build_info")
load("//shared/bazel/rules/robotpy:pybind_rules.bzl", "create_pybind_library")
load("//shared/bazel/rules/robotpy:pytest_util.bzl", "robotpy_py_test")
load("//wpiutil:generate.bzl", "generate_wpiutil")
load("//wpiutil:robotpy_native_build_info.bzl", "define_native_wrapper")
load("//wpiutil:robotpy_pybind_build_info.bzl", "define_pybind_library", "publish_library_casters", "wpiutil_extension")
filegroup(
name = "doxygen-files",
@@ -315,6 +320,95 @@ java_binary(
],
)
generate_robotpy_native_wrapper_build_info(
name = "robotpy-native-wpiutil-generator",
pyproject_toml = "src/main/python/native-pyproject.toml",
third_party_dirs = [
"argparse",
"debugging",
"expected",
"fmtlib",
"json",
"llvm",
"mpack",
"nanopb",
"sigslot",
"upb",
],
)
define_native_wrapper(
name = "robotpy-native-wpiutil",
)
PYBIND_PKGCFG_DEPS = ["//wpiutil:native/wpiutil/robotpy-native-wpiutil.pc"]
generate_robotpy_pybind_build_info(
name = "robotpy-wpiutil-generator",
additional_srcs = ["src/main/python/wpiutil/src/wpistruct/wpystruct_fns.h"] + [":robotpy-native-wpiutil.copy_headers"],
package_root_file = "src/main/python/wpiutil/__init__.py",
pkgcfgs = PYBIND_PKGCFG_DEPS,
pyproject_toml = "src/main/python/pyproject.toml",
yaml_files = glob(["src/main/python/semiwrap/*.yml"]),
)
publish_library_casters()
wpiutil_extension(
srcs = glob(["src/main/python/wpiutil/src/**/*.cpp"]),
extra_hdrs = glob([
"src/main/python/wpiutil/src/type_casters/*.h",
"src/main/python/wpiutil/src/wpistruct/*.h",
]),
header_to_dat_deps = ["src/main/python/wpiutil/src/wpistruct/wpystruct_fns.h"],
includes = [
"src/main/python/wpiutil/",
"src/main/python/wpiutil/src/type_casters",
"src/main/python/wpiutil/src/wpistruct",
],
)
define_pybind_library(
name = "robotpy-wpiutil",
pkgcfgs = PYBIND_PKGCFG_DEPS,
)
create_pybind_library(
name = "module",
dynamic_deps = [
":shared/wpiutil",
],
extension_name = "module",
extra_srcs = glob(["src/test/python/cpp/wpiutil_test/*.cpp"]),
install_path = "src/test/python/cpp/wpiutil_test/",
deps = [
":wpiutil_pybind_library",
],
)
py_library(
name = "wpiutil_test",
srcs = glob(["src/test/python/cpp/wpiutil_test/*.py"]),
data = [
":src/test/python/cpp/wpiutil_test/module",
],
imports = ["src/test/python/cpp"],
visibility = ["//visibility:public"],
)
robotpy_py_test(
"wpiutil_tests",
srcs = glob(
["src/test/python/**/*.py"],
exclude = ["src/test/python/cpp/**"],
),
deps = [
":robotpy-wpiutil",
":wpiutil_test",
requirement("pytest"),
],
)
package_default_jni_project(
name = "wpiutil",
maven_artifact_name = "wpiutil-cpp",

53
wpiutil/robotpy_native_build_info.bzl generated Normal file
View File

@@ -0,0 +1,53 @@
# THIS FILE IS AUTO GENERATED
load("@aspect_bazel_lib//lib:copy_to_directory.bzl", "copy_to_directory")
load("//shared/bazel/rules/robotpy:pybind_rules.bzl", "native_wrappery_library")
def define_native_wrapper(name, pyproject_toml = None):
copy_to_directory(
name = "{}.copy_headers".format(name),
srcs = native.glob(["src/main/native/include/**"]) + native.glob(["src/generated/main/native/include/**"], allow_empty = True) + native.glob([
"src/main/native/thirdparty/argparse/include/**",
"src/main/native/thirdparty/debugging/include/**",
"src/main/native/thirdparty/expected/include/**",
"src/main/native/thirdparty/fmtlib/include/**",
"src/main/native/thirdparty/json/include/**",
"src/main/native/thirdparty/llvm/include/**",
"src/main/native/thirdparty/mpack/include/**",
"src/main/native/thirdparty/nanopb/include/**",
"src/main/native/thirdparty/sigslot/include/**",
"src/main/native/thirdparty/upb/include/**",
]),
out = "native/wpiutil/include",
root_paths = ["src/main/native/include/"],
replace_prefixes = {
"wpiutil/src/generated/main/native/include": "",
"wpiutil/src/main/native/include": "",
"wpiutil/src/main/native/thirdparty/argparse/include": "",
"wpiutil/src/main/native/thirdparty/debugging/include": "",
"wpiutil/src/main/native/thirdparty/expected/include": "",
"wpiutil/src/main/native/thirdparty/fmtlib/include": "",
"wpiutil/src/main/native/thirdparty/json/include": "",
"wpiutil/src/main/native/thirdparty/llvm/include": "",
"wpiutil/src/main/native/thirdparty/mpack/include": "",
"wpiutil/src/main/native/thirdparty/nanopb/include": "",
"wpiutil/src/main/native/thirdparty/sigslot/include": "",
"wpiutil/src/main/native/thirdparty/upb/include": "",
},
verbose = False,
visibility = ["//visibility:public"],
)
native_wrappery_library(
name = name,
pyproject_toml = pyproject_toml or "src/main/python/native-pyproject.toml",
libinit_file = "native/wpiutil/_init_robotpy_native_wpiutil.py",
pc_file = "native/wpiutil/robotpy-native-wpiutil.pc",
pc_deps = [
],
deps = [
],
headers = "{}.copy_headers".format(name),
native_shared_library = "shared/wpiutil",
install_path = "native/wpiutil/",
)

234
wpiutil/robotpy_pybind_build_info.bzl generated Normal file
View File

@@ -0,0 +1,234 @@
# THIS FILE IS AUTO GENERATED
load("@rules_cc//cc:cc_library.bzl", "cc_library")
load("//shared/bazel/rules/robotpy:pybind_rules.bzl", "create_pybind_library", "robotpy_library")
load("//shared/bazel/rules/robotpy:semiwrap_helpers.bzl", "gen_libinit", "gen_modinit_hpp", "gen_pkgconf", "publish_casters", "resolve_casters", "run_header_gen")
load("//shared/bazel/rules/robotpy:semiwrap_tool_helpers.bzl", "scan_headers", "update_yaml_files")
def wpiutil_extension(srcs = [], header_to_dat_deps = [], extra_hdrs = [], includes = [], extra_pyi_deps = []):
WPIUTIL_HEADER_GEN = [
struct(
class_name = "StackTrace",
yml_file = "semiwrap/StackTrace.yml",
header_root = "$(execpath :robotpy-native-wpiutil.copy_headers)",
header_file = "$(execpath :robotpy-native-wpiutil.copy_headers)/wpi/StackTrace.h",
tmpl_class_names = [],
trampolines = [],
),
struct(
class_name = "Synchronization",
yml_file = "semiwrap/Synchronization.yml",
header_root = "$(execpath :robotpy-native-wpiutil.copy_headers)",
header_file = "$(execpath :robotpy-native-wpiutil.copy_headers)/wpi/Synchronization.h",
tmpl_class_names = [],
trampolines = [],
),
struct(
class_name = "RawFrame",
yml_file = "semiwrap/RawFrame.yml",
header_root = "$(execpath :robotpy-native-wpiutil.copy_headers)",
header_file = "$(execpath :robotpy-native-wpiutil.copy_headers)/wpi/RawFrame.h",
tmpl_class_names = [],
trampolines = [],
),
struct(
class_name = "Sendable",
yml_file = "semiwrap/Sendable.yml",
header_root = "$(execpath :robotpy-native-wpiutil.copy_headers)",
header_file = "$(execpath :robotpy-native-wpiutil.copy_headers)/wpi/sendable/Sendable.h",
tmpl_class_names = [],
trampolines = [
("wpi::Sendable", "wpi__Sendable.hpp"),
],
),
struct(
class_name = "SendableBuilder",
yml_file = "semiwrap/SendableBuilder.yml",
header_root = "$(execpath :robotpy-native-wpiutil.copy_headers)",
header_file = "$(execpath :robotpy-native-wpiutil.copy_headers)/wpi/sendable/SendableBuilder.h",
tmpl_class_names = [],
trampolines = [
("wpi::SendableBuilder", "wpi__SendableBuilder.hpp"),
],
),
struct(
class_name = "SendableRegistry",
yml_file = "semiwrap/SendableRegistry.yml",
header_root = "$(execpath :robotpy-native-wpiutil.copy_headers)",
header_file = "$(execpath :robotpy-native-wpiutil.copy_headers)/wpi/sendable/SendableRegistry.h",
tmpl_class_names = [],
trampolines = [
("wpi::SendableRegistry", "wpi__SendableRegistry.hpp"),
],
),
struct(
class_name = "WPyStruct",
yml_file = "semiwrap/WPyStruct.yml",
header_root = "wpiutil/src/main/python/wpiutil",
header_file = "wpiutil/src/main/python/wpiutil/src/wpistruct/wpystruct_fns.h",
tmpl_class_names = [],
trampolines = [],
),
]
resolve_casters(
name = "wpiutil.resolve_casters",
caster_deps = [":src/main/python/wpiutil/wpiutil-casters.pybind11.json"],
casters_pkl_file = "wpiutil.casters.pkl",
dep_file = "wpiutil.casters.d",
)
gen_libinit(
name = "wpiutil.gen_lib_init",
output_file = "src/main/python/wpiutil/_init__wpiutil.py",
modules = ["native.wpiutil._init_robotpy_native_wpiutil"],
)
gen_pkgconf(
name = "wpiutil.gen_pkgconf",
libinit_py = "wpiutil._init__wpiutil",
module_pkg_name = "wpiutil._wpiutil",
output_file = "wpiutil.pc",
pkg_name = "wpiutil",
install_path = "src/main/python/wpiutil",
project_file = "src/main/python/pyproject.toml",
package_root = "src/main/python/wpiutil/__init__.py",
)
gen_modinit_hpp(
name = "wpiutil.gen_modinit_hpp",
input_dats = [x.class_name for x in WPIUTIL_HEADER_GEN],
libname = "_wpiutil",
output_file = "semiwrap_init.wpiutil._wpiutil.hpp",
)
run_header_gen(
name = "wpiutil",
casters_pickle = "wpiutil.casters.pkl",
header_gen_config = WPIUTIL_HEADER_GEN,
trampoline_subpath = "src/main/python/wpiutil",
deps = header_to_dat_deps,
local_native_libraries = [
"//wpiutil:robotpy-native-wpiutil.copy_headers",
],
)
create_pybind_library(
name = "wpiutil",
install_path = "src/main/python/wpiutil/",
extension_name = "_wpiutil",
generated_srcs = [":wpiutil.generated_srcs"],
semiwrap_header = [":wpiutil.gen_modinit_hpp"],
deps = [
":wpiutil.tmpl_hdrs",
":wpiutil.trampoline_hdrs",
"//wpiutil:wpiutil",
"//wpiutil:wpiutil-casters",
],
dynamic_deps = [
"//wpiutil:shared/wpiutil",
],
extra_hdrs = extra_hdrs,
extra_srcs = srcs,
includes = includes,
)
native.filegroup(
name = "wpiutil.generated_files",
srcs = [
"wpiutil.gen_modinit_hpp.gen",
"wpiutil.header_gen_files",
"wpiutil.gen_pkgconf",
"wpiutil.gen_lib_init",
],
tags = ["manual", "robotpy"],
)
def publish_library_casters():
publish_casters(
name = "publish_casters",
caster_name = "wpiutil-casters",
output_json = "src/main/python/wpiutil/wpiutil-casters.pybind11.json",
output_pc = "src/main/python/wpiutil/wpiutil-casters.pc",
project_config = "src/main/python/pyproject.toml",
package_root = "src/main/python/wpiutil/__init__.py",
typecasters_srcs = native.glob(["src/main/python/wpiutil/src/type_casters/**", "src/main/python/wpiutil/src/wpistruct/**"]),
)
cc_library(
name = "wpiutil-casters",
hdrs = native.glob(["src/main/python/wpiutil/src/type_casters/*.h", "src/main/python/wpiutil/src/wpistruct/*.h"]),
includes = ["src/main/python/wpiutil/src/type_casters", "src/main/python/wpiutil/src/wpistruct"],
visibility = ["//visibility:public"],
tags = ["robotpy"],
)
def define_pybind_library(name, pkgcfgs = []):
# Helper used to generate all files with one target.
native.filegroup(
name = "{}.generated_files".format(name),
srcs = [
"wpiutil.generated_files",
],
tags = ["manual", "robotpy"],
visibility = ["//visibility:public"],
)
# Files that will be included in the wheel as data deps
native.filegroup(
name = "{}.generated_pkgcfg_files".format(name),
srcs = [
"src/main/python/wpiutil/wpiutil.pc",
"src/main/python/wpiutil/wpiutil-casters.pc",
"src/main/python/wpiutil/wpiutil-casters.pybind11.json",
],
tags = ["manual", "robotpy"],
visibility = ["//visibility:public"],
)
# Contains all of the non-python files that need to be included in the wheel
native.filegroup(
name = "{}.extra_files".format(name),
srcs = native.glob(["src/main/python/wpiutil/**"], exclude = ["src/main/python/wpiutil/**/*.py"], allow_empty = True),
tags = ["manual", "robotpy"],
)
robotpy_library(
name = name,
srcs = native.glob(["src/main/python/wpiutil/**/*.py"]) + [
"src/main/python/wpiutil/_init__wpiutil.py",
],
data = [
"{}.generated_pkgcfg_files".format(name),
"{}.extra_files".format(name),
":src/main/python/wpiutil/_wpiutil",
":wpiutil.trampoline_hdr_files",
],
imports = ["src/main/python"],
deps = [
"//wpiutil:robotpy-native-wpiutil",
],
visibility = ["//visibility:public"],
)
update_yaml_files(
name = "{}-update-yaml".format(name),
yaml_output_directory = "src/main/python/semiwrap",
extra_hdrs = native.glob(["src/main/python/**/*.h"], allow_empty = True) + [
"//wpiutil:robotpy-native-wpiutil.copy_headers",
],
package_root_file = "src/main/python/wpiutil/__init__.py",
pkgcfgs = pkgcfgs,
pyproject_toml = "src/main/python/pyproject.toml",
yaml_files = native.glob(["src/main/python/semiwrap/**"]),
)
scan_headers(
name = "{}-scan-headers".format(name),
extra_hdrs = native.glob(["src/main/python/**/*.h"], allow_empty = True) + [
"//wpiutil:robotpy-native-wpiutil.copy_headers",
],
package_root_file = "src/main/python/wpiutil/__init__.py",
pkgcfgs = pkgcfgs,
pyproject_toml = "src/main/python/pyproject.toml",
)

View File

@@ -0,0 +1,92 @@
#!/usr/bin/env python3
#
# Copyright (c) FIRST and other WPILib contributors.
# Open Source Software; you can modify and/or share it under the terms of
# the WPILib BSD license file in the root directory of this project.
import argparse
import datetime
from wpiutil.log import DataLogReader
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("infile")
args = parser.parse_args()
reader = DataLogReader(args.infile)
entries = {}
for record in reader:
timestamp = record.getTimestamp() / 1000000
if record.isStart():
try:
data = record.getStartData()
print(f"{data} [{timestamp}]")
if data.entry in entries:
print("...DUPLICATE entry ID, overriding")
entries[data.entry] = data
except TypeError as e:
print("Start(INVALID)")
elif record.isFinish():
try:
entry = record.getFinishEntry()
print(f"Finish({entry}) [{timestamp}]")
if entry not in entries:
print("...ID not found")
else:
del entries[entry]
except TypeError as e:
print("Finish(INVALID)")
elif record.isSetMetadata():
try:
data = record.getSetMetadataData()
print(f"{data} [{timestamp}]")
if data.entry not in entries:
print("...ID not found")
except TypeError as e:
print("SetMetadata(INVALID)")
elif record.isControl():
print("Unrecognized control record")
else:
print(f"Data({record.getEntry()}, size={record.getSize()}) ", end="")
entry = entries.get(record.getEntry(), None)
if entry is None:
print("<ID not found>")
continue
print(f"<name='{entry.name}', type='{entry.type}'> [{timestamp}]")
try:
# handle systemTime specially
if entry.name == "systemTime" and entry.type == "int64":
dt = datetime.fromtimestamp(record.getInteger() / 1000000)
print(" {:%Y-%m-%d %H:%M:%S.%f}".format(dt))
continue
if entry.type == "double":
print(f" {record.getDouble()}")
elif entry.type == "int64":
print(f" {record.getInteger()}")
elif entry.type == "string" or entry.type == "json":
print(f" '{record.getString()}'")
elif entry.type == "boolean":
print(f" {record.getBoolean()}")
elif entry.type == "boolean[]":
arr = record.getBooleanArray()
print(f" {arr}")
elif entry.type == "double[]":
arr = record.getDoubleArray()
print(f" {arr}")
elif entry.type == "float[]":
arr = record.getFloatArray()
print(f" {arr}")
elif entry.type == "int64[]":
arr = record.getIntegerArray()
print(f" {arr}")
elif entry.type == "string[]":
arr = record.getStringArray()
print(f" {arr}")
elif entry.type == "raw":
print(f" {record.getRaw()}")
except TypeError as e:
print(" invalid", e)

View File

@@ -0,0 +1,29 @@
#!/usr/bin/env python3
import argparse
import pathlib
from wpiutil.log import DataLog, BooleanLogEntry, StringArrayLogEntry, RawLogEntry
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("out", type=pathlib.Path)
args = parser.parse_args()
if args.out.is_dir():
datalog = DataLog(str(args.out))
else:
datalog = DataLog(str(args.out.parent), args.out.name)
bools = BooleanLogEntry(datalog, "/bools")
bools.append(True)
bools.append(False)
strings = StringArrayLogEntry(datalog, "/strings")
strings.append(["a", "b", "c"])
strings.append(["d", "e", "f"])
raw = RawLogEntry(datalog, "/raws")
raw.append(b"\x01\x02\x03")
raw.append(b"\x04\x05\x06")

View File

@@ -0,0 +1,52 @@
[build-system]
build-backend = "hatchling.build"
requires = [
"hatchling",
"hatch-nativelib~=0.2.0",
"hatch-robotpy~=0.2.1",
]
[project]
name = "robotpy-native-wpiutil"
version = "2027.0.0a2"
description = "WPILib Utility Library"
license = "BSD-3-Clause"
dependencies = [
"msvc-runtime>=14.42.34433; platform_system == 'Windows'"
]
[tool.hatch.build.targets.wheel]
packages = ["src/native"]
[[tool.hatch.build.hooks.robotpy.maven_lib_download]]
artifact_id = "wpiutil-cpp"
group_id = "edu.wpi.first.wpiutil"
repo_url = "https://frcmaven.wpi.edu/artifactory/release-2027"
version = "2027.0.0-alpha-2"
extract_to = "src/native/wpiutil"
libs = ["wpiutil"]
[[tool.hatch.build.hooks.nativelib.pcfile]]
pcfile = "src/native/wpiutil/robotpy-native-wpiutil.pc"
name = "wpiutil"
includedir = "src/native/wpiutil/include"
libdir = "src/native/wpiutil/lib"
shared_libraries = ["wpiutil"]
enable_if = "platform_system != 'Windows'"
[[tool.hatch.build.hooks.nativelib.pcfile]]
pcfile = "src/native/wpiutil/robotpy-native-wpiutil.pc"
name = "wpiutil"
includedir = "src/native/wpiutil/include"
libdir = "src/native/wpiutil/lib"
shared_libraries = ["wpiutil"]
# All wpilib projects require this flag
extra_cflags = "/Zc:preprocessor"
enable_if = "platform_system == 'Windows'"

View File

@@ -0,0 +1,121 @@
[build-system]
build-backend = "hatchling.build"
requires = [
"semiwrap~=0.1.7",
"hatch-meson~=0.1.0b2",
"hatch-robotpy~=0.2.1",
"hatchling",
"robotpy-native-wpiutil==2027.0.0a2",
]
[project]
name = "robotpy-wpiutil"
version = "2027.0.0a2"
description = "Binary wrapper for FRC WPIUtil library"
authors = [
{name = "RobotPy Development Team", email = "robotpy@googlegroups.com"},
]
license = "BSD-3-Clause"
dependencies = [
"robotpy-native-wpiutil==2027.0.0a2",
]
[project.urls]
"Source code" = "https://github.com/robotpy/mostrobotpy"
[tool.hatch.build.hooks.robotpy]
version_file = "wpiutil/version.py"
[tool.hatch.build.hooks.semiwrap]
[tool.hatch.build.hooks.meson]
[tool.hatch.build.targets.wheel]
packages = ["wpiutil"]
[tool.semiwrap]
update_init = [
"wpiutil",
# "wpiutil.log wpiutil._wpiutil.log",
"wpiutil.sync wpiutil._wpiutil.sync",
"wpiutil.wpistruct wpiutil._wpiutil.wpistruct",
]
scan_headers_ignore = [
"debugging.hpp",
"debugging/*",
"fmt/*",
"google/*",
"wpi/*",
"wpystruct_fns.h",
"pb.h",
"pb_common.h",
"pb_decode.h",
"pb_encode.h",
]
[tool.semiwrap.extension_modules."wpiutil._wpiutil"]
name = "wpiutil"
includes = [
"wpiutil/src/wpistruct",
]
wraps = ["robotpy-native-wpiutil"]
depends = ["wpiutil-casters"]
[tool.semiwrap.extension_modules."wpiutil._wpiutil".headers]
# wpi
StackTrace = "wpi/StackTrace.h"
Synchronization = "wpi/Synchronization.h"
RawFrame = "wpi/RawFrame.h"
# wpi/sendable
Sendable = "wpi/sendable/Sendable.h"
SendableBuilder = "wpi/sendable/SendableBuilder.h"
#SendableHelper = "wpi/sendable/SendableHelper.h"
SendableRegistry = "wpi/sendable/SendableRegistry.h"
WPyStruct = "src/wpistruct/wpystruct_fns.h"
[tool.semiwrap.export_type_casters.wpiutil-casters]
pypackage = "wpiutil"
includedir = [
"wpiutil/src/type_casters",
"wpiutil/src/wpistruct",
]
[[tool.semiwrap.export_type_casters.wpiutil-casters.headers]]
header = "wpi_array_type_caster.h"
types = ["wpi::array"]
[[tool.semiwrap.export_type_casters.wpiutil-casters.headers]]
header = "wpi_json_type_caster.h"
types = ["wpi::json"]
[[tool.semiwrap.export_type_casters.wpiutil-casters.headers]]
header = "wpi_span_type_caster.h"
types = ["std::span"]
[[tool.semiwrap.export_type_casters.wpiutil-casters.headers]]
header = "wpi_smallset_type_caster.h"
types = ["wpi::SmallSet"]
[[tool.semiwrap.export_type_casters.wpiutil-casters.headers]]
header = "wpi_smallvector_type_caster.h"
types = ["wpi::SmallVector"]
[[tool.semiwrap.export_type_casters.wpiutil-casters.headers]]
header = "wpi_smallvectorimpl_type_caster.h"
types = ["wpi::SmallVectorImpl"]
[[tool.semiwrap.export_type_casters.wpiutil-casters.headers]]
header = "wpi_string_map_caster.h"
types = ["wpi::StringMap"]
[[tool.semiwrap.export_type_casters.wpiutil-casters.headers]]
header = "wpi_ct_string_type_caster.h"
types = ["wpi::ct_string"]
[[tool.semiwrap.export_type_casters.wpiutil-casters.headers]]
header = "wpystruct.h"
types = ["WPyStruct"]

View File

@@ -0,0 +1,7 @@
defaults:
ignore: true
enums:
WPI_TimestampSource:
value_prefix: WPI_TIMESRC
rename: TimestampSource

View File

@@ -0,0 +1,12 @@
extra_includes:
- wpi/sendable/SendableBuilder.h
classes:
wpi::Sendable:
methods:
InitSendable:
virtual_xform: |-
[&](py::function fn) {
auto builderHandle = py::cast(builder, py::return_value_policy::reference);
fn(builderHandle);
}

View File

@@ -0,0 +1,40 @@
classes:
wpi::SendableBuilder:
enums:
BackendKind:
methods:
SetSmartDashboardType:
SetActuator:
AddBooleanProperty:
PublishConstBoolean:
AddIntegerProperty:
PublishConstInteger:
AddFloatProperty:
PublishConstFloat:
AddDoubleProperty:
PublishConstDouble:
AddStringProperty:
PublishConstString:
AddBooleanArrayProperty:
PublishConstBooleanArray:
AddIntegerArrayProperty:
PublishConstIntegerArray:
AddFloatArrayProperty:
PublishConstFloatArray:
AddDoubleArrayProperty:
PublishConstDoubleArray:
AddStringArrayProperty:
PublishConstStringArray:
AddRawProperty:
PublishConstRaw:
AddSmallStringProperty:
AddSmallBooleanArrayProperty:
AddSmallIntegerArrayProperty:
AddSmallFloatArrayProperty:
AddSmallDoubleArrayProperty:
AddSmallStringArrayProperty:
AddSmallRawProperty:
GetBackendKind:
IsPublished:
Update:
ClearProperties:

View File

@@ -0,0 +1,54 @@
extra_includes:
- wpi/sendable/Sendable.h
- wpi/sendable/SendableBuilder.h
classes:
wpi::SendableRegistry:
nodelete: true
methods:
Add:
overloads:
Sendable*, std::string_view:
keepalive:
- [1, 2]
Sendable*, std::string_view, int:
keepalive:
- [1, 2]
Sendable*, std::string_view, int, int:
keepalive:
- [1, 2]
Sendable*, std::string_view, std::string_view:
keepalive:
- [1, 2]
AddChild:
overloads:
Sendable*, Sendable*:
keepalive:
- [1, 2]
- [2, 3]
Sendable*, void*:
ignore: true
Remove:
Move:
ignore: true
Contains:
GetName:
SetName:
overloads:
Sendable*, std::string_view:
Sendable*, std::string_view, int:
Sendable*, std::string_view, int, int:
Sendable*, std::string_view, std::string_view:
GetSubsystem:
SetSubsystem:
GetDataHandle:
ignore: true
SetData:
ignore: true
GetData:
ignore: true
GetUniqueId:
GetSendable:
Publish:
Update:
EnsureInitialized:

View File

@@ -0,0 +1,5 @@
functions:
GetStackTrace:
GetStackTraceDefault:
SetGetStackTraceImpl:
ignore: true

View File

@@ -0,0 +1,59 @@
defaults:
ignore: true
subpackage: sync
extra_includes:
- pybind11/stl.h
functions:
CreateEvent:
DestroyEvent:
SetEvent:
ResetEvent:
CreateSemaphore:
DestroySemaphore:
ReleaseSemaphore:
param_override:
prevCount:
default: "0"
WaitForObject:
overloads:
WPI_Handle:
WPI_Handle, double, bool*:
WaitForObjects:
overloads:
std::span<const WPI_Handle>, std::span<WPI_Handle>:
param_override:
signaled:
ignore: true
cpp_code: |
[](std::span<const WPI_Handle> handles) {
py::gil_scoped_release release;
std::vector<WPI_Handle> signaled(handles.size());
auto result = wpi::WaitForObjects(handles, signaled);
signaled.resize(result.size());
return signaled;
}
std::initializer_list<WPI_Handle>, std::span<WPI_Handle>:
ignore: true
std::span<const WPI_Handle>, std::span<WPI_Handle>, double, bool*:
param_override:
signaled:
ignore: true
timedOut:
ignore: true
cpp_code: |
[](std::span<const WPI_Handle> handles, double timeout) {
py::gil_scoped_release release;
std::vector<WPI_Handle> signaled(handles.size());
bool timedOut = false;
auto result = wpi::WaitForObjects(handles, signaled, timeout, &timedOut);
signaled.resize(result.size());
return std::make_tuple(signaled, timedOut);
}
std::initializer_list<WPI_Handle>, std::span<WPI_Handle>, double, bool*:
ignore: true
CreateSignalObject:
SetSignalObject:
ResetSignalObject:
DestroySignalObject:

View File

@@ -0,0 +1,22 @@
defaults:
subpackage: wpistruct
functions:
forEachNested:
no_release_gil: true
getTypeName:
no_release_gil: true
getSchema:
no_release_gil: true
getSize:
no_release_gil: true
pack:
no_release_gil: true
packArray:
no_release_gil: true
packInto:
no_release_gil: true
unpack:
no_release_gil: true
unpackArray:
no_release_gil: true

View File

@@ -0,0 +1,28 @@
from . import _init__wpiutil
# autogenerated by 'semiwrap create-imports wpiutil wpiutil._wpiutil'
from ._wpiutil import (
Sendable,
SendableBuilder,
SendableRegistry,
TimestampSource,
getStackTrace,
getStackTraceDefault,
)
__all__ = [
"Sendable",
"SendableBuilder",
"SendableRegistry",
"TimestampSource",
"getStackTrace",
"getStackTraceDefault",
]
# Imported for side effects only
from . import _stacktrace
# Type alias
import typing
json = typing.Union[None, bool, int, float, str, typing.List, typing.Dict]

View File

@@ -0,0 +1,27 @@
from traceback import extract_stack, format_list
from ._wpiutil import getStackTraceDefault, _setup_stack_trace_hook
from os.path import join
_start_py = join("wpilib", "_impl", "start.py")
def _stack_trace_hook(offset: int) -> str:
# note: this implementation ignores offset because it's not
# actually meaningful when crossing the python/C++ boundary
stack = extract_stack()[:-1]
if not stack:
return "\tat <no python frames>\n" + getStackTraceDefault(offset)
# filter out any frames before start.py (except for one of them) to
# make stack frames more useful for users
for i in range(len(stack) - 1, 0, -1):
if stack[i].filename.endswith(_start_py):
stack = stack[i:]
break
trace = format_list(stack)
return "\n".join(trace)
_setup_stack_trace_hook(_stack_trace_hook)

View File

View File

@@ -0,0 +1,41 @@
#include <semiwrap_init.wpiutil._wpiutil.hpp>
void setup_stack_trace_hook(py::object fn);
void cleanup_stack_trace_hook();
void setup_safethread_gil();
void cleanup_safethread_gil();
#ifndef __FRC_SYSTEMCORE__
namespace wpi::impl {
void ResetSendableRegistry();
} // namespace wpi::impl
void cleanup_sendable_registry() {
py::gil_scoped_release unlock;
wpi::impl::ResetSendableRegistry();
}
#else
void cleanup_sendable_registry() {}
#endif
SEMIWRAP_PYBIND11_MODULE(m) {
initWrapper(m);
static int unused;
py::capsule cleanup(&unused, [](void *) {
cleanup_sendable_registry();
cleanup_stack_trace_hook();
cleanup_safethread_gil();
});
setup_safethread_gil();
m.def("_setup_stack_trace_hook", &setup_stack_trace_hook);
m.add_object("_st_cleanup", cleanup);
}

View File

@@ -0,0 +1,65 @@
#include <atomic>
#include <gilsafe_object.h>
#include <semiwrap.h>
using OnThreadStartFn = void *(*)();
using OnThreadEndFn = void (*)(void *);
namespace wpi::impl {
void SetSafeThreadNotifiers(OnThreadStartFn OnStart, OnThreadEndFn OnEnd);
}
struct SafeThreadState {
py::gil_scoped_acquire *acquire = nullptr;
py::gil_scoped_release *release = nullptr;
};
std::atomic<bool> g_gilstate_managed = false;
void *on_safe_thread_start() {
if (Py_IsFinalizing() // python is shutting down
|| !g_gilstate_managed.load() // python has shutdown)
) {
return nullptr;
}
auto *st = new SafeThreadState;
// acquires the GIL and creates pybind11's thread state for this thread
st->acquire = new py::gil_scoped_acquire;
// releases the GIL so the thread can start without it
st->release = new py::gil_scoped_release;
return st;
}
void on_safe_thread_end(void *opaque) {
// on entry, GIL should not be acquired
// don't cleanup if it's unsafe to do so. Several possibilities here:
if (!opaque // internal error?
|| Py_IsFinalizing() // python is shutting down
|| !g_gilstate_managed.load() // python has shutdown
) {
return;
}
auto *st = (SafeThreadState *)opaque;
delete st->release; // causes GIL to be acquired
delete st->acquire; // causes GIL to be released and thread state deleted
delete st;
}
void setup_safethread_gil() {
g_gilstate_managed = true;
// atexit handlers get called before the interpreter finalizes -- so
// we disable on_safe_thread_end before finalizing starts
auto atexit = py::module_::import("atexit");
atexit.attr("register")(
py::cpp_function([]() { g_gilstate_managed = false; }));
wpi::impl::SetSafeThreadNotifiers(on_safe_thread_start, on_safe_thread_end);
}
void cleanup_safethread_gil() { g_gilstate_managed = false; }

View File

@@ -0,0 +1,45 @@
#include <semiwrap.h>
#include <wpi/StackTrace.h>
py::object &get_hook_ref() {
static py::object hook;
return hook;
}
std::string final_py_stack_trace_hook(int offset) {
std::string msg = "\tat <python stack trace not available due to interpreter shutdown>\n";
msg += wpi::GetStackTraceDefault(offset);
return msg;
}
std::string py_stack_trace_hook(int offset) {
py::gil_scoped_acquire gil;
try {
auto &hook = get_hook_ref();
if (hook) {
return py::cast<std::string>(hook(offset));
}
} catch (py::error_already_set &e) {
e.discard_as_unraisable("wpiutil._stacktrace._stack_trace_hook");
}
return wpi::GetStackTraceDefault(offset);
}
void setup_stack_trace_hook(py::object fn) {
get_hook_ref() = fn;
wpi::SetGetStackTraceImpl(py_stack_trace_hook);
}
void cleanup_stack_trace_hook() {
wpi::SetGetStackTraceImpl(final_py_stack_trace_hook);
// release the function during interpreter shutdown
auto &hook = get_hook_ref();
if (hook) {
hook.dec_ref();
hook.release();
}
}

View File

@@ -0,0 +1,97 @@
#pragma once
#include <pybind11/pybind11.h>
#include <pybind11/stl.h>
#include <wpi/array.h>
namespace pybind11 {
namespace detail {
template <size_t N>
struct wpi_array_name_maker {
template <typename T>
static constexpr auto make(const T &t) {
return concat(t, wpi_array_name_maker<N-1>::make(t));
}
};
template <>
struct wpi_array_name_maker<1> {
template <typename T>
static constexpr auto make(const T &t) {
return t;
}
};
template <typename Type, size_t Size>
struct type_caster<wpi::array<Type, Size>> {
using value_conv = make_caster<Type>;
// Have to copy/paste PYBIND11_TYPE_CASTER implementation because wpi::array
// is not default constructable
//
// begin PYBIND11_TYPE_CASTER
protected:
wpi::array<Type, Size> value{wpi::empty_array_t{}};
// An empty tuple is pretty useless
static_assert(Size > 0, "empty array not supported");
public:
static constexpr auto name = const_name("Tuple[") + wpi_array_name_maker<Size>::make(value_conv::name) + const_name("]");
template <
typename T_,
enable_if_t<std::is_same<wpi::array<Type, Size>, remove_cv_t<T_>>::value,
int> = 0>
static handle cast(T_ *src, return_value_policy policy, handle parent) {
if (!src)
return none().release();
if (policy == return_value_policy::take_ownership) {
auto h = cast(std::move(*src), policy, parent);
delete src;
return h;
} else {
return cast(*src, policy, parent);
}
}
operator wpi::array<Type, Size> *() { return &value; }
operator wpi::array<Type, Size> &() { return value; }
operator wpi::array<Type, Size> &&() && { return std::move(value); }
template <typename T_>
using cast_op_type = pybind11::detail::movable_cast_op_type<T_>;
// end PYBIND11_TYPE_CASTER
bool load(handle src, bool convert) {
if (!isinstance<sequence>(src))
return false;
auto l = reinterpret_borrow<sequence>(src);
if (l.size() != Size)
return false;
size_t ctr = 0;
for (auto it : l) {
value_conv conv;
if (!conv.load(it, convert))
return false;
value[ctr++] = cast_op<Type &&>(std::move(conv));
}
return true;
}
template <typename T>
static handle cast(T &&src, return_value_policy policy, handle parent) {
tuple l(src.size());
size_t index = 0;
for (auto &&value : src) {
auto value_ = reinterpret_steal<object>(
value_conv::cast(forward_like<T>(value), policy, parent));
if (!value_)
return handle();
PyTuple_SET_ITEM(l.ptr(), (ssize_t)index++,
value_.release().ptr()); // steals a reference
}
return l.release();
}
};
} // namespace detail
} // namespace pybind11

View File

@@ -0,0 +1,61 @@
#pragma once
#include <pybind11/pybind11.h>
#include <wpi/ct_string.h>
namespace pybind11 {
namespace detail {
template <typename CharT, typename Traits, size_t N>
struct type_caster<wpi::ct_string<CharT, Traits, N>> {
using str_type = wpi::ct_string<CharT, Traits, N>;
PYBIND11_TYPE_CASTER(str_type, const_name(PYBIND11_STRING_NAME));
// TODO
bool load(handle src, bool convert) {
return false;
}
static handle cast(const str_type& src,
py::return_value_policy policy,
py::handle parent) {
const char *buffer = reinterpret_cast<const char *>(src.data());
auto nbytes = ssize_t(src.size() * sizeof(CharT));
handle s = decode_utfN(buffer, nbytes);
if (!s) {
throw error_already_set();
}
return s;
}
// copied from py::string_caster
static constexpr size_t UTF_N = 8 * sizeof(CharT);
static handle decode_utfN(const char *buffer, ssize_t nbytes) {
#if !defined(PYPY_VERSION)
return UTF_N == 8 ? PyUnicode_DecodeUTF8(buffer, nbytes, nullptr)
: UTF_N == 16 ? PyUnicode_DecodeUTF16(buffer, nbytes, nullptr, nullptr)
: PyUnicode_DecodeUTF32(buffer, nbytes, nullptr, nullptr);
#else
// PyPy segfaults when on PyUnicode_DecodeUTF16 (and possibly on PyUnicode_DecodeUTF32 as
// well), so bypass the whole thing by just passing the encoding as a string value, which
// works properly:
return PyUnicode_Decode(buffer,
nbytes,
UTF_N == 8 ? "utf-8"
: UTF_N == 16 ? "utf-16"
: "utf-32",
nullptr);
#endif
}
};
// template <typename Char, typename Traits, size_t N>
// struct type_caster<wpi::ct_string<Char, Traits, N>>
// : string_caster<wpi::ct_string<Char, Traits, N>, false> {};
} // namespace detail
} // namespace pybind11

View File

@@ -0,0 +1,259 @@
/***************************************************************************
* Copyright (c) 2019, Martin Renou *
* *
Copyright (c) 2019,
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
* Neither the name of the copyright holder nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
****************************************************************************/
#pragma once
#include <string>
#include <vector>
#include "wpi/json.h"
#include "pybind11/pybind11.h"
namespace py = pybind11;
namespace pyjson
{
using number_unsigned_t = uint64_t;
using number_integer_t = int64_t;
inline py::object from_json(const wpi::json& j)
{
if (j.is_null())
{
return py::none();
}
else if (j.is_boolean())
{
return py::bool_(j.get<bool>());
}
else if (j.is_number_unsigned())
{
return py::int_(j.get<number_unsigned_t>());
}
else if (j.is_number_integer())
{
return py::int_(j.get<number_integer_t>());
}
else if (j.is_number_float())
{
return py::float_(j.get<double>());
}
else if (j.is_string())
{
return py::str(j.get<std::string>());
}
else if (j.is_array())
{
py::list obj(j.size());
for (std::size_t i = 0; i < j.size(); i++)
{
obj[i] = from_json(j[i]);
}
return std::move(obj);
}
else // Object
{
py::dict obj;
for (wpi::json::const_iterator it = j.cbegin(); it != j.cend(); ++it)
{
obj[py::str(it.key())] = from_json(it.value());
}
return std::move(obj);
}
}
inline wpi::json to_json(const py::handle& obj)
{
if (obj.ptr() == nullptr || obj.is_none())
{
return nullptr;
}
if (py::isinstance<py::bool_>(obj))
{
return obj.cast<bool>();
}
if (py::isinstance<py::int_>(obj))
{
try
{
number_integer_t s = obj.cast<number_integer_t>();
if (py::int_(s).equal(obj))
{
return s;
}
}
catch (...)
{
}
try
{
number_unsigned_t u = obj.cast<number_unsigned_t>();
if (py::int_(u).equal(obj))
{
return u;
}
}
catch (...)
{
}
throw py::value_error("to_json received an integer out of range for both number_integer_t and number_unsigned_t type: " + py::repr(obj).cast<std::string>());
}
if (py::isinstance<py::float_>(obj))
{
return obj.cast<double>();
}
// if (py::isinstance<py::bytes>(obj))
// {
// py::module base64 = py::module::import("base64");
// return base64.attr("b64encode")(obj).attr("decode")("utf-8").cast<std::string>();
// }
if (py::isinstance<py::str>(obj))
{
return obj.cast<std::string>();
}
if (py::isinstance<py::tuple>(obj) || py::isinstance<py::list>(obj))
{
auto out = wpi::json::array();
for (const py::handle value : obj)
{
out.push_back(to_json(value));
}
return out;
}
if (py::isinstance<py::dict>(obj))
{
auto out = wpi::json::object();
for (const py::handle key : obj)
{
if (py::isinstance<py::str>(key)) {
out[key.cast<std::string>()] = to_json(obj[key]);
} else if (py::isinstance<py::int_>(key) || py::isinstance<py::float_>(key) ||
py::isinstance<py::bool_>(key) || py::isinstance<py::none>(key)) {
// only allow the same implicit conversions python allows
out[py::str(key).cast<std::string>()] = to_json(obj[key]);
} else {
throw py::type_error("JSON keys must be str, int, float, bool, or None, not " + py::repr(key).cast<std::string>());
}
}
return out;
}
throw py::type_error("Object of type " + py::type::of(obj).attr("__name__").cast<std::string>() + " is not JSON serializable");
}
}
// nlohmann_json serializers
namespace wpi
{
#define MAKE_NLJSON_SERIALIZER_DESERIALIZER(T) \
template <> \
struct adl_serializer<T> \
{ \
inline static void to_json(json& j, const T& obj) \
{ \
j = pyjson::to_json(obj); \
} \
\
inline static T from_json(const json& j) \
{ \
return pyjson::from_json(j); \
} \
}
#define MAKE_NLJSON_SERIALIZER_ONLY(T) \
template <> \
struct adl_serializer<T> \
{ \
inline static void to_json(json& j, const T& obj) \
{ \
j = pyjson::to_json(obj); \
} \
}
MAKE_NLJSON_SERIALIZER_DESERIALIZER(py::object);
MAKE_NLJSON_SERIALIZER_DESERIALIZER(py::bool_);
MAKE_NLJSON_SERIALIZER_DESERIALIZER(py::int_);
MAKE_NLJSON_SERIALIZER_DESERIALIZER(py::float_);
MAKE_NLJSON_SERIALIZER_DESERIALIZER(py::str);
MAKE_NLJSON_SERIALIZER_DESERIALIZER(py::list);
MAKE_NLJSON_SERIALIZER_DESERIALIZER(py::tuple);
MAKE_NLJSON_SERIALIZER_DESERIALIZER(py::dict);
MAKE_NLJSON_SERIALIZER_ONLY(py::handle);
MAKE_NLJSON_SERIALIZER_ONLY(py::detail::item_accessor);
MAKE_NLJSON_SERIALIZER_ONLY(py::detail::list_accessor);
MAKE_NLJSON_SERIALIZER_ONLY(py::detail::tuple_accessor);
MAKE_NLJSON_SERIALIZER_ONLY(py::detail::sequence_accessor);
MAKE_NLJSON_SERIALIZER_ONLY(py::detail::str_attr_accessor);
MAKE_NLJSON_SERIALIZER_ONLY(py::detail::obj_attr_accessor);
#undef MAKE_NLJSON_SERIALIZER
#undef MAKE_NLJSON_SERIALIZER_ONLY
}
// pybind11 caster
namespace pybind11
{
namespace detail
{
template <> struct type_caster<wpi::json>
{
public:
PYBIND11_TYPE_CASTER(wpi::json, _("wpiutil.json"));
bool load(handle src, bool convert)
{
// TODO: raising errors gives the user informative error messages,
// but at the expense of proper argument parsing..
// try
// {
value = pyjson::to_json(src);
return true;
// }
// catch (...)
// {
// return false;
// }
}
static handle cast(wpi::json src, return_value_policy /* policy */, handle /* parent */)
{
object obj = pyjson::from_json(src);
return obj.release();
}
};
}
}

View File

@@ -0,0 +1,18 @@
#pragma once
#include <pybind11/pybind11.h>
#include <pybind11/stl.h>
#include <wpi/SmallSet.h>
namespace pybind11
{
namespace detail
{
template <typename Type, unsigned Size> struct type_caster<wpi::SmallSet<Type, Size>>
: set_caster<wpi::SmallSet<Type, Size>, Type> { };
} // namespace detail
} // namespace pybind11

View File

@@ -0,0 +1,18 @@
#pragma once
#include <pybind11/pybind11.h>
#include <pybind11/stl.h>
#include <wpi/SmallVector.h>
namespace pybind11
{
namespace detail
{
template <typename Type, unsigned Size> struct type_caster<wpi::SmallVector<Type, Size>>
: list_caster<wpi::SmallVector<Type, Size>, Type> { };
} // namespace detail
} // namespace pybind11

View File

@@ -0,0 +1,71 @@
#pragma once
#include <pybind11/pybind11.h>
#include <pybind11/stl.h>
#include <wpi/SmallVector.h>
namespace pybind11
{
namespace detail
{
template <typename Type> struct type_caster<wpi::SmallVectorImpl<Type>> {
using value_conv = make_caster<Type>;
// Have to copy/paste PYBIND11_TYPE_CASTER implementation because SmallVectorImpl
// is not default constructable
//
// begin PYBIND11_TYPE_CASTER
protected:
wpi::SmallVector<Type, 16> value;
public:
static constexpr auto name = _("List[") + value_conv::name + _("]");
template <typename T_, enable_if_t<std::is_same<wpi::SmallVectorImpl<Type>, remove_cv_t<T_>>::value, int> = 0>
static handle cast(T_ *src, return_value_policy policy, handle parent) {
if (!src) return none().release();
if (policy == return_value_policy::take_ownership) {
auto h = cast(std::move(*src), policy, parent); delete src; return h;
} else {
return cast(*src, policy, parent);
}
}
operator wpi::SmallVectorImpl<Type>*() { return &value; }
operator wpi::SmallVectorImpl<Type>&() { return value; }
operator wpi::SmallVectorImpl<Type>&&() && { return std::move(value); }
template <typename T_> using cast_op_type = pybind11::detail::movable_cast_op_type<T_>;
// end PYBIND11_TYPE_CASTER
bool load(handle src, bool convert) {
if (!isinstance<sequence>(src) || isinstance<str>(src))
return false;
auto s = reinterpret_borrow<sequence>(src);
value.reserve(s.size());
for (auto it : s) {
value_conv conv;
if (!conv.load(it, convert))
return false;
value.push_back(cast_op<Type &&>(std::move(conv)));
}
return true;
}
template <typename T>
static handle cast(T &&src, return_value_policy policy, handle parent) {
if (!std::is_lvalue_reference<T>::value)
policy = return_value_policy_override<Type>::policy(policy);
list l(src.size());
size_t index = 0;
for (auto &&value : src) {
auto value_ = reinterpret_steal<object>(value_conv::cast(forward_like<T>(value), policy, parent));
if (!value_)
return handle();
PyList_SET_ITEM(l.ptr(), (ssize_t) index++, value_.release().ptr()); // steals a reference
}
return l.release();
}
};
} // namespace detail
} // namespace pybind11

View File

@@ -0,0 +1,172 @@
#pragma once
#include <pybind11/pybind11.h>
#include <pybind11/stl.h>
#include <wpi/SmallVector.h>
#include <span>
namespace pybind11 {
namespace detail {
template <size_t N>
struct span_name_maker {
template <typename T>
static constexpr auto make(const T &t) {
return concat(t, span_name_maker<N-1>::make(t));
}
};
template <>
struct span_name_maker<1> {
template <typename T>
static constexpr auto make(const T &t) {
return t;
}
};
// span with fixed size converts to a tuple
template <typename Type, size_t Extent> struct type_caster<std::span<Type, Extent>> {
using span_type = typename std::span<Type, Extent>;
using value_conv = make_caster<Type>;
using value_type = typename std::remove_cv<Type>::type;
value_type backing_array[Extent] = {};
PYBIND11_TYPE_CASTER(span_type, _("Tuple[") + span_name_maker<Extent>::make(value_conv::name) + _("]"));
type_caster() : value(backing_array) {}
bool load(handle src, bool convert) {
if (!isinstance<sequence>(src) || isinstance<str>(src))
return false;
auto s = reinterpret_borrow<sequence>(src);
if (s.size() != Extent)
return false;
size_t i = 0;
for (auto it : s) {
value_conv conv;
if (!conv.load(it, convert))
return false;
backing_array[i] = cast_op<Type &&>(std::move(conv));
i++;
}
return true;
}
public:
template <typename T>
static handle cast(T &&src, return_value_policy policy, handle parent) {
if (!std::is_lvalue_reference<T>::value)
policy = return_value_policy_override<Type>::policy(policy);
tuple l(Extent);
size_t index = 0;
for (auto &&value : src) {
auto value_ = reinterpret_steal<object>(
value_conv::cast(forward_like<T>(value), policy, parent));
if (!value_)
return handle();
PyTuple_SET_ITEM(l.ptr(), (ssize_t)index++,
value_.release().ptr()); // steals a reference
}
return l.release();
}
};
// span with dynamic extent
template <typename Type> struct type_caster<std::span<Type, std::dynamic_extent>> {
using span_type = typename std::span<Type, std::dynamic_extent>;
using value_conv = make_caster<Type>;
using value_type = typename std::remove_cv<Type>::type;
PYBIND11_TYPE_CASTER(span_type, _("List[") + value_conv::name + _("]"));
wpi::SmallVector<value_type, 32> vec;
bool load(handle src, bool convert) {
if (!isinstance<sequence>(src) || isinstance<str>(src))
return false;
auto s = reinterpret_borrow<sequence>(src);
vec.reserve(s.size());
for (auto it : s) {
value_conv conv;
if (!conv.load(it, convert))
return false;
vec.push_back(cast_op<Type &&>(std::move(conv)));
}
value = span_type(std::data(vec), std::size(vec));
return true;
}
public:
template <typename T>
static handle cast(T &&src, return_value_policy policy, handle parent) {
if (!std::is_lvalue_reference<T>::value)
policy = return_value_policy_override<Type>::policy(policy);
list l(src.size());
size_t index = 0;
for (auto &&value : src) {
auto value_ = reinterpret_steal<object>(
value_conv::cast(forward_like<T>(value), policy, parent));
if (!value_)
return handle();
PyList_SET_ITEM(l.ptr(), (ssize_t)index++,
value_.release().ptr()); // steals a reference
}
return l.release();
}
};
// span specialization: accepts any readonly buffers
template <> struct type_caster<std::span<const uint8_t, std::dynamic_extent>> {
using span_type = typename std::span<const uint8_t, std::dynamic_extent>;
PYBIND11_TYPE_CASTER(span_type, _("Buffer"));
bool load(handle src, bool convert) {
if (!isinstance<buffer>(src))
return false;
auto buf = reinterpret_borrow<buffer>(src);
auto req = buf.request();
if (req.ndim != 1) {
return false;
}
value = span_type((const uint8_t*)req.ptr, req.size*req.itemsize);
return true;
}
public:
template <typename T>
static handle cast(T &&src, return_value_policy policy, handle parent) {
return bytes((char*)src.data(), src.size()).release();
}
};
// span specialization: writeable buffer
template <> struct type_caster<std::span<uint8_t, std::dynamic_extent>> {
using span_type = typename std::span<const uint8_t, std::dynamic_extent>;
PYBIND11_TYPE_CASTER(std::span<uint8_t>, _("Buffer"));
bool load(handle src, bool convert) {
if (!isinstance<buffer>(src))
return false;
auto buf = reinterpret_borrow<buffer>(src);
auto req = buf.request(true); // buffer must be writeable
if (req.ndim != 1) {
return false;
}
value = std::span<uint8_t>((uint8_t*)req.ptr, req.size*req.itemsize);
return true;
}
public:
template <typename T>
static handle cast(T &&src, return_value_policy policy, handle parent) {
// TODO: should this be a memoryview instead?
return bytes((char*)src.data(), src.size()).release();
}
};
} // namespace detail
} // namespace pybind11

View File

@@ -0,0 +1,19 @@
#pragma once
#include <pybind11/pybind11.h>
#include <pybind11/stl.h>
#include <wpi/StringMap.h>
namespace pybind11
{
namespace detail
{
template <typename Value>
struct type_caster<wpi::StringMap<Value>>
: map_caster<wpi::StringMap<Value>, std::string, Value> { };
} // namespace detail
} // namespace pybind11

View File

@@ -0,0 +1,317 @@
#pragma once
#include <functional>
#include <memory>
#include <string_view>
#include <fmt/format.h>
#include <wpi/struct/Struct.h>
#include <pybind11/functional.h>
#include <pybind11/typing.h>
#include <semiwrap.h>
static inline std::string pytypename(const py::type &t) {
return ((PyTypeObject *)t.ptr())->tp_name;
}
//
// Dynamic struct + type caster
//
// This merely holds the python object being operated on, the actual
// serialization work is done in WPyStructConverter
struct WPyStruct {
WPyStruct() = default;
WPyStruct(const WPyStruct &other) {
py::gil_scoped_acquire gil;
py = other.py;
}
WPyStruct &operator=(const WPyStruct &other) {
{
py::gil_scoped_acquire gil;
py = other.py;
}
return *this;
}
WPyStruct(WPyStruct &&) = default;
WPyStruct(const py::object &py) : py(py) {}
~WPyStruct() {
py::gil_scoped_acquire gil;
py.release().dec_ref();
}
py::object py;
};
namespace pybind11 {
namespace detail {
template <> struct type_caster<WPyStruct> {
// TODO: wpiutil.struct.T/TV?
PYBIND11_TYPE_CASTER(WPyStruct, const_name("object"));
bool load(handle src, bool convert) {
// TODO: validation?
value.py = py::reinterpret_borrow<py::object>(src);
return true;
}
static handle cast(const WPyStruct &src, py::return_value_policy policy,
py::handle parent) {
py::handle v = src.py;
v.inc_ref();
return v;
}
};
} // namespace detail
} // namespace pybind11
//
// Struct info class implementation
//
// two types of converters: static C++ converter, and dynamic python converter
struct WPyStructConverter {
virtual ~WPyStructConverter() = default;
virtual std::string_view GetTypeName() const = 0;
virtual size_t GetSize() const = 0;
virtual std::string_view GetSchema() const = 0;
virtual void Pack(std::span<uint8_t> data, const WPyStruct &value) const = 0;
virtual WPyStruct Unpack(std::span<const uint8_t> data) const = 0;
// virtual void UnpackInto(WPyStruct *pyv,
// std::span<const uint8_t> data) const = 0;
virtual void ForEachNested(
const std::function<void(std::string_view, std::string_view)> &fn)
const = 0;
};
// static C++ converter
template <typename T> struct WPyStructCppConverter : WPyStructConverter {
std::string_view GetTypeName() const override {
return wpi::Struct<T>::GetTypeName();
}
size_t GetSize() const override { return wpi::Struct<T>::GetSize(); }
std::string_view GetSchema() const override {
return wpi::Struct<T>::GetSchema();
}
void Pack(std::span<uint8_t> data, const WPyStruct &value) const override {
py::gil_scoped_acquire gil;
const T &v = value.py.cast<const T &>();
wpi::Struct<T>::Pack(data, v);
}
WPyStruct Unpack(std::span<const uint8_t> data) const override {
py::gil_scoped_acquire gil;
return WPyStruct{py::cast(wpi::UnpackStruct<T>(data))};
}
// void UnpackInto(WPyStruct *pyv,
// std::span<const uint8_t> data) const override {
// py::gil_scoped_acquire gil;
// T *v = pyv->py.cast<T *>();
// wpi::UnpackStructInto(v, data);
// }
void ForEachNested(
const std::function<void(std::string_view, std::string_view)> &fn)
const override {
if constexpr (wpi::HasNestedStruct<T>) {
wpi::Struct<T>::ForEachNested(fn);
}
}
};
template <typename T> void SetupWPyStruct(auto pycls) {
auto *sptr =
new std::shared_ptr<WPyStructConverter>(new WPyStructCppConverter<T>());
py::capsule c(sptr, "WPyStruct", [](void *ptr) {
delete (std::shared_ptr<WPyStructConverter> *)ptr;
});
pycls.def_property_readonly_static("WPIStruct",
[c](py::object pycls) { return c; });
}
// dynamic python converter
struct WPyStructPyConverter : WPyStructConverter {
WPyStructPyConverter(py::object o) {
m_typename = o.attr("typename").cast<std::string>();
m_schema = o.attr("schema").cast<std::string>();
m_size = o.attr("size").cast<size_t>();
m_pack = py::reinterpret_borrow<py::function>(o.attr("pack"));
m_packInto = py::reinterpret_borrow<py::function>(o.attr("packInto"));
m_unpack = py::reinterpret_borrow<py::function>(o.attr("unpack"));
// m_unpackInto = py::reinterpret_borrow<py::function>(o.attr("unpackInto"));
m_forEachNested =
py::reinterpret_borrow<py::function>(o.attr("forEachNested"));
}
// copy all the relevant attributes locally
std::string m_typename;
std::string m_schema;
size_t m_size;
py::function m_pack;
py::function m_packInto;
py::function m_unpack;
// py::function m_unpackInto;
py::function m_forEachNested; // might be none
std::string_view GetTypeName() const override { return m_typename; }
size_t GetSize() const override { return m_size; }
std::string_view GetSchema() const override { return m_schema; }
void Pack(std::span<uint8_t> data, const WPyStruct &value) const override {
py::gil_scoped_acquire gil;
py::bytes result = m_pack(value.py);
std::string_view rview = result;
if (rview.size() != data.size()) {
std::string msg = fmt::format(
"{} pack did not return {} bytes (returned {})",
pytypename(py::type::of(value.py)), data.size(), rview.size());
throw py::value_error(msg);
}
rview.copy((char *)data.data(), rview.size());
}
WPyStruct Unpack(std::span<const uint8_t> data) const override {
py::gil_scoped_acquire gil;
auto view =
py::memoryview::from_memory((const void *)data.data(), data.size());
return WPyStruct(m_unpack(view));
}
// void UnpackInto(WPyStruct *pyv,
// std::span<const uint8_t> data) const override {
// py::gil_scoped_acquire gil;
// auto view =
// py::memoryview::from_memory((const void *)data.data(), data.size());
// m_unpackInto(pyv->py, view);
// }
void ForEachNested(
const std::function<void(std::string_view, std::string_view)> &fn)
const override {
py::gil_scoped_acquire gil;
if (!m_forEachNested.is_none()) {
m_forEachNested(fn);
}
}
};
// passed as I... to the wpi::Struct methods
struct WPyStructInfo {
WPyStructInfo() = default;
WPyStructInfo(const py::type &t) {
if (!py::hasattr(t, "WPIStruct")) {
throw py::type_error(
fmt::format("{} is not struct serializable (does not have WPIStruct)",
pytypename(t)));
}
py::object s = t.attr("WPIStruct");
// C++ version
void *c = PyCapsule_GetPointer(s.ptr(), "WPyStruct");
if (c != NULL) {
cvt = *(std::shared_ptr<WPyStructConverter> *)c;
return;
}
PyErr_Clear();
// Python version
try {
cvt = std::make_shared<WPyStructPyConverter>(s);
} catch (py::error_already_set &e) {
std::string msg = fmt::format(
"{} is not struct serializable (invalid WPIStruct)", pytypename(t));
py::raise_from(e, PyExc_TypeError, msg.c_str());
throw py::error_already_set();
}
}
WPyStructInfo(const WPyStruct &v) : WPyStructInfo(py::type::of(v.py)) {}
const WPyStructConverter* operator->() const {
const auto *c = cvt.get();
if (c == nullptr) {
// TODO: would be nice to have a better error here, but we don't have
// a good way to know our current context
throw py::value_error("Object is closed");
}
return c;
}
private:
// holds something used to do serialization
std::shared_ptr<WPyStructConverter> cvt;
};
// Leverages the converter stored in WPyStructInfo to do the actual work
template <> struct wpi::Struct<WPyStruct, WPyStructInfo> {
static std::string_view GetTypeName(const WPyStructInfo &info) {
return info->GetTypeName();
}
static size_t GetSize(const WPyStructInfo &info) {
return info->GetSize();
}
static std::string_view GetSchema(const WPyStructInfo &info) {
return info->GetSchema();
}
static WPyStruct Unpack(std::span<const uint8_t> data,
const WPyStructInfo &info) {
return info->Unpack(data);
}
// static void UnpackInto(WPyStruct *v, std::span<const uint8_t> data,
// const WPyStructInfo &info) {
// info->UnpackInto(v, data);
// }
static void Pack(std::span<uint8_t> data, const WPyStruct &value,
const WPyStructInfo &info) {
info->Pack(data, value);
}
static void
ForEachNested(std::invocable<std::string_view, std::string_view> auto fn,
const WPyStructInfo &info) {
info->ForEachNested(fn);
}
};
static_assert(wpi::StructSerializable<WPyStruct, WPyStructInfo>);
static_assert(wpi::HasNestedStruct<WPyStruct, WPyStructInfo>);
// This breaks on readonly structs, so we disable for now
// static_assert(wpi::MutableStructSerializable<WPyStruct, WPyStructInfo>);

View File

@@ -0,0 +1,166 @@
#include "wpystruct.h"
void forEachNested(
const py::type &t,
const std::function<void(std::string_view, std::string_view)> &fn) {
WPyStructInfo info(t);
wpi::ForEachStructSchema<WPyStruct, WPyStructInfo>(fn, info);
}
py::str getTypeName(const py::type &t) {
WPyStructInfo info(t);
return wpi::GetStructTypeName<WPyStruct, WPyStructInfo>(info);
}
py::str getSchema(const py::type &t) {
WPyStructInfo info(t);
return wpi::GetStructSchema<WPyStruct, WPyStructInfo>(info);
}
size_t getSize(const py::type &t) {
WPyStructInfo info(t);
return wpi::GetStructSize<WPyStruct>(info);
}
py::bytes pack(const WPyStruct &v) {
WPyStructInfo info(v);
auto sz = wpi::GetStructSize<WPyStruct>(info);
PyObject *b = PyBytes_FromStringAndSize(NULL, sz);
if (b == NULL) {
throw py::error_already_set();
}
char *pybuf;
py::ssize_t pysz;
if (PyBytes_AsStringAndSize(b, &pybuf, &pysz) != 0) {
Py_DECREF(b);
throw py::error_already_set();
}
auto s = std::span((uint8_t *)pybuf, pysz);
wpi::PackStruct(s, v, info);
return py::reinterpret_steal<py::bytes>(b);
}
py::bytes packArray(const py::sequence &seq) {
auto len = seq.size();
if (len == 0) {
return {};
}
WPyStructInfo info(py::type::of(seq[0]));
auto sz = wpi::GetStructSize<WPyStruct>(info);
auto total = sz*len;
PyObject *b = PyBytes_FromStringAndSize(NULL, total);
if (b == NULL) {
throw py::error_already_set();
}
char *pybuf;
py::ssize_t pysz;
if (PyBytes_AsStringAndSize(b, &pybuf, &pysz) != 0) {
Py_DECREF(b);
throw py::error_already_set();
}
auto bytes_obj = py::reinterpret_steal<py::bytes>(b);
for (const auto &v: seq) {
WPyStruct wv(v);
auto s = std::span((uint8_t *)pybuf, sz);
wpi::PackStruct(s, wv, info);
pybuf += sz;
}
return bytes_obj;
}
void packInto(const WPyStruct &v, py::buffer &b) {
WPyStructInfo info(v);
py::ssize_t sz = wpi::GetStructSize<WPyStruct>(info);
auto req = b.request();
if (req.itemsize != 1) {
throw py::value_error("buffer must only contain bytes");
} else if (req.ndim != 1) {
throw py::value_error("buffer must only have a single dimension");
}
if (req.size != sz) {
throw py::value_error("buffer must be " + std::to_string(sz) + " bytes");
}
auto s = std::span((uint8_t *)req.ptr, req.size);
wpi::PackStruct(s, v, info);
}
WPyStruct unpack(const py::type &t, const py::buffer &b) {
WPyStructInfo info(t);
py::ssize_t sz = wpi::GetStructSize<WPyStruct>(info);
auto req = b.request();
if (req.itemsize != 1) {
throw py::value_error("buffer must only contain bytes");
} else if (req.ndim != 1) {
throw py::value_error("buffer must only have a single dimension");
}
if (req.size != sz) {
throw py::value_error("buffer must be " + std::to_string(sz) + " bytes");
}
auto s = std::span((const uint8_t *)req.ptr, req.size);
return wpi::UnpackStruct<WPyStruct, WPyStructInfo>(s, info);
}
py::typing::List<WPyStruct> unpackArray(const py::type &t, const py::buffer &b) {
WPyStructInfo info(t);
py::ssize_t sz = wpi::GetStructSize<WPyStruct>(info);
auto req = b.request();
if (req.itemsize != 1) {
throw py::value_error("buffer must only contain bytes");
} else if (req.ndim != 1) {
throw py::value_error("buffer must only have a single dimension");
}
if (req.size % sz != 0) {
throw py::value_error("buffer must be multiple of " + std::to_string(sz) + " bytes");
}
auto items = req.size / sz;
py::list a(items);
const uint8_t *ptr = (const uint8_t *)req.ptr;
for (py::ssize_t i = 0; i < items; i++) {
auto s = std::span(ptr, sz);
auto v = wpi::UnpackStruct<WPyStruct, WPyStructInfo>(s, info);
// steals a reference
PyList_SET_ITEM(a.ptr(), i, v.py.inc_ref().ptr());
ptr += sz;
}
return a;
}
// void unpackInto(const py::buffer &b, WPyStruct *v) {
// WPyStructInfo info(*v);
// py::ssize_t sz = wpi::GetStructSize<WPyStruct>(info);
// auto req = b.request();
// if (req.itemsize != 1) {
// throw py::value_error("buffer must only contain bytes");
// } else if (req.ndim != 1) {
// throw py::value_error("buffer must only have a single dimension");
// }
// if (req.size != sz) {
// throw py::value_error("buffer must be " + std::to_string(sz) + " bytes");
// }
// auto s = std::span((const uint8_t *)req.ptr, req.size);
// wpi::UnpackStructInto<WPyStruct, WPyStructInfo>(v, s, info);
// }

View File

@@ -0,0 +1,59 @@
#pragma once
#include "wpystruct.h"
/**
Call a function to retrieve the (type string, schema) for each nested struct
*/
void forEachNested(
const py::type &t,
const std::function<void(std::string_view, std::string_view)> &fn);
/**
Retrieve the type name for the specified type
*/
py::str getTypeName(const py::type &t);
/**
Retrieve schema for the specified type
*/
py::str getSchema(const py::type &t);
/**
Returns the serialized size in bytes
*/
size_t getSize(const py::type &t);
/**
Serialize object into byte buffer
*/
py::bytes pack(const WPyStruct &v);
/**
Serialize objects into byte buffer
*/
py::bytes packArray(const py::sequence &seq);
/**
Serialize object into byte buffer. Buffer must be exact size.
*/
void packInto(const WPyStruct &v, py::buffer &b);
/**
Convert byte buffer into object of specified type. Buffer must be exact
size.
*/
WPyStruct unpack(const py::type &t, const py::buffer &b);
/**
Convert byte buffer into list of objects of specified type. Buffer must be
exact size.
*/
py::typing::List<WPyStruct> unpackArray(const py::type &t, const py::buffer &b);
// /**
// Convert byte buffer into passed in object. Buffer must be exact
// size.
// */
// void unpackInto(const py::buffer &b, WPyStruct *v);

View File

@@ -0,0 +1,32 @@
# autogenerated by 'semiwrap create-imports wpiutil.sync wpiutil._wpiutil.sync'
from .._wpiutil.sync import (
createEvent,
createSemaphore,
createSignalObject,
destroyEvent,
destroySemaphore,
destroySignalObject,
releaseSemaphore,
resetEvent,
resetSignalObject,
setEvent,
setSignalObject,
waitForObject,
waitForObjects,
)
__all__ = [
"createEvent",
"createSemaphore",
"createSignalObject",
"destroyEvent",
"destroySemaphore",
"destroySignalObject",
"releaseSemaphore",
"resetEvent",
"resetSignalObject",
"setEvent",
"setSignalObject",
"waitForObject",
"waitForObjects",
]

View File

@@ -0,0 +1,58 @@
"""
This package contains the WPILib Struct serialization functions, and a
mechanism to implement your own custom structs using Python (see :func:`wpiutil.wpistruct.make_wpistruct`).
"""
# autogenerated by 'semiwrap create-imports wpiutil.wpistruct wpiutil._wpiutil.wpistruct'
from .._wpiutil.wpistruct import (
forEachNested,
getSchema,
getSize,
getTypeName,
pack,
packArray,
packInto,
unpack,
unpackArray,
)
__all__ = [
"forEachNested",
"getSchema",
"getSize",
"getTypeName",
"pack",
"packArray",
"packInto",
"unpack",
"unpackArray",
]
from .desc import StructDescriptor
from .dataclass import (
make_wpistruct,
int8,
uint8,
int16,
uint16,
int32,
uint32,
int64,
uint64,
double,
)
__all__ += [
"StructDescriptor",
"make_wpistruct",
"int8",
"uint8",
"int16",
"uint16",
"int32",
"uint32",
"int64",
"uint64",
"double",
]

View File

@@ -0,0 +1,229 @@
import dataclasses
import inspect
import struct
import typing
from .desc import StructDescriptor
from .._wpiutil import wpistruct
#
# Use these types to specify explicitly sized integers, but you can
# also use int/bool/float
#
# fmt: off
if typing.TYPE_CHECKING:
int8 = int
uint8 = int
int16 = int
uint16 = int
int32 = int
uint32 = int
int64 = int
uint64 = int
double = float
else:
class int8(int): pass
class uint8(int): pass
class int16(int): pass
class uint16(int): pass
class int32(int): pass
class uint32(int): pass
class int64(int): pass
class uint64(int): pass
class double(float): pass
# fmt: on
def make_wpistruct(cls=None, /, *, name: typing.Optional[str] = None):
"""
This decorator allows you to easily define a custom type that can be
used with wpilib's custom serialization protocol (for use in datalog
and networktables). Just create a normal python dataclass, and apply
this decorator to the class.
For example, here's how you define a dataclass that contains an integer,
a boolean, and a double::
@wpiutil.wpistruct.make_wpistruct(name="mystruct")
@dataclasses.dataclass
class MyStruct:
x: wpiutil.wpistruct.int32
y: bool
z: wpiutil.struct.double
The types defined in the dataclass can be another WPIStruct compatible class
(either builtin or user defined); one of int, bool, or float; or you can
use one of the ``wpiutil.wpistruct.[u]int*`` values for explicitly sized
integer types.
"""
def wrap(cls):
return _process_class(cls, name)
if cls is None:
return wrap
return wrap(cls)
#
# Internals
#
_type_to_fmt = {
bool: ("?", "bool"),
int8: ("b", "int8"),
uint8: ("B", "uint8"),
int16: ("h", "int16"),
uint16: ("H", "uint16"),
int: ("i", "int32"),
int32: ("i", "int32"),
uint32: ("I", "uint32"),
int64: ("q", "int64"),
uint64: ("Q", "uint64"),
float: ("f", "float"),
double: ("d", "double"),
}
def _process_class(cls, struct_name: typing.Optional[str]):
resolved_hints = typing.get_type_hints(cls)
field_names = [field.name for field in dataclasses.fields(cls)]
resolved_field_types = {name: resolved_hints[name] for name in field_names}
name_parts = []
name_parts.append(getattr(cls, "__module__", None))
name_parts.append(getattr(cls, "__qualname__", cls.__name__))
cls_name = ".".join([n for n in name_parts if n])
if struct_name is None:
struct_name = cls.__name__
err_name = cls_name
else:
err_name = f"{struct_name} ({cls_name})"
fmts = []
schema = []
cvvals = []
vvals = []
packs = []
unpacks = []
# unpackIntos = []
forEachNested = []
ctx: typing.Dict[str, typing.Any] = {"cls": cls}
for name, ftype in resolved_field_types.items():
if ftype in _type_to_fmt:
fmt, stype = _type_to_fmt[ftype]
fmts.append(fmt)
schema.append(f"{stype} {name}")
cvvals.append(f"arg_{name}")
vvals.append(f"v.{name}")
elif hasattr(ftype, "WPIStruct"):
# nested struct
argn = f"arg_{name}"
typn = f"type_{name}"
ctx[typn] = ftype
ts = wpistruct.getTypeName(ftype)
schema.append(f"{ts} {name}")
sz = wpistruct.getSize(ftype)
fmts.append(f"{sz}s")
vvals.append(argn)
cvvals.append(argn)
packs.append(f"{argn} = wpistruct.pack(v.{name})")
unpacks.append(f"{argn} = wpistruct.unpack({typn}, {argn})")
# unpackIntos.append(f"wpistruct.unpackInto(v.{name}, {argn})")
forEachNested.append(f"wpistruct.forEachNested({typn}, fn)")
else:
supported_names = ", ".join(t.__name__ for t in _type_to_fmt.keys())
raise TypeError(
f"{cls_name}.{name} is not a wpistruct or does not have a supported type hint "
f"(supported: {supported_names})"
) from None
s = struct.Struct(f"<{''.join(fmts)}")
cvals = ", ".join(cvvals)
vals = ", ".join(vvals)
padding = "\n" + " " * 16
pack_stmts = padding.join(packs)
unpack_stmts = padding.join(unpacks)
# unpackInto_stmts = padding.join(unpackIntos)
if not forEachNested:
forEachNested_stmt = "_forEachNested = None"
else:
forEachNested_stmt = f"def _forEachNested(fn):"
forEachNested_stmt += "\n" + " " * 12
forEachNested_stmt += f"try:{padding}"
forEachNested_stmt += padding.join(forEachNested)
forEachNested_stmt += "\n" + " " * 12
forEachNested_stmt += f"except Exception as e:"
forEachNested_stmt += (
f"{padding}raise ValueError(f'{err_name}: error in forEachNested') from e"
)
ctx["_s"] = s
# Construct the serialization functions using the same hack NamedTuple uses
fnsrc = inspect.cleandoc(
f"""
from wpiutil import wpistruct
def _pack(v):
try:
{pack_stmts}
return _s.pack({vals})
except Exception as e:
raise ValueError(f"{err_name}: error packing data") from e
def _packInto(v, b):
try:
{pack_stmts}
return _s.pack_into(b, 0, {vals})
except Exception as e:
raise ValueError(f"{err_name}: error packing data") from e
def _unpack(b):
try:
{cvals} = _s.unpack(b)
{unpack_stmts}
return cls({cvals})
except Exception as e:
raise ValueError(f"{err_name}: error unpacking data") from e
#def _unpackInto(v, b):
# try:
# {vals} = _s.unpack(b)
# {{unpackInto_stmts}}
# except Exception as e:
# raise ValueError(f"{err_name}: error unpacking data") from e
{forEachNested_stmt}
"""
)
exec(fnsrc, ctx, ctx)
cls.WPIStruct = StructDescriptor(
typename=struct_name,
schema="; ".join(schema),
size=s.size,
pack=ctx["_pack"],
packInto=ctx["_packInto"],
unpack=ctx["_unpack"],
# unpackInto=ctx["_unpackInto"],
forEachNested=ctx["_forEachNested"],
)
return cls

View File

@@ -0,0 +1,48 @@
import typing
if typing.TYPE_CHECKING:
from typing_extensions import Buffer
else:
# Avoiding typing_extensions runtime dependency
Buffer = bytearray
class StructDescriptor(typing.NamedTuple):
"""
To define a type in python that can use the wpilib serialization, the type must
have an attribute `WPIStruct` that contains this class (but C++ classes
do not have this).
It is not intended that you should create this class directly, something
else should generate it for you.
See :func:`wpiutil.wpistruct.make_wpistruct` for a easy to use mechanism
for defining custom structs using a dataclass.
"""
#: The type name
typename: str
#: The struct schema
schema: str
#: Size in bytes of binary representation of this struct
size: int
#: A function that converts the type to bytes
pack: typing.Callable[[typing.Any], bytes]
#: A function that converts the type to bytes
packInto: typing.Callable[[typing.Any, Buffer], None]
#: A function that converts bytes to an instance
unpack: typing.Callable[[Buffer], typing.Any]
#: A function that updates the given instance using the deserialized bytes
#: .. not supported
# unpackInto: typing.Callable[[typing.Any, Buffer], None]
#: If this contains nested structs, calls wpiutil.wpistruct.forEachNested for each
forEachNested: typing.Optional[
typing.Callable[[typing.Callable[[str, str], None]], None]
]

View File

@@ -0,0 +1,23 @@
from typing import ClassVar, Protocol
try:
from typing import TypeGuard
except ImportError:
try:
from typing_extensions import TypeGuard
except ImportError:
# Runtime fallback for Python 3.9 without typing_extensions
class TypeGuard:
def __class_getitem__(cls, key):
return bool
class StructSerializable(Protocol):
"""Any type that can be serialized or deserialized as a WPILib Struct."""
WPIStruct: ClassVar
def is_wpistruct_type(cls: type) -> TypeGuard[type[StructSerializable]]:
"""Returns True if the given type supports WPILib Struct serialization."""
return hasattr(cls, "WPIStruct")

View File

@@ -0,0 +1 @@

View File

@@ -0,0 +1,11 @@
[build-system]
build-backend = "hatchling.build"
requires = [
"hatch-meson", "hatchling"
]
[project]
name = "wpiutil_test"
version = "0.0.1"
[tool.hatch.build.hooks.meson]

View File

@@ -0,0 +1 @@
import wpiutil

View File

@@ -0,0 +1,195 @@
// clang-format off
#include <pybind11/pybind11.h>
#include <wpi_array_type_caster.h>
#include <wpi_span_type_caster.h>
#include <wpi_smallset_type_caster.h>
#include <wpi_smallvector_type_caster.h>
#include <wpi_smallvectorimpl_type_caster.h>
#include <wpi_string_map_caster.h>
#include <wpi_json_type_caster.h>
#include <wpi_ct_string_type_caster.h>
#include <limits>
#include <functional>
#include <pybind11/functional.h>
/*
array tests
*/
wpi::array<int, 4> load_array_int(wpi::array<int, 4> data) {
return data;
}
wpi::array<int, 1> load_array_int1(wpi::array<int, 1> data) {
return data;
}
/*
span Tests
*/
std::span<const int> load_span_int(std::span<const int> ref) {
return ref;
}
std::span<const bool> load_span_bool(std::span<const bool> ref) {
return ref;
}
std::span<std::string> load_span_string(std::span<std::string> ref) {
return ref;
}
std::span<const std::string> load_span_string_const(std::span<const std::string> ref) {
return ref;
}
std::span<std::string_view> load_span_string_view(std::span<std::string_view> ref) {
return ref;
}
std::span<std::vector<std::string>> load_span_vector(std::span<std::vector<std::string>> ref) {
return ref;
}
std::span<const double, 3> load_span_fixed_double(std::span<const double, 3> ref) {
return ref;
}
std::span<int> cast_span() {
static std::vector<int> vec{1, 2, 3};
return vec;
}
std::span<const std::string> make_string_span() {
static std::vector<std::string> vec{"hi", "there"};
return vec;
}
py::object cast_string_span() {
return py::cast(make_string_span());
}
std::span<const uint8_t> load_span_bytes(std::span<const uint8_t> ref) {
return ref;
}
void modify_span_buffer(std::span<uint8_t> ref) {
ref[0] = 0x4;
}
/*
SmallSet tests
*/
wpi::SmallSet<int, 4> load_smallset_int(wpi::SmallSet<int, 4> ref) {
return ref;
}
wpi::SmallSet<int, 4> cast_smallset() {
static wpi::SmallSet<int, 4> set;
set.insert(1);
set.insert(2);
set.insert(3);
set.insert(4);
return set;
}
/*
SmallVector tests
*/
wpi::SmallVector<int, 4> load_smallvec_int(wpi::SmallVector<int, 4> ref) {
return ref;
}
wpi::SmallVector<int, 4> cast_smallvec() {
static wpi::SmallVector<int, 4> set;
set.append({1, 2, 3, 4});
return set;
}
/*
SmallVectorImpl tests
.. seems like references are the only useful things to do with them
*/
wpi::SmallVectorImpl<int>& load_smallvecimpl_int(wpi::SmallVectorImpl<int>& ref) {
static wpi::SmallVector<int, 4> set(ref.begin(), ref.end());
return set;
}
/*
StringMap tests
*/
wpi::StringMap<int> load_stringmap_int(wpi::StringMap<int> ref) {
return ref;
}
wpi::StringMap<int> cast_stringmap() {
static wpi::StringMap<int> m;
m["one"] = 1;
m["two"] = 2;
return m;
}
/* JSON tests */
wpi::json cast_json_arg(const wpi::json &j) {
return j;
}
wpi::json cast_json_val(std::function<wpi::json()> fn) {
return fn();
}
constexpr auto const_string() {
return wpi::ct_string<char, std::char_traits<char>, 3>{{'#', '1', '2'}};
}
void sendable_test(py::module &m);
void struct_test(py::module &m);
PYBIND11_MODULE(module, m) {
sendable_test(m);
struct_test(m);
// array
m.def("load_array_int", &load_array_int);
m.def("load_array_int1", &load_array_int1);
// span
m.def("load_span_int", &load_span_int);
m.def("load_span_bool", &load_span_bool);
m.def("load_span_fixed_double", &load_span_fixed_double);
m.def("load_span_string", &load_span_string);
m.def("load_span_string_const", &load_span_string_const);
m.def("load_span_string_view", &load_span_string_view);
m.def("load_span_vector", &load_span_vector);
m.def("cast_span", &cast_span);
m.def("cast_string_span", &cast_string_span);
m.def("load_span_bytes", &load_span_bytes);
m.def("modify_span_buffer", &modify_span_buffer);
// SmallSet
m.def("load_smallset_int", &load_smallset_int);
m.def("cast_smallset", &cast_smallset);
// SmallVector
m.def("load_smallvec_int", &load_smallvec_int);
m.def("cast_smallvec", &cast_smallvec);
// SmallVectorImpl
m.def("load_smallvecimpl_int", &load_smallvecimpl_int);
// StringMap
m.def("load_stringmap_int", &load_stringmap_int);
m.def("cast_stringmap", &cast_stringmap);
// JSON
m.def("cast_json_arg", &cast_json_arg);
m.def("cast_json_val", &cast_json_val);
m.attr("max_uint64") = std::numeric_limits<uint64_t>::max();
m.attr("max_int64") = std::numeric_limits<int64_t>::max();
m.attr("min_int64") = std::numeric_limits<int64_t>::min();
// ct_string
m.def("const_string", &const_string);
};

View File

@@ -0,0 +1,154 @@
#include <pybind11/functional.h>
#include <pybind11/stl.h>
#include <semiwrap.h>
#include <wpi/sendable/SendableBuilder.h>
#include <wpi/sendable/SendableRegistry.h>
class MySendableBuilder : public wpi::SendableBuilder {
public:
MySendableBuilder(py::dict keys) : keys(keys) {}
~MySendableBuilder() {
// leak this so the python interpreter doesn't crash on shutdown
keys.release();
}
void SetSmartDashboardType(std::string_view type) override {}
void SetActuator(bool value) override {}
void AddBooleanProperty(std::string_view key, std::function<bool()> getter,
std::function<void(bool)> setter) override {}
void PublishConstBoolean(std::string_view key, bool value) override {}
void AddIntegerProperty(std::string_view key, std::function<int64_t()> getter,
std::function<void(int64_t)> setter) override {}
void PublishConstInteger(std::string_view key, int64_t value) override {}
void AddFloatProperty(std::string_view key, std::function<float()> getter,
std::function<void(float)> setter) override {}
void PublishConstFloat(std::string_view key, float value) override {}
void AddDoubleProperty(std::string_view key, std::function<double()> getter,
std::function<void(double)> setter) override {
py::gil_scoped_acquire gil;
py::object pykey = py::cast(key);
keys[pykey] = std::make_tuple(getter, setter);
}
void PublishConstDouble(std::string_view key, double value) override {}
void
AddStringProperty(std::string_view key, std::function<std::string()> getter,
std::function<void(std::string_view)> setter) override {}
void PublishConstString(std::string_view key,
std::string_view value) override {}
void AddBooleanArrayProperty(
std::string_view key, std::function<std::vector<int>()> getter,
std::function<void(std::span<const int>)> setter) override {}
void PublishConstBooleanArray(std::string_view key,
std::span<const int> value) override {}
void AddIntegerArrayProperty(
std::string_view key, std::function<std::vector<int64_t>()> getter,
std::function<void(std::span<const int64_t>)> setter) override {}
void PublishConstIntegerArray(std::string_view key,
std::span<const int64_t> value) override {}
void AddFloatArrayProperty(
std::string_view key, std::function<std::vector<float>()> getter,
std::function<void(std::span<const float>)> setter) override {}
void PublishConstFloatArray(std::string_view key,
std::span<const float> value) override {}
void AddDoubleArrayProperty(
std::string_view key, std::function<std::vector<double>()> getter,
std::function<void(std::span<const double>)> setter) override {}
void PublishConstDoubleArray(std::string_view key,
std::span<const double> value) override {}
void AddStringArrayProperty(
std::string_view key, std::function<std::vector<std::string>()> getter,
std::function<void(std::span<const std::string>)> setter) override {}
void PublishConstStringArray(std::string_view key,
std::span<const std::string> value) override {}
void AddRawProperty(
std::string_view key, std::string_view typeString,
std::function<std::vector<uint8_t>()> getter,
std::function<void(std::span<const uint8_t>)> setter) override {}
void PublishConstRaw(std::string_view key, std::string_view typeString,
std::span<const uint8_t> value) override {}
void AddSmallStringProperty(
std::string_view key,
std::function<std::string_view(wpi::SmallVectorImpl<char> &buf)> getter,
std::function<void(std::string_view)> setter) override {}
void AddSmallBooleanArrayProperty(
std::string_view key,
std::function<std::span<const int>(wpi::SmallVectorImpl<int> &buf)>
getter,
std::function<void(std::span<const int>)> setter) override {}
void AddSmallIntegerArrayProperty(
std::string_view key,
std::function<
std::span<const int64_t>(wpi::SmallVectorImpl<int64_t> &buf)>
getter,
std::function<void(std::span<const int64_t>)> setter) override {}
void AddSmallFloatArrayProperty(
std::string_view key,
std::function<std::span<const float>(wpi::SmallVectorImpl<float> &buf)>
getter,
std::function<void(std::span<const float>)> setter) override {}
void AddSmallDoubleArrayProperty(
std::string_view key,
std::function<std::span<const double>(wpi::SmallVectorImpl<double> &buf)>
getter,
std::function<void(std::span<const double>)> setter) override {}
void AddSmallStringArrayProperty(
std::string_view key,
std::function<
std::span<const std::string>(wpi::SmallVectorImpl<std::string> &buf)>
getter,
std::function<void(std::span<const std::string>)> setter) override {}
void AddSmallRawProperty(
std::string_view key, std::string_view typeString,
std::function<std::span<uint8_t>(wpi::SmallVectorImpl<uint8_t> &buf)>
getter,
std::function<void(std::span<const uint8_t>)> setter) override {}
wpi::SendableBuilder::BackendKind GetBackendKind() const override {
return wpi::SendableBuilder::BackendKind::kUnknown;
}
bool IsPublished() const override { return false; }
void Update() override {}
void ClearProperties() override {}
py::dict keys;
};
void Publish(wpi::SendableRegistry::UID sendableUid, py::dict keys) {
auto builder = std::make_unique<MySendableBuilder>(keys);
wpi::SendableRegistry::Publish(sendableUid, std::move(builder));
}
void sendable_test(py::module &m) { m.def("publish", Publish); }

View File

@@ -0,0 +1,84 @@
#include <pybind11/operators.h>
#include <wpystruct.h>
//
// Thing to serialize
//
struct ThingA {
ThingA() = default;
ThingA(int x) : x(x) {}
const int x = 0;
bool operator==(const ThingA &other) const { return x == other.x; }
};
template <> struct wpi::Struct<ThingA> {
static constexpr std::string_view GetTypeName() { return "ThingA"; }
static constexpr size_t GetSize() { return 1; }
static constexpr std::string_view GetSchema() { return "uint8 value"; }
static ThingA Unpack(std::span<const uint8_t> data) {
return ThingA{data[0]};
}
static void Pack(std::span<uint8_t> data, const ThingA &value) {
data[0] = value.x;
}
};
struct Outer {
Outer() = default;
Outer(const ThingA &t, int c) : inner(t), c(c) {}
ThingA inner;
int c = 0;
bool operator==(const Outer &other) const {
return inner == other.inner && c == other.c;
}
};
template <>
struct wpi::Struct<Outer> {
static constexpr std::string_view GetTypeName() { return "Outer"; }
static constexpr size_t GetSize() { return wpi::GetStructSize<ThingA>() + 4; }
static constexpr std::string_view GetSchema() {
return "ThingA inner; int32 c";
}
static Outer Unpack(std::span<const uint8_t> data) {
constexpr size_t innerSize = wpi::GetStructSize<ThingA>();
return {wpi::UnpackStruct<ThingA, 0>(data),
wpi::UnpackStruct<int32_t, innerSize>(data)};
}
static void Pack(std::span<uint8_t> data, const Outer& value) {
constexpr size_t innerSize = wpi::GetStructSize<ThingA>();
wpi::PackStruct<0>(data, value.inner);
wpi::PackStruct<innerSize>(data, value.c);
}
static void ForEachNested(
std::invocable<std::string_view, std::string_view> auto fn) {
wpi::ForEachStructSchema<ThingA>(fn);
}
};
void struct_test(py::module &m) {
py::class_<ThingA> thingCls(m, "ThingA");
thingCls.def(py::init<>());
thingCls.def(py::init<int>());
thingCls.def_readonly("x", &ThingA::x);
thingCls.def(py::self == py::self);
SetupWPyStruct<ThingA>(thingCls);
py::class_<Outer> outerCls(m, "Outer");
outerCls.def(py::init<>());
outerCls.def(py::init<ThingA, int>());
outerCls.def_readonly("inner", &Outer::inner);
outerCls.def_readwrite("c", &Outer::c);
outerCls.def(py::self == py::self);
SetupWPyStruct<Outer>(outerCls);
}

View File

@@ -0,0 +1,26 @@
#!/usr/bin/env python3
import os
from os.path import abspath, dirname
import sys
import subprocess
if __name__ == "__main__":
root = abspath(dirname(__file__))
os.chdir(root)
subprocess.check_call(
[
sys.executable,
"-m",
"pip",
"--disable-pip-version-check",
"install",
"-v",
"--force-reinstall",
"--no-build-isolation",
"./cpp",
],
)
subprocess.check_call([sys.executable, "-m", "pytest"])

View File

@@ -0,0 +1,17 @@
from wpiutil_test import module
def test_load_array_int():
assert module.load_array_int((1, 2, 3, 4)) == (1, 2, 3, 4)
assert module.load_array_int([1, 2, 3, 4]) == (1, 2, 3, 4)
def test_load_array_annotation():
assert (
module.load_array_int.__doc__
== "load_array_int(arg0: Tuple[typing.SupportsInt, typing.SupportsInt, typing.SupportsInt, typing.SupportsInt]) -> Tuple[int, int, int, int]\n"
)
assert (
module.load_array_int1.__doc__
== "load_array_int1(arg0: Tuple[typing.SupportsInt]) -> Tuple[int]\n"
)

View File

@@ -0,0 +1,5 @@
from wpiutil_test import module
def test_const_string():
assert module.const_string() == "#12"

View File

@@ -0,0 +1,71 @@
from wpiutil_test.module import (
cast_json_arg,
cast_json_val,
max_int64,
min_int64,
max_uint64,
)
import pytest
import math
def test_json_invalid():
with pytest.raises(TypeError):
cast_json_val(lambda: object())
def test_json_none():
assert cast_json_arg(None) == None
def test_json_bool():
assert cast_json_arg(True) == True
assert cast_json_arg(False) == False
def test_json_int():
assert cast_json_arg(36) == 36
assert cast_json_arg(min_int64) == min_int64
with pytest.raises(ValueError):
cast_json_arg(min_int64 - 1)
assert cast_json_arg(max_int64) == max_int64
assert cast_json_arg(max_uint64) == max_uint64
with pytest.raises(ValueError):
cast_json_arg(max_uint64 + 1)
def test_json_float():
assert cast_json_arg(36.37) == 36.37
assert cast_json_arg(math.inf) == math.inf
assert math.isnan(cast_json_arg(math.nan))
def test_json_string():
assert cast_json_arg("hi") == "hi"
def test_json_list():
v = [36, "hello", False]
assert cast_json_arg(v) == v
assert cast_json_arg([]) == []
tv = (36, "hello", False)
assert cast_json_arg(tv) == v
def test_json_dict():
d = {"number": 1234, "hello": "world"}
assert cast_json_arg(d) == d
assert cast_json_arg({}) == {}
assert cast_json_arg({1: 2}) == {"1": 2}
assert cast_json_arg({None: 2}) == {"None": 2}
assert cast_json_arg({1.2: 2}) == {"1.2": 2}
assert cast_json_arg({False: 2}) == {"False": 2}
with pytest.raises(TypeError):
cast_json_arg({object(): 2})

View File

@@ -0,0 +1,35 @@
import typing
import wpiutil
from wpiutil_test import module
class MySendable(wpiutil.Sendable):
def __init__(self):
super().__init__()
wpiutil.SendableRegistry.add(self, "Test", 1)
self.value = 0
def initSendable(self, builder: wpiutil.SendableBuilder):
builder.addDoubleProperty("key", self._get, self._set)
def _set(self, value: float):
self.value = value
def _get(self) -> float:
return self.value
def test_custom_sendable():
ms = MySendable()
uid = wpiutil.SendableRegistry.getUniqueId(ms)
keys = {}
module.publish(uid, keys)
assert ms.value == 0
getter, setter = keys["key"]
assert getter() == 0
setter(1)
assert getter() == 1
assert ms.value == 1

View File

@@ -0,0 +1,18 @@
from wpiutil_test import module
def test_smallset_load():
assert module.load_smallset_int({1, 2, 3, 4}) == {1, 2, 3, 4}
def test_smallsetbool_load():
assert module.load_smallset_int({True, True, False, True}) == {
True,
True,
False,
True,
}
def test_smallset_cast():
assert module.cast_smallset() == {1, 2, 3, 4}

View File

@@ -0,0 +1,22 @@
from wpiutil_test import module
def test_smallvec_load():
assert module.load_smallvec_int([1, 2, 3, 4]) == [1, 2, 3, 4]
def test_smallvecbool_load():
assert module.load_smallvec_int([True, True, False, True]) == [
True,
True,
False,
True,
]
def test_smallvec_cast():
assert module.cast_smallvec() == [1, 2, 3, 4]
def test_smallvecimpl_load():
assert module.load_smallvecimpl_int([1, 2, 3, 4]) == [1, 2, 3, 4]

View File

@@ -0,0 +1,75 @@
import pytest
from wpiutil_test import module
import array
def test_span_load_int():
assert module.load_span_int([1, 2, 3, 4]) == [1, 2, 3, 4]
def test_span_load_int():
assert module.load_span_int([1, 2, 3]) == [1, 2, 3]
def test_span_load_bool():
assert module.load_span_bool([True, False, True]) == [True, False, True]
def test_span_load_string():
assert module.load_span_string(["a", "b", "c"]) == ["a", "b", "c"]
def test_span_load_string_const():
assert module.load_span_string_const(["a", "b", "c"]) == ["a", "b", "c"]
def test_span_load_stringview():
assert module.load_span_string_view(["a", "b", "c"]) == ["a", "b", "c"]
def test_span_load_vector():
assert module.load_span_vector([["a"], ["b"], ["c"]]) == [["a"], ["b"], ["c"]]
def test_span_load_buffer_bytes():
assert module.load_span_bytes(b"abc") == b"abc"
def test_span_modify_buffer_bytes():
b = b"abc"
with pytest.raises(BufferError):
module.modify_span_buffer(b)
def test_span_load_buffer_bytearray():
assert module.load_span_bytes(bytearray([1, 2, 3])) == b"\x01\x02\x03"
def test_span_modify_buffer_bytearray():
b = bytearray([1, 2, 3])
module.modify_span_buffer(b)
assert b == bytearray([4, 2, 3])
def test_span_load_buffer_array():
a = array.array("l")
a.append(1)
a2 = array.array("l")
a2.frombytes(module.load_span_bytes(a))
assert len(a2) == 1
assert a2[0] == 1
def test_span_cast():
assert module.cast_span() == [1, 2, 3]
def test_string_span():
assert module.cast_string_span() == ["hi", "there"]
def test_fixed_double_span():
assert module.load_span_fixed_double([1, 2, 3]) == (1, 2, 3)
with pytest.raises(TypeError):
assert module.load_span_fixed_double([1, 2, 3, 4])

View File

@@ -0,0 +1,9 @@
import wpiutil
def test_python_stack_trace():
st = wpiutil._wpiutil.getStackTrace(0)
assert __file__ in st
st = wpiutil._wpiutil.getStackTraceDefault(0)
assert __file__ not in st

View File

@@ -0,0 +1,13 @@
from wpiutil_test import module
def test_stringmap_load():
assert module.load_stringmap_int({"one": 11, "two": 22, "three": 33}) == {
"one": 11,
"two": 22,
"three": 33,
}
def test_stringmap_cast():
assert module.cast_stringmap() == {"one": 1, "two": 2}

View File

@@ -0,0 +1,242 @@
import dataclasses
import re
import pytest
from wpiutil import wpistruct
from wpiutil_test import module
#
# Static serialization
#
# ensure that a type that doesn't work has a sane error message
def test_invalid_type():
with pytest.raises(
TypeError,
match=re.escape("str is not struct serializable (does not have WPIStruct)"),
):
wpistruct.getSchema(str)
def test_for_each_nested():
l = []
def _fn(*args):
l.append(args)
wpistruct.forEachNested(module.ThingA, _fn)
assert l == [("struct:ThingA", "uint8 value")]
def test_get_type_string():
assert wpistruct.getTypeName(module.ThingA) == "ThingA"
def test_get_schema():
assert wpistruct.getSchema(module.ThingA) == "uint8 value"
def test_get_size():
assert wpistruct.getSize(module.ThingA) == 1
def test_pack():
assert wpistruct.pack(module.ThingA(1)) == b"\x01"
def test_pack_array():
assert wpistruct.packArray([module.ThingA(1), module.ThingA(2)]) == b"\x01\x02"
def test_pack_into():
buf = bytearray(1)
wpistruct.packInto(module.ThingA(1), buf)
assert buf == b"\x01"
def test_pack_into_err():
buf = bytearray(2)
with pytest.raises(ValueError, match=re.escape("buffer must be 1 bytes")):
wpistruct.packInto(module.ThingA(1), buf)
def test_unpack():
assert wpistruct.unpack(module.ThingA, b"\x01") == module.ThingA(1)
def test_unpack_array():
assert wpistruct.unpackArray(module.ThingA, b"\x01\x02") == [
module.ThingA(1),
module.ThingA(2),
]
# def test_unpack_into():
# r1 = module.ThingA(1)
# r2 = module.ThingA(2)
# assert r1 != r2
# wpistruct.unpackInto(b"\x01", r2)
# assert r1 == r2
#
# Nested struct
#
def test_nested_for_each_nested():
l = []
def _fn(*args):
l.append(args)
wpistruct.forEachNested(module.Outer, _fn)
assert l == [
("struct:ThingA", "uint8 value"),
("struct:Outer", "ThingA inner; int32 c"),
]
def test_nested_get_type_string():
assert wpistruct.getTypeName(module.ThingA) == "ThingA"
def test_nested_get_schema():
assert wpistruct.getSchema(module.Outer) == "ThingA inner; int32 c"
def test_nested_get_size():
assert wpistruct.getSize(module.Outer) == 5
def test_nested_pack():
v = module.Outer(module.ThingA(2), 4)
assert wpistruct.pack(v) == b"\x02\x04\x00\x00\x00"
def test_nested_pack_into():
v = module.Outer(module.ThingA(3), 5)
buf = bytearray(5)
wpistruct.packInto(v, buf)
assert buf == b"\x03\x05\x00\x00\x00"
def test_nested_unpack():
assert wpistruct.unpack(module.ThingA, b"\x01") == module.ThingA(1)
#
# User defined serialization
#
@wpistruct.make_wpistruct(name="mystruct")
@dataclasses.dataclass
class MyStruct:
x: int
y: bool
z: float
def test_user_for_each_nested():
l = []
def _fn(*args):
l.append(args)
wpistruct.forEachNested(MyStruct, _fn)
assert l == [("struct:mystruct", "int32 x; bool y; float z")]
def test_user_get_type_string():
assert wpistruct.getTypeName(MyStruct) == "mystruct"
def test_user_get_schema():
assert wpistruct.getSchema(MyStruct) == "int32 x; bool y; float z"
def test_user_get_size():
assert wpistruct.getSize(MyStruct) == 9
def test_user_pack():
v = MyStruct(2, True, 3.5)
assert wpistruct.pack(v) == b"\x02\x00\x00\x00\x01\x00\x00\x60\x40"
def test_user_pack_into():
v = MyStruct(2, True, 3.5)
buf = bytearray(9)
wpistruct.packInto(v, buf)
assert buf == b"\x02\x00\x00\x00\x01\x00\x00\x60\x40"
def test_user_unpack():
v = MyStruct(2, True, 3.5)
assert wpistruct.unpack(MyStruct, b"\x02\x00\x00\x00\x01\x00\x00\x60\x40") == v
# def test_user_unpack_into():
# v1 = MyStruct(2, True, 3.5)
# v2 = MyStruct(3, True, 4.5)
# assert v1 != v2
# wpistruct.unpackInto(b"\x02\x00\x00\x00\x01\x00\x00\x60\x40", v2)
# assert v1 == v2
#
# User defined serialization (nested)
#
@wpistruct.make_wpistruct
@dataclasses.dataclass
class Outer:
x: int
inner: MyStruct
def test_user_nested_for_each_nested():
l = []
def _fn(*args):
l.append(args)
wpistruct.forEachNested(Outer, _fn)
assert l == [
("struct:mystruct", "int32 x; bool y; float z"),
("struct:Outer", "int32 x; mystruct inner"),
]
def test_user_nested_get_type_string():
assert wpistruct.getTypeName(Outer) == "Outer"
def test_user_nested_get_schema():
assert wpistruct.getSchema(Outer) == "int32 x; mystruct inner"
def test_user_nested_get_size():
assert wpistruct.getSize(Outer) == 4 + 9
def test_user_nested_pack():
v = Outer(2, MyStruct(3, True, 4.0))
assert wpistruct.pack(v) == b"\x02\x00\x00\x00\x03\x00\x00\x00\x01\x00\x00\x80\x40"
def test_user_nested_pack_into():
v = Outer(2, MyStruct(3, True, 4.0))
buf = bytearray(4 + 9)
wpistruct.packInto(v, buf)
assert buf == b"\x02\x00\x00\x00\x03\x00\x00\x00\x01\x00\x00\x80\x40"
def test_user_nested_unpack():
assert wpistruct.unpack(
Outer, b"\x02\x00\x00\x00\x03\x00\x00\x00\x01\x00\x00\x80\x40"
) == Outer(2, MyStruct(3, True, 4.0))