# Copyright 2022 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Provides patch utilities for PATCHES.json file handling."""

import collections
import contextlib
import dataclasses
import json
from pathlib import Path
import re
import subprocess
import sys
from typing import Any, Dict, IO, Iterable, List, Optional, Tuple, Union


CHECKED_FILE_RE = re.compile(r"^checking file\s+(.*)$")
HUNK_FAILED_RE = re.compile(r"^Hunk #(\d+) FAILED at.*")
HUNK_HEADER_RE = re.compile(r"^@@\s+-(\d+),(\d+)\s+\+(\d+),(\d+)\s+@@")
HUNK_END_RE = re.compile(r"^--\s*$")
PATCH_SUBFILE_HEADER_RE = re.compile(r"^\+\+\+ [ab]/(.*)$")


@contextlib.contextmanager
def atomic_write(fp: Union[Path, str], mode="w", *args, **kwargs):
    """Write to a filepath atomically.

    This works by a temp file swap, created with a .tmp suffix in
    the same directory briefly until being renamed to the desired
    filepath.

    Args:
        fp: Filepath to open.
        mode: File mode; can be 'w', 'wb'. Default 'w'.
        *args: Passed to Path.open as positional args.
        **kwargs: Passed to Path.open as kwargs.

    Raises:
        ValueError when the mode is invalid.

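    Examples:
        Illustrative only; assumes the destination's parent directory
        exists and is writable:

        >>> with atomic_write("PATCHES.json") as f:
        ...     json.dump([], f)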
"""
|
|
if isinstance(fp, str):
|
|
fp = Path(fp)
|
|
if mode not in ("w", "wb"):
|
|
raise ValueError(f"mode {mode} not accepted")
|
|
temp_fp = fp.with_suffix(fp.suffix + ".tmp")
|
|
try:
|
|
with temp_fp.open(mode, *args, **kwargs) as f:
|
|
yield f
|
|
except:
|
|
if temp_fp.is_file():
|
|
temp_fp.unlink()
|
|
raise
|
|
temp_fp.rename(fp)
|
|
|
|
|
|
@dataclasses.dataclass
class Hunk:
    """Represents a patch Hunk."""

    hunk_id: int
    """Hunk ID for the current file."""
    orig_start: int
    orig_hunk_len: int
    patch_start: int
    patch_hunk_len: int
    patch_hunk_lineno_begin: int
    patch_hunk_lineno_end: Optional[int]


def parse_patch_stream(patch_stream: IO[str]) -> Dict[str, List[Hunk]]:
    """Parse a patch file-like into Hunks.

    Args:
        patch_stream: An IO stream formatted like a git patch file.

    Returns:
        A dictionary mapping filenames to lists of Hunks present
        in the patch stream.

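    Examples:
        Illustrative; `diff_text` stands in for the contents of a
        unified diff:

        >>> import io
        >>> hunks_by_file = parse_patch_stream(io.StringIO(diff_text))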
"""
|
|
|
|
current_filepath = None
|
|
current_hunk_id = 0
|
|
current_hunk = None
|
|
out = collections.defaultdict(list)
|
|
for lineno, line in enumerate(patch_stream.readlines()):
|
|
subfile_header = PATCH_SUBFILE_HEADER_RE.match(line)
|
|
if subfile_header:
|
|
current_filepath = subfile_header.group(1)
|
|
if not current_filepath:
|
|
raise RuntimeError("Could not get file header in patch stream")
|
|
# Need to reset the hunk id, as it's per-file.
|
|
current_hunk_id = 0
|
|
continue
|
|
hunk_header = HUNK_HEADER_RE.match(line)
|
|
if hunk_header:
|
|
if not current_filepath:
|
|
raise RuntimeError(
|
|
"Parsed hunk before file header in patch stream"
|
|
)
|
|
if current_hunk:
|
|
# Already parsing a hunk
|
|
current_hunk.patch_hunk_lineno_end = lineno
|
|
current_hunk_id += 1
|
|
current_hunk = Hunk(
|
|
hunk_id=current_hunk_id,
|
|
orig_start=int(hunk_header.group(1)),
|
|
orig_hunk_len=int(hunk_header.group(2)),
|
|
patch_start=int(hunk_header.group(3)),
|
|
patch_hunk_len=int(hunk_header.group(4)),
|
|
patch_hunk_lineno_begin=lineno + 1,
|
|
patch_hunk_lineno_end=None,
|
|
)
|
|
out[current_filepath].append(current_hunk)
|
|
continue
|
|
if current_hunk and HUNK_END_RE.match(line):
|
|
current_hunk.patch_hunk_lineno_end = lineno
|
|
return out
|
|
|
|
|
|
def parse_failed_patch_output(text: str) -> Dict[str, List[int]]:
    """Parse the output of GNU patch for hunks which FAILED to apply.

    Returns:
        A dictionary mapping each checked file path to the IDs of its
        failed hunks.
    """
    current_file = None
    failed_hunks = collections.defaultdict(list)
    for eline in text.split("\n"):
        checked_file_match = CHECKED_FILE_RE.match(eline)
        if checked_file_match:
            current_file = checked_file_match.group(1)
            continue
        failed_match = HUNK_FAILED_RE.match(eline)
        if failed_match:
            if not current_file:
                raise ValueError("Input stream was not parsable")
            hunk_id = int(failed_match.group(1))
            failed_hunks[current_file].append(hunk_id)
    return failed_hunks


@dataclasses.dataclass(frozen=True)
class PatchResult:
    """Result of a patch application."""

    succeeded: bool
    failed_hunks: Dict[str, List[Hunk]] = dataclasses.field(
        default_factory=dict
    )

    def __bool__(self):
        return self.succeeded

    def failure_info(self) -> str:
        if self.succeeded:
            return ""
        s = ""
        for file, hunks in self.failed_hunks.items():
            s += f"{file}:\n"
            for h in hunks:
                s += f"Lines {h.orig_start} to {h.orig_start + h.orig_hunk_len}\n"
            s += "--------------------\n"
        return s


@dataclasses.dataclass
class PatchEntry:
    """Object mapping of an entry of PATCHES.json."""

    workdir: Path
    """Storage location for the patches."""
    metadata: Optional[Dict[str, Any]]
    platforms: Optional[List[str]]
    rel_patch_path: str
    version_range: Optional[Dict[str, Optional[int]]]
    _parsed_hunks = None

    def __post_init__(self):
        if not self.workdir.is_dir():
            raise ValueError(f"workdir {self.workdir} is not a directory")

    @classmethod
    def from_dict(cls, workdir: Path, data: Dict[str, Any]):
        """Instantiate from a dictionary.

        Dictionary must have at least the following key:

        {
            'rel_patch_path': '<relative patch path to workdir>',
        }

        Returns:
            A new PatchEntry.

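        Examples:
            Illustrative; `foo.patch` is a hypothetical patch file
            under the working directory:

            >>> pe = PatchEntry.from_dict(
            ...     Path("."), {"rel_patch_path": "foo.patch"}
            ... )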
"""
|
|
return cls(
|
|
workdir,
|
|
data.get("metadata"),
|
|
data.get("platforms"),
|
|
data["rel_patch_path"],
|
|
data.get("version_range"),
|
|
)
|
|
|
|
def to_dict(self) -> Dict[str, Any]:
|
|
out: Dict[str, Any] = {
|
|
"metadata": self.metadata,
|
|
}
|
|
if self.platforms:
|
|
# To match patch_sync, only serialized when
|
|
# non-empty and non-null.
|
|
out["platforms"] = sorted(self.platforms)
|
|
out.update(
|
|
{
|
|
"rel_patch_path": self.rel_patch_path,
|
|
"version_range": self.version_range,
|
|
}
|
|
)
|
|
return out
|
|
|
|
    def parsed_hunks(self) -> Dict[str, List[Hunk]]:
        # Minor caching here because IO is slow.
        if not self._parsed_hunks:
            with self.patch_path().open(encoding="utf-8") as f:
                self._parsed_hunks = parse_patch_stream(f)
        return self._parsed_hunks

    def patch_path(self) -> Path:
        return self.workdir / self.rel_patch_path

    def can_patch_version(self, svn_version: int) -> bool:
        """Is this patch meant to apply to `svn_version`?"""
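        # The range is half-open: [from, until). For example, an entry
        # with version_range {"from": 100, "until": 200} applies to
        # r100 through r199, but not to r200.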
        # Sometimes the key is there, but it's set to None.
        if not self.version_range:
            return True
        from_v = self.version_range.get("from") or 0
        until_v = self.version_range.get("until")
        if until_v is None:
            until_v = sys.maxsize
        return from_v <= svn_version < until_v

    def is_old(self, svn_version: int) -> bool:
        """Is this patch old compared to `svn_version`?"""
        if not self.version_range:
            return False
        until_v = self.version_range.get("until")
        # Sometimes the key is there, but it's set to None.
        if until_v is None:
            until_v = sys.maxsize
        return svn_version >= until_v

    def apply(
        self, root_dir: Path, extra_args: Optional[List[str]] = None
    ) -> PatchResult:
        """Apply a patch to a given directory."""
        if not extra_args:
            extra_args = []
        # Cmd to apply a patch in the src unpack path.
        abs_patch_path = self.patch_path().absolute()
        if not abs_patch_path.is_file():
            raise RuntimeError(
                f"Cannot apply: patch {abs_patch_path} is not a file"
            )
        cmd = [
            "patch",
            "-d",
            root_dir.absolute(),
            "-f",
            "-p1",
            "--no-backup-if-mismatch",
            "-i",
            abs_patch_path,
        ] + extra_args
        try:
            subprocess.run(
                cmd, encoding="utf-8", check=True, stdout=subprocess.PIPE
            )
        except subprocess.CalledProcessError as e:
            parsed_hunks = self.parsed_hunks()
            failed_hunks_id_dict = parse_failed_patch_output(e.stdout)
            failed_hunks = {}
            for path, failed_hunk_ids in failed_hunks_id_dict.items():
                hunks_for_file = parsed_hunks[path]
                failed_hunks[path] = [
                    hunk
                    for hunk in hunks_for_file
                    if hunk.hunk_id in failed_hunk_ids
                ]
            return PatchResult(succeeded=False, failed_hunks=failed_hunks)
        return PatchResult(succeeded=True)

    def test_apply(self, root_dir: Path) -> PatchResult:
        """Dry run applying a patch to a given directory."""
        return self.apply(root_dir, ["--dry-run"])

    def title(self) -> str:
        if not self.metadata:
            return ""
        return self.metadata.get("title", "")


@dataclasses.dataclass(frozen=True)
class PatchInfo:
    """Holds info for a round of patch applications."""

    # str types are legacy. Patch lists should
    # probably be PatchEntries.
    applied_patches: List[PatchEntry]
    failed_patches: List[PatchEntry]
    # Can be deleted once legacy code is removed.
    non_applicable_patches: List[str]
    # Can be deleted once legacy code is removed.
    disabled_patches: List[str]
    # Can be deleted once legacy code is removed.
    removed_patches: List[str]
    # Can be deleted once legacy code is removed.
    modified_metadata: Optional[str]

    def _asdict(self):
        return dataclasses.asdict(self)


def json_to_patch_entries(workdir: Path, json_fd: IO[str]) -> List[PatchEntry]:
    """Convert a json IO object to List[PatchEntry].

    Examples:
        >>> f = open('PATCHES.json')
        >>> patch_entries = json_to_patch_entries(Path(), f)
    """
    return [PatchEntry.from_dict(workdir, d) for d in json.load(json_fd)]


def _print_failed_patch(pe: PatchEntry, failed_hunks: Dict[str, List[Hunk]]):
    """Print information about a single failing PatchEntry.

    Args:
        pe: A PatchEntry that failed.
        failed_hunks: Hunks for pe which failed as dict:
            filepath: [Hunk...]
    """
    print(f"Could not apply {pe.rel_patch_path}: {pe.title()}", file=sys.stderr)
    for fp, hunks in failed_hunks.items():
        print(f"{fp}:", file=sys.stderr)
        for h in hunks:
            print(
                f"- {pe.rel_patch_path} "
                f"l:{h.patch_hunk_lineno_begin}...{h.patch_hunk_lineno_end}",
                file=sys.stderr,
            )


def apply_all_from_json(
    svn_version: int,
    llvm_src_dir: Path,
    patches_json_fp: Path,
    continue_on_failure: bool = False,
) -> PatchInfo:
    """Attempt to apply some patches to a given LLVM source tree.

    This relies on a PATCHES.json file to be the primary way
    the patches are applied.

    Args:
        svn_version: LLVM Subversion revision to patch.
        llvm_src_dir: llvm-project root-level source directory to patch.
        patches_json_fp: Filepath to the PATCHES.json file.
        continue_on_failure: Skip any patches which failed to apply,
            rather than throw an Exception.

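    Examples:
        Illustrative; the revision and paths here are assumptions:

        >>> apply_all_from_json(
        ...     svn_version=123456,
        ...     llvm_src_dir=Path("llvm-project"),
        ...     patches_json_fp=Path("patches/PATCHES.json"),
        ... )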
"""
|
|
with patches_json_fp.open(encoding="utf-8") as f:
|
|
patches = json_to_patch_entries(patches_json_fp.parent, f)
|
|
skipped_patches = []
|
|
failed_patches = []
|
|
applied_patches = []
|
|
for pe in patches:
|
|
applied, failed_hunks = apply_single_patch_entry(
|
|
svn_version, llvm_src_dir, pe
|
|
)
|
|
if applied:
|
|
applied_patches.append(pe)
|
|
continue
|
|
if failed_hunks is not None:
|
|
if continue_on_failure:
|
|
failed_patches.append(pe)
|
|
continue
|
|
else:
|
|
_print_failed_patch(pe, failed_hunks)
|
|
raise RuntimeError(
|
|
"failed to apply patch " f"{pe.patch_path()}: {pe.title()}"
|
|
)
|
|
# Didn't apply, didn't fail, it was skipped.
|
|
skipped_patches.append(pe)
|
|
return PatchInfo(
|
|
non_applicable_patches=skipped_patches,
|
|
applied_patches=applied_patches,
|
|
failed_patches=failed_patches,
|
|
disabled_patches=[],
|
|
removed_patches=[],
|
|
modified_metadata=None,
|
|
)
|
|
|
|
|
|
def apply_single_patch_entry(
    svn_version: int,
    llvm_src_dir: Path,
    pe: PatchEntry,
    ignore_version_range: bool = False,
) -> Tuple[bool, Optional[Dict[str, List[Hunk]]]]:
    """Try to apply a single PatchEntry object.

    Returns:
        Tuple where the first element indicates whether the patch applied,
        and the second element is a failed hunk mapping from file name to
        lists of hunks (if the patch didn't apply).
    """
    # Don't apply patches outside of the version range.
    if not ignore_version_range and not pe.can_patch_version(svn_version):
        return False, None
    # Test first to avoid making changes.
    test_application = pe.test_apply(llvm_src_dir)
    if not test_application:
        return False, test_application.failed_hunks
    # Now actually make changes.
    application_result = pe.apply(llvm_src_dir)
    if not application_result:
        # This should be very rare/impossible.
        return False, application_result.failed_hunks
    return True, None


def is_git_dirty(git_root_dir: Path) -> bool:
    """Return whether the given git directory has uncommitted changes."""
    if not git_root_dir.is_dir():
        raise ValueError(f"git_root_dir {git_root_dir} is not a directory")
    cmd = ["git", "ls-files", "-m", "--other", "--exclude-standard"]
    return (
        subprocess.run(
            cmd,
            stdout=subprocess.PIPE,
            check=True,
            cwd=git_root_dir,
            encoding="utf-8",
        ).stdout
        != ""
    )


def clean_src_tree(src_path):
    """Cleans the source tree of the changes made in 'src_path'."""

    reset_src_tree_cmd = ["git", "-C", src_path, "reset", "HEAD", "--hard"]

    subprocess.run(reset_src_tree_cmd, check=True)

    clean_src_tree_cmd = ["git", "-C", src_path, "clean", "-fd"]

    subprocess.run(clean_src_tree_cmd, check=True)


@contextlib.contextmanager
def git_clean_context(git_root_dir: Path):
    """Cleans up a git directory when the context exits."""
    if is_git_dirty(git_root_dir):
        raise RuntimeError("Cannot setup clean context; git_root_dir is dirty")
    try:
        yield
    finally:
        clean_src_tree(git_root_dir)


def _write_json_changes(patches: List[Dict[str, Any]], file_io: IO[str]):
    """Write JSON changes to file; does not acquire a new file lock."""
    json.dump(patches, file_io, indent=4, separators=(",", ": "))
    # Need to add a newline as json.dump omits it.
    file_io.write("\n")


def update_version_ranges(
    svn_version: int, llvm_src_dir: Path, patches_json_fp: Path
) -> PatchInfo:
    """Reduce the version ranges of failing patches.

    Patches which fail to apply will have their 'version_range.until'
    field reduced to the passed-in svn_version.

    Modifies the contents of patches_json_fp.

    Args:
        svn_version: LLVM revision number.
        llvm_src_dir: llvm-project directory path.
        patches_json_fp: Filepath to the PATCHES.json file.

    Returns:
        PatchInfo for applied and disabled patches.

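    Examples:
        Illustrative; arguments are assumptions:

        >>> update_version_ranges(
        ...     123456, Path("llvm-project"), Path("patches/PATCHES.json")
        ... )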
"""
|
|
with patches_json_fp.open(encoding="utf-8") as f:
|
|
patch_entries = json_to_patch_entries(
|
|
patches_json_fp.parent,
|
|
f,
|
|
)
|
|
modified_entries, applied_patches = update_version_ranges_with_entries(
|
|
svn_version, llvm_src_dir, patch_entries
|
|
)
|
|
with atomic_write(patches_json_fp, encoding="utf-8") as f:
|
|
_write_json_changes([p.to_dict() for p in patch_entries], f)
|
|
for entry in modified_entries:
|
|
print(
|
|
f"Stopped applying {entry.rel_patch_path} ({entry.title()}) "
|
|
f"for r{svn_version}"
|
|
)
|
|
return PatchInfo(
|
|
non_applicable_patches=[],
|
|
applied_patches=applied_patches,
|
|
failed_patches=[],
|
|
disabled_patches=[p.rel_patch_path for p in modified_entries],
|
|
removed_patches=[],
|
|
modified_metadata=str(patches_json_fp) if modified_entries else None,
|
|
)
|
|
|
|
|
|
def update_version_ranges_with_entries(
    svn_version: int,
    llvm_src_dir: Path,
    patch_entries: Iterable[PatchEntry],
) -> Tuple[List[PatchEntry], List[PatchEntry]]:
    """Testable helper for update_version_ranges.

    Args:
        svn_version: LLVM revision number.
        llvm_src_dir: llvm-project directory path.
        patch_entries: PatchEntry objects to modify.

    Returns:
        Tuple of (modified entries, applied patches)

    Post:
        Modifies patch_entries in place.
    """
    modified_entries: List[PatchEntry] = []
    applied_patches: List[PatchEntry] = []
    active_patches = (pe for pe in patch_entries if not pe.is_old(svn_version))
    with git_clean_context(llvm_src_dir):
        for pe in active_patches:
            test_result = pe.test_apply(llvm_src_dir)
            if not test_result:
                if pe.version_range is None:
                    pe.version_range = {}
                pe.version_range["until"] = svn_version
                modified_entries.append(pe)
            else:
                # We have to actually apply the patch so that future patches
                # will stack properly.
                if not pe.apply(llvm_src_dir).succeeded:
                    raise RuntimeError(
                        "Could not apply patch that dry ran successfully"
                    )
                applied_patches.append(pe)

    return modified_entries, applied_patches


def remove_old_patches(
    svn_version: int, llvm_src_dir: Path, patches_json_fp: Path
) -> PatchInfo:
    """Remove patches that no longer apply and never will again.

    Patches are determined to be "old" via the "is_old" method for
    each patch entry.

    Args:
        svn_version: LLVM SVN version.
        llvm_src_dir: LLVM source directory.
        patches_json_fp: Location to edit patches on.

    Returns:
        PatchInfo for modified patches.

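    Examples:
        Illustrative; a sketch with assumed paths:

        >>> remove_old_patches(
        ...     123456, Path("llvm-project"), Path("patches/PATCHES.json")
        ... )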
"""
|
|
with patches_json_fp.open(encoding="utf-8") as f:
|
|
patches_list = json.load(f)
|
|
patch_entries = (
|
|
PatchEntry.from_dict(llvm_src_dir, elem) for elem in patches_list
|
|
)
|
|
oldness = [(entry, entry.is_old(svn_version)) for entry in patch_entries]
|
|
filtered_entries = [entry.to_dict() for entry, old in oldness if not old]
|
|
with atomic_write(patches_json_fp, encoding="utf-8") as f:
|
|
_write_json_changes(filtered_entries, f)
|
|
removed_entries = [entry for entry, old in oldness if old]
|
|
plural_patches = "patch" if len(removed_entries) == 1 else "patches"
|
|
print(f"Removed {len(removed_entries)} old {plural_patches}:")
|
|
for r in removed_entries:
|
|
print(f"- {r.rel_patch_path}: {r.title()}")
|
|
|
|
return PatchInfo(
|
|
non_applicable_patches=[],
|
|
applied_patches=[],
|
|
failed_patches=[],
|
|
disabled_patches=[],
|
|
removed_patches=[p.rel_patch_path for p in removed_entries],
|
|
modified_metadata=str(patches_json_fp) if removed_entries else None,
|
|
)
|