[upstream_utils] Use pathlib instead of os.path (#7983)

A noteworthy change is the replacement of the `dp.startswith(os.path.join(".", "subdir"))` pattern. pathlib doesn't offer something with similar semantics besides `match` and `full_match`, so there's now a helper function that replicates the behavior.

Other notable changes include the addition of type annotations to ensure code correctness, using == to check file names instead of `endswith` for clarity (`endswith` is still used to check extensions), manual walking and copying being refactored in googletest, json, memory, nanopb, protobuf, and sleipnir to use `walk_cwd_and_copy_if`, and matching functions being shortened to the point where they can just be inlined into the lambda.

Co-authored-by: Tyler Veness <calcmogul@gmail.com>
Co-authored-by: David Vo <auscompgeek@users.noreply.github.com>
This commit is contained in:
Gold856
2025-05-29 22:05:22 +00:00
committed by GitHub
parent de718f7ae5
commit ca05ffa1b9
26 changed files with 625 additions and 1045 deletions

View File

@@ -5,6 +5,8 @@ import shutil
import subprocess
import sys
import tempfile
from collections.abc import Callable
from pathlib import Path
def get_repo_root():
@@ -12,15 +14,17 @@ def get_repo_root():
An empty string is returned if no repository root was found.
"""
return subprocess.run(
["git", "rev-parse", "--show-toplevel"],
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
encoding="ascii",
).stdout.rstrip()
return Path(
subprocess.run(
["git", "rev-parse", "--show-toplevel"],
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
encoding="ascii",
).stdout.rstrip()
)
def walk_if(top, pred):
def walk_if(top: Path, pred: Callable[[Path, str], bool]):
"""Walks the current directory, then returns a list of files for which the
given predicate is true.
@@ -29,12 +33,10 @@ def walk_if(top, pred):
pred -- a function that takes a directory path and a filename, then returns
True if the file should be included in the output list
"""
return [
os.path.join(dp, f) for dp, dn, fn in os.walk(top) for f in fn if pred(dp, f)
]
return [dp / f for dp, dn, fn in top.walk() for f in fn if pred(dp, f)]
def copy_to(files, root, rename_c_to_cpp=False):
def copy_to(files: list[Path], root: Path, rename_c_to_cpp=False):
"""Copies list of files to root by appending the relative paths of the files
to root.
@@ -48,31 +50,33 @@ def copy_to(files, root, rename_c_to_cpp=False):
Returns:
The list of files in their destination.
"""
if not os.path.exists(root):
os.makedirs(root)
if not root.exists():
root.mkdir(parents=True)
dest_files = []
dest_files: list[Path] = []
for f in files:
dest_file = os.path.join(root, f)
dest_file = root / f
# Rename .cc or .cxx file to .cpp
if dest_file.endswith(".cc") or dest_file.endswith(".cxx"):
dest_file = os.path.splitext(dest_file)[0] + ".cpp"
if dest_file.match("*.cc") or dest_file.match("*.cxx"):
dest_file = dest_file.with_suffix(".cpp")
if rename_c_to_cpp and dest_file.endswith(".c"):
dest_file = os.path.splitext(dest_file)[0] + ".cpp"
if rename_c_to_cpp and dest_file.match("*.c"):
dest_file = dest_file.with_suffix(".cpp")
# Make leading directory
dest_dir = os.path.dirname(dest_file)
if not os.path.exists(dest_dir):
os.makedirs(dest_dir)
dest_dir = dest_file.parent
if not dest_dir.exists():
dest_dir.mkdir(parents=True)
shutil.copyfile(f, dest_file)
dest_files.append(dest_file)
return dest_files
def walk_cwd_and_copy_if(pred, root, rename_c_to_cpp=False):
def walk_cwd_and_copy_if(
pred: Callable[[Path, str], bool], root: Path, rename_c_to_cpp=False
):
"""Walks the current directory, generates a list of files for which the
given predicate is true, then copies that list to root by appending the
relative paths of the files to root.
@@ -88,12 +92,12 @@ def walk_cwd_and_copy_if(pred, root, rename_c_to_cpp=False):
Returns:
The list of files in their destination.
"""
files = walk_if(".", pred)
files = walk_if(Path("."), pred)
files = copy_to(files, root, rename_c_to_cpp)
return files
def comment_out_invalid_includes(filename, include_roots):
def comment_out_invalid_includes(filename: Path, include_roots: list[Path]):
"""Comment out #include directives that include a nonexistent file
Keyword arguments:
@@ -114,11 +118,8 @@ def comment_out_invalid_includes(filename, include_roots):
# Comment out #include if the file doesn't exist in current directory or
# include root
if not os.path.exists(
os.path.join(os.path.dirname(filename), include)
) and not any(
os.path.exists(os.path.join(include_root, include))
for include_root in include_roots
if not (filename.parent / include).exists() and not any(
(include_root / include).exists() for include_root in include_roots
):
new_contents += "// "
@@ -135,7 +136,22 @@ def comment_out_invalid_includes(filename, include_roots):
f.write(new_contents)
def git_am(patch, use_threeway=False, ignore_whitespace=False):
def has_prefix(path: Path, prefix: Path):
"""Checks if a path has a certain prefix.
Keyword arguments:
path -- path to check if it begins with the prefix
prefix -- prefix to use
Returns:
True if the path begins with the prefix, False otherwise.
"""
return len(path.parts) >= len(prefix.parts) and all(
p1 == p2 for p1, p2 in zip(path.parts, prefix.parts)
)
def git_am(patch: Path, use_threeway=False, ignore_whitespace=False):
"""Apply patch to a Git repository in the current directory using "git am".
Keyword arguments:
@@ -152,7 +168,7 @@ def git_am(patch, use_threeway=False, ignore_whitespace=False):
subprocess.check_output(args + [patch])
def has_git_rev(rev):
def has_git_rev(rev: str):
"""Checks whether the Git repository in the current directory has the given
revision.
@@ -169,10 +185,10 @@ def has_git_rev(rev):
class Lib:
def __init__(
self,
name,
url,
tag,
copy_upstream_src,
name: str,
url: str,
tag: str,
copy_upstream_src: Callable[[Path], None],
patch_options={},
*,
pre_patch_hook=None,
@@ -207,7 +223,7 @@ class Lib:
self.pre_patch_commits = pre_patch_commits
self.wpilib_root = get_repo_root()
def get_repo_path(self, tempdir=None):
def get_repo_path(self, tempdir: str | None = None):
"""Returns the path to the clone of the upstream repository.
Keyword argument:
@@ -221,11 +237,11 @@ class Lib:
if tempdir is None:
tempdir = tempfile.gettempdir()
repo = os.path.basename(self.url)
dest = os.path.join(tempdir, repo)
dest = dest.removesuffix(".git")
dest = Path(tempdir, repo)
dest = dest.with_suffix("")
return dest
def open_repo(self, *, err_msg_if_absent):
def open_repo(self, *, err_msg_if_absent: str | None):
"""Changes the current working directory to the upstream repository. If
err_msg_if_absent is not None and the upstream repository does not
exist, the program exits with return code 1.
@@ -241,7 +257,7 @@ class Lib:
print(f"INFO: Opening repository at {dest}")
if not os.path.exists(dest):
if not dest.exists():
if err_msg_if_absent is None:
subprocess.run(["git", "clone", "--filter=tree:0", self.url])
else:
@@ -285,7 +301,7 @@ class Lib:
exit(1)
return root_tags[0]
def set_root_tag(self, tag):
def set_root_tag(self, tag: str):
"""Sets the root tag, deleting any potential candidates first.
Keyword argument:
@@ -307,19 +323,17 @@ class Lib:
Returns:
The absolute path to the directory containing the patch files.
"""
return os.path.join(self.wpilib_root, f"upstream_utils/{self.name}_patches")
return self.wpilib_root / f"upstream_utils/{self.name}_patches"
def get_patch_list(self):
def get_patch_list(self) -> list[Path]:
"""Returns a list of the filenames of the patches to apply.
Returns:
A list of the filenames of the patches to apply, sorted in lexicographic
order by the Unicode code points."""
if not os.path.exists(self.get_patch_directory()):
if not self.get_patch_directory().exists():
return []
return sorted(
f for f in os.listdir(self.get_patch_directory()) if f.endswith(".patch")
)
return sorted(self.get_patch_directory().glob("*.patch"))
def apply_patches(self):
"""Applies the patches listed in the patch list to the current
@@ -330,17 +344,17 @@ class Lib:
for f in self.get_patch_list():
git_am(
os.path.join(self.get_patch_directory(), f),
self.get_patch_directory() / f,
**self.patch_options,
)
def replace_tag(self, tag):
def replace_tag(self, tag: str):
"""Replaces the tag in the script.
Keyword argument:
tag -- The tag to replace the script tag with.
"""
path = os.path.join(self.wpilib_root, f"upstream_utils/{self.name}.py")
path = self.wpilib_root / f"upstream_utils/{self.name}.py"
with open(path, "r") as file:
lines = file.readlines()
@@ -391,7 +405,7 @@ class Lib:
self.set_root_tag(self.old_tag)
def rebase(self, new_tag):
def rebase(self, new_tag: str):
"""Rebases the patches.
Keyword argument:
@@ -449,7 +463,7 @@ class Lib:
]
)
if os.path.exists(self.get_patch_directory()):
if self.get_patch_directory().exists():
shutil.rmtree(self.get_patch_directory())
is_first = True