aboutsummaryrefslogtreecommitdiff
path: root/pw_presubmit/py/pw_presubmit/presubmit.py
diff options
context:
space:
mode:
Diffstat (limited to 'pw_presubmit/py/pw_presubmit/presubmit.py')
-rw-r--r--pw_presubmit/py/pw_presubmit/presubmit.py1227
1 files changed, 1044 insertions, 183 deletions
diff --git a/pw_presubmit/py/pw_presubmit/presubmit.py b/pw_presubmit/py/pw_presubmit/presubmit.py
index 08585d04e..ef10a3abe 100644
--- a/pw_presubmit/py/pw_presubmit/presubmit.py
+++ b/pw_presubmit/py/pw_presubmit/presubmit.py
@@ -41,33 +41,50 @@ from __future__ import annotations
import collections
import contextlib
+import copy
import dataclasses
import enum
from inspect import Parameter, signature
import itertools
+import json
import logging
import os
from pathlib import Path
import re
+import shutil
+import signal
import subprocess
+import sys
+import tempfile as tf
import time
-from typing import (Callable, Collection, Dict, Iterable, Iterator, List,
- NamedTuple, Optional, Pattern, Sequence, Set, Tuple, Union)
-
+import types
+from typing import (
+ Any,
+ Callable,
+ Collection,
+ Dict,
+ Iterable,
+ Iterator,
+ List,
+ Optional,
+ Pattern,
+ Sequence,
+ Set,
+ Tuple,
+ Union,
+)
+import urllib
+
+import pw_cli.color
import pw_cli.env
+import pw_env_setup.config_file
+from pw_package import package_manager
from pw_presubmit import git_repo, tools
from pw_presubmit.tools import plural
_LOG: logging.Logger = logging.getLogger(__name__)
-color_red = tools.make_color(31)
-color_bold_red = tools.make_color(31, 1)
-color_black_on_red = tools.make_color(30, 41)
-color_yellow = tools.make_color(33, 1)
-color_green = tools.make_color(32)
-color_black_on_green = tools.make_color(30, 42)
-color_aqua = tools.make_color(36)
-color_bold_white = tools.make_color(37, 1)
+_COLOR = pw_cli.color.colors()
_SUMMARY_BOX = '══╦╗ ║║══╩╝'
_CHECK_UPPER = '━━━┓ '
@@ -93,34 +110,46 @@ def _format_time(time_s: float) -> str:
def _box(style, left, middle, right, box=tools.make_box('><>')) -> str:
- return box.format(*style,
- section1=left + ('' if left.endswith(' ') else ' '),
- width1=_LEFT,
- section2=' ' + middle,
- width2=WIDTH - _LEFT - _RIGHT - 4,
- section3=right + ' ',
- width3=_RIGHT)
+ return box.format(
+ *style,
+ section1=left + ('' if left.endswith(' ') else ' '),
+ width1=_LEFT,
+ section2=' ' + middle,
+ width2=WIDTH - _LEFT - _RIGHT - 4,
+ section3=right + ' ',
+ width3=_RIGHT,
+ )
class PresubmitFailure(Exception):
"""Optional exception to use for presubmit failures."""
- def __init__(self, description: str = '', path=None):
- super().__init__(f'{path}: {description}' if path else description)
-
-
-class _Result(enum.Enum):
+ def __init__(
+ self,
+ description: str = '',
+ path: Optional[Path] = None,
+ line: Optional[int] = None,
+ ):
+ line_part: str = ''
+ if line is not None:
+ line_part = f'{line}:'
+ super().__init__(
+ f'{path}:{line_part} {description}' if path else description
+ )
+
+
+class PresubmitResult(enum.Enum):
PASS = 'PASSED' # Check completed successfully.
FAIL = 'FAILED' # Check failed.
CANCEL = 'CANCEL' # Check didn't complete.
def colorized(self, width: int, invert: bool = False) -> str:
- if self is _Result.PASS:
- color = color_black_on_green if invert else color_green
- elif self is _Result.FAIL:
- color = color_black_on_red if invert else color_red
- elif self is _Result.CANCEL:
- color = color_yellow
+ if self is PresubmitResult.PASS:
+ color = _COLOR.black_on_green if invert else _COLOR.green
+ elif self is PresubmitResult.FAIL:
+ color = _COLOR.black_on_red if invert else _COLOR.red
+ elif self is PresubmitResult.CANCEL:
+ color = _COLOR.yellow
else:
color = lambda value: value
@@ -130,9 +159,18 @@ class _Result(enum.Enum):
class Program(collections.abc.Sequence):
"""A sequence of presubmit checks; basically a tuple with a name."""
+
def __init__(self, name: str, steps: Iterable[Callable]):
self.name = name
- self._steps = tuple({s: None for s in tools.flatten(steps)})
+
+ def ensure_check(step):
+ if isinstance(step, Check):
+ return step
+ return Check(step)
+
+ self._steps: tuple[Check, ...] = tuple(
+ {ensure_check(s): None for s in tools.flatten(steps)}
+ )
def __getitem__(self, i):
return self._steps[i]
@@ -152,6 +190,7 @@ class Programs(collections.abc.Mapping):
Use is optional. Helpful when managing multiple presubmit check programs.
"""
+
def __init__(self, **programs: Sequence):
"""Initializes a name: program mapping from the provided keyword args.
@@ -159,12 +198,11 @@ class Programs(collections.abc.Mapping):
contain nested sequences, which are flattened.
"""
self._programs: Dict[str, Program] = {
- name: Program(name, checks)
- for name, checks in programs.items()
+ name: Program(name, checks) for name, checks in programs.items()
}
- def all_steps(self) -> Dict[str, Callable]:
- return {c.__name__: c for c in itertools.chain(*self.values())}
+ def all_steps(self) -> Dict[str, Check]:
+ return {c.name: c for c in itertools.chain(*self.values())}
def __getitem__(self, item: str) -> Program:
return self._programs[item]
@@ -177,34 +215,474 @@ class Programs(collections.abc.Mapping):
@dataclasses.dataclass(frozen=True)
-class PresubmitContext:
- """Context passed into presubmit checks."""
+class FormatOptions:
+ python_formatter: Optional[str] = 'yapf'
+ black_path: Optional[str] = 'black'
+
+ # TODO(b/264578594) Add exclude to pigweed.json file.
+ # exclude: Sequence[re.Pattern] = dataclasses.field(default_factory=list)
+
+ @staticmethod
+ def load() -> 'FormatOptions':
+ config = pw_env_setup.config_file.load()
+ fmt = config.get('pw', {}).get('pw_presubmit', {}).get('format', {})
+ return FormatOptions(
+ python_formatter=fmt.get('python_formatter', 'yapf'),
+ black_path=fmt.get('black_path', 'black'),
+ # exclude=tuple(re.compile(x) for x in fmt.get('exclude', ())),
+ )
+
+
+@dataclasses.dataclass
+class LuciPipeline:
+ round: int
+ builds_from_previous_iteration: Sequence[str]
+
+ @staticmethod
+ def create(
+ bbid: int,
+ fake_pipeline_props: Optional[Dict[str, Any]] = None,
+ ) -> Optional['LuciPipeline']:
+ pipeline_props: Dict[str, Any]
+ if fake_pipeline_props is not None:
+ pipeline_props = fake_pipeline_props
+ else:
+ pipeline_props = (
+ get_buildbucket_info(bbid)
+ .get('input', {})
+ .get('properties', {})
+ .get('$pigweed/pipeline', {})
+ )
+ if not pipeline_props.get('inside_a_pipeline', False):
+ return None
+
+ return LuciPipeline(
+ round=int(pipeline_props['round']),
+ builds_from_previous_iteration=list(
+ pipeline_props['builds_from_previous_iteration']
+ ),
+ )
+
+
+def get_buildbucket_info(bbid) -> Dict[str, Any]:
+ if not bbid or not shutil.which('bb'):
+ return {}
+
+ output = subprocess.check_output(
+ ['bb', 'get', '-json', '-p', f'{bbid}'], text=True
+ )
+ return json.loads(output)
+
+
+def download_cas_artifact(
+ ctx: PresubmitContext, digest: str, output_dir: str
+) -> None:
+    """Downloads the given digest to the given output directory
+
+ Args:
+ ctx: the presubmit context
+ digest:
+ a string digest in the form "<digest hash>/<size bytes>"
+ i.e 693a04e41374150d9d4b645fccb49d6f96e10b527c7a24b1e17b331f508aa73b/86
+ output_dir: the directory we want to download the artifacts to
+ """
+ if ctx.luci is None:
+ raise PresubmitFailure('Lucicontext is None')
+ cmd = [
+ 'cas',
+ 'download',
+ '-cas-instance',
+ ctx.luci.cas_instance,
+ '-digest',
+ digest,
+ '-dir',
+ output_dir,
+ ]
+ try:
+ subprocess.check_call(cmd)
+ except subprocess.CalledProcessError as failure:
+ raise PresubmitFailure('cas download failed') from failure
+
+
+def archive_cas_artifact(
+ ctx: PresubmitContext, root: str, upload_paths: List[str]
+) -> str:
+ """Uploads the given artifacts into cas
+
+ Args:
+ ctx: the presubmit context
+        root: root directory of archived tree, should be an absolute path.
+        upload_paths: paths to archived files/dirs, should be absolute paths.
+ If empty, [root] will be used.
+
+ Returns:
+ A string digest in the form "<digest hash>/<size bytes>"
+ i.e 693a04e41374150d9d4b645fccb49d6f96e10b527c7a24b1e17b331f508aa73b/86
+ """
+ if ctx.luci is None:
+ raise PresubmitFailure('Lucicontext is None')
+ assert os.path.abspath(root)
+ if not upload_paths:
+ upload_paths = [root]
+ for path in upload_paths:
+ assert os.path.abspath(path)
+
+ with tf.NamedTemporaryFile(mode='w+t') as tmp_digest_file:
+ with tf.NamedTemporaryFile(mode='w+t') as tmp_paths_file:
+ json_paths = json.dumps(
+ [
+ [str(root), str(os.path.relpath(path, root))]
+ for path in upload_paths
+ ]
+ )
+ tmp_paths_file.write(json_paths)
+ tmp_paths_file.seek(0)
+ cmd = [
+ 'cas',
+ 'archive',
+ '-cas-instance',
+ ctx.luci.cas_instance,
+ '-paths-json',
+ tmp_paths_file.name,
+ '-dump-digest',
+ tmp_digest_file.name,
+ ]
+ try:
+ subprocess.check_call(cmd)
+ except subprocess.CalledProcessError as failure:
+ raise PresubmitFailure('cas archive failed') from failure
+
+ tmp_digest_file.seek(0)
+ uploaded_digest = tmp_digest_file.read()
+ return uploaded_digest
+
+
+@dataclasses.dataclass
+class LuciTrigger:
+ """Details the pending change or submitted commit triggering the build."""
+
+ number: int
+ remote: str
+ branch: str
+ ref: str
+ gerrit_name: str
+ submitted: bool
+
+ @property
+ def gerrit_url(self):
+ if not self.number:
+ return self.gitiles_url
+ return 'https://{}-review.googlesource.com/c/{}'.format(
+ self.gerrit_name, self.number
+ )
+
+ @property
+ def gitiles_url(self):
+ return '{}/+/{}'.format(self.remote, self.ref)
+
+ @staticmethod
+ def create_from_environment(
+ env: Optional[Dict[str, str]] = None,
+ ) -> Sequence['LuciTrigger']:
+ if not env:
+ env = os.environ.copy()
+ raw_path = env.get('TRIGGERING_CHANGES_JSON')
+ if not raw_path:
+ return ()
+ path = Path(raw_path)
+ if not path.is_file():
+ return ()
+
+ result = []
+ with open(path, 'r') as ins:
+ for trigger in json.load(ins):
+ keys = {
+ 'number',
+ 'remote',
+ 'branch',
+ 'ref',
+ 'gerrit_name',
+ 'submitted',
+ }
+ if keys <= trigger.keys():
+ result.append(LuciTrigger(**{x: trigger[x] for x in keys}))
+
+ return tuple(result)
+
+ @staticmethod
+ def create_for_testing():
+ change = {
+ 'number': 123456,
+ 'remote': 'https://pigweed.googlesource.com/pigweed/pigweed',
+ 'branch': 'main',
+ 'ref': 'refs/changes/56/123456/1',
+ 'gerrit_name': 'pigweed',
+ 'submitted': True,
+ }
+ with tf.TemporaryDirectory() as tempdir:
+ changes_json = Path(tempdir) / 'changes.json'
+ with changes_json.open('w') as outs:
+ json.dump([change], outs)
+ env = {'TRIGGERING_CHANGES_JSON': changes_json}
+ return LuciTrigger.create_from_environment(env)
+
+
+@dataclasses.dataclass
+class LuciContext:
+ """LUCI-specific information about the environment."""
+
+ buildbucket_id: int
+ build_number: int
+ project: str
+ bucket: str
+ builder: str
+ swarming_server: str
+ swarming_task_id: str
+ cas_instance: str
+ pipeline: Optional[LuciPipeline]
+ triggers: Sequence[LuciTrigger] = dataclasses.field(default_factory=tuple)
+
+ @staticmethod
+ def create_from_environment(
+ env: Optional[Dict[str, str]] = None,
+ fake_pipeline_props: Optional[Dict[str, Any]] = None,
+ ) -> Optional['LuciContext']:
+ """Create a LuciContext from the environment."""
+
+ if not env:
+ env = os.environ.copy()
+
+ luci_vars = [
+ 'BUILDBUCKET_ID',
+ 'BUILDBUCKET_NAME',
+ 'BUILD_NUMBER',
+ 'SWARMING_TASK_ID',
+ 'SWARMING_SERVER',
+ ]
+ if any(x for x in luci_vars if x not in env):
+ return None
+
+ project, bucket, builder = env['BUILDBUCKET_NAME'].split(':')
+
+ bbid: int = 0
+ pipeline: Optional[LuciPipeline] = None
+ try:
+ bbid = int(env['BUILDBUCKET_ID'])
+ pipeline = LuciPipeline.create(bbid, fake_pipeline_props)
+
+ except ValueError:
+ pass
+
+ # Logic to identify cas instance from swarming server is derived from
+ # https://chromium.googlesource.com/infra/luci/recipes-py/+/main/recipe_modules/cas/api.py
+ swarm_server = env['SWARMING_SERVER']
+ cas_project = urllib.parse.urlparse(swarm_server).netloc.split('.')[0]
+ cas_instance = f'projects/{cas_project}/instances/default_instance'
+
+ result = LuciContext(
+ buildbucket_id=bbid,
+ build_number=int(env['BUILD_NUMBER']),
+ project=project,
+ bucket=bucket,
+ builder=builder,
+ swarming_server=env['SWARMING_SERVER'],
+ swarming_task_id=env['SWARMING_TASK_ID'],
+ cas_instance=cas_instance,
+ pipeline=pipeline,
+ triggers=LuciTrigger.create_from_environment(env),
+ )
+ _LOG.debug('%r', result)
+ return result
+
+ @staticmethod
+ def create_for_testing():
+ env = {
+ 'BUILDBUCKET_ID': '881234567890',
+ 'BUILDBUCKET_NAME': 'pigweed:bucket.try:builder-name',
+ 'BUILD_NUMBER': '123',
+ 'SWARMING_SERVER': 'https://chromium-swarm.appspot.com',
+ 'SWARMING_TASK_ID': 'cd2dac62d2',
+ }
+ return LuciContext.create_from_environment(env, {})
+
+
+@dataclasses.dataclass
+class FormatContext:
+ """Context passed into formatting helpers.
+
+ This class is a subset of PresubmitContext containing only what's needed by
+ formatters.
+
+ For full documentation on the members see the PresubmitContext section of
+ pw_presubmit/docs.rst.
+
+ Args:
+ root: Source checkout root directory
+ output_dir: Output directory for this specific language
+ paths: Modified files for the presubmit step to check (often used in
+ formatting steps but ignored in compile steps)
+ package_root: Root directory for pw package installations
+ format_options: Formatting options, derived from pigweed.json
+ """
+
+ root: Optional[Path]
+ output_dir: Path
+ paths: Tuple[Path, ...]
+ package_root: Path
+ format_options: FormatOptions
+
+
+@dataclasses.dataclass
+class PresubmitContext: # pylint: disable=too-many-instance-attributes
+ """Context passed into presubmit checks.
+
+ For full documentation on the members see pw_presubmit/docs.rst.
+
+ Args:
+ root: Source checkout root directory
+ repos: Repositories (top-level and submodules) processed by
+ pw presubmit
+ output_dir: Output directory for this specific presubmit step
+ failure_summary_log: Path where steps should write a brief summary of
+ any failures encountered for use by other tooling.
+ paths: Modified files for the presubmit step to check (often used in
+ formatting steps but ignored in compile steps)
+ all_paths: All files in the tree.
+ package_root: Root directory for pw package installations
+ override_gn_args: Additional GN args processed by build.gn_gen()
+ luci: Information about the LUCI build or None if not running in LUCI
+ format_options: Formatting options, derived from pigweed.json
+ num_jobs: Number of jobs to run in parallel
+ continue_after_build_error: For steps that compile, don't exit on the
+ first compilation error
+ """
+
root: Path
repos: Tuple[Path, ...]
output_dir: Path
+ failure_summary_log: Path
paths: Tuple[Path, ...]
+ all_paths: Tuple[Path, ...]
package_root: Path
+ luci: Optional[LuciContext]
+ override_gn_args: Dict[str, str]
+ format_options: FormatOptions
+ num_jobs: Optional[int] = None
+ continue_after_build_error: bool = False
+ _failed: bool = False
- def relative_paths(self, start: Optional[Path] = None) -> Tuple[Path, ...]:
- return tuple(
- tools.relative_paths(self.paths, start if start else self.root))
+ @property
+ def failed(self) -> bool:
+ return self._failed
- def paths_by_repo(self) -> Dict[Path, List[Path]]:
- repos = collections.defaultdict(list)
+ def fail(
+ self,
+ description: str,
+ path: Optional[Path] = None,
+ line: Optional[int] = None,
+ ):
+ """Add a failure to this presubmit step.
+
+ If this is called at least once the step fails, but not immediately—the
+ check is free to continue and possibly call this method again.
+ """
+ _LOG.warning('%s', PresubmitFailure(description, path, line))
+ self._failed = True
+
+ @staticmethod
+ def create_for_testing():
+ parsed_env = pw_cli.env.pigweed_environment()
+ root = parsed_env.PW_PROJECT_ROOT
+ presubmit_root = root / 'out' / 'presubmit'
+ return PresubmitContext(
+ root=root,
+ repos=(root,),
+ output_dir=presubmit_root / 'test',
+ failure_summary_log=presubmit_root / 'failure-summary.log',
+ paths=(root / 'foo.cc', root / 'foo.py'),
+ all_paths=(root / 'BUILD.gn', root / 'foo.cc', root / 'foo.py'),
+ package_root=root / 'environment' / 'packages',
+ luci=None,
+ override_gn_args={},
+ format_options=FormatOptions(),
+ )
+
+
+class FileFilter:
+ """Allows checking if a path matches a series of filters.
+
+ Positive filters (e.g. the file name matches a regex) and negative filters
+ (path does not match a regular expression) may be applied.
+ """
+
+ _StrOrPattern = Union[Pattern, str]
+
+ def __init__(
+ self,
+ *,
+ exclude: Iterable[_StrOrPattern] = (),
+ endswith: Iterable[str] = (),
+ name: Iterable[_StrOrPattern] = (),
+ suffix: Iterable[str] = (),
+ ) -> None:
+ """Creates a FileFilter with the provided filters.
+
+ Args:
+ endswith: True if the end of the path is equal to any of the passed
+ strings
+            exclude: If any of the passed regular expressions match, return
+                False. This overrides any other matches.
+            name: Regexes to match with file names (pathlib.Path.name). True if
+ the resulting regex matches the entire file name.
+ suffix: True if final suffix (as determined by pathlib.Path) is
+ matched by any of the passed str.
+ """
+ self.exclude = tuple(re.compile(i) for i in exclude)
+
+ self.endswith = tuple(endswith)
+ self.name = tuple(re.compile(i) for i in name)
+ self.suffix = tuple(suffix)
+
+ def matches(self, path: Union[str, Path]) -> bool:
+ """Returns true if the path matches any filter but not an exclude.
- for path in self.paths:
- repos[git_repo.root(path)].append(path)
+ If no positive filters are specified, any paths that do not match a
+ negative filter are considered to match.
+
+ If 'path' is a Path object it is rendered as a posix path (i.e.
+        using "/" as the path separator) before testing with 'exclude' and
+ 'endswith'.
+ """
- return repos
+ posix_path = path.as_posix() if isinstance(path, Path) else path
+ if any(bool(exp.search(posix_path)) for exp in self.exclude):
+ return False
+ # If there are no positive filters set, accept all paths.
+ no_filters = not self.endswith and not self.name and not self.suffix
-class _Filter(NamedTuple):
- endswith: Tuple[str, ...] = ('', )
- exclude: Tuple[Pattern[str], ...] = ()
+ path_obj = Path(path)
+ return (
+ no_filters
+ or path_obj.suffix in self.suffix
+ or any(regex.fullmatch(path_obj.name) for regex in self.name)
+ or any(posix_path.endswith(end) for end in self.endswith)
+ )
- def matches(self, path: str) -> bool:
- return (any(path.endswith(end) for end in self.endswith)
- and not any(exp.search(path) for exp in self.exclude))
+ def filter(self, paths: Sequence[Union[str, Path]]) -> Sequence[Path]:
+ return [Path(x) for x in paths if self.matches(x)]
+
+ def apply_to_check(self, always_run: bool = False) -> Callable:
+ def wrapper(func: Callable) -> Check:
+ if isinstance(func, Check):
+ clone = copy.copy(func)
+ clone.filter = self
+ clone.always_run = clone.always_run or always_run
+ return clone
+
+ return Check(check=func, path_filter=self, always_run=always_run)
+
+ return wrapper
def _print_ui(*args) -> None:
@@ -212,29 +690,71 @@ def _print_ui(*args) -> None:
print(*args, flush=True)
+@dataclasses.dataclass
+class FilteredCheck:
+ check: Check
+ paths: Sequence[Path]
+ substep: Optional[str] = None
+
+ @property
+ def name(self) -> str:
+ return self.check.name
+
+ def run(self, ctx: PresubmitContext, count: int, total: int):
+ return self.check.run(ctx, count, total, self.substep)
+
+
class Presubmit:
"""Runs a series of presubmit checks on a list of files."""
- def __init__(self, root: Path, repos: Sequence[Path],
- output_directory: Path, paths: Sequence[Path],
- package_root: Path):
+
+ def __init__(
+ self,
+ root: Path,
+ repos: Sequence[Path],
+ output_directory: Path,
+ paths: Sequence[Path],
+ all_paths: Sequence[Path],
+ package_root: Path,
+ override_gn_args: Dict[str, str],
+ continue_after_build_error: bool,
+ ):
self._root = root.resolve()
self._repos = tuple(repos)
self._output_directory = output_directory.resolve()
self._paths = tuple(paths)
+ self._all_paths = tuple(all_paths)
self._relative_paths = tuple(
- tools.relative_paths(self._paths, self._root))
+ tools.relative_paths(self._paths, self._root)
+ )
self._package_root = package_root.resolve()
+ self._override_gn_args = override_gn_args
+ self._continue_after_build_error = continue_after_build_error
- def run(self, program: Program, keep_going: bool = False) -> bool:
+ def run(
+ self,
+ program: Program,
+ keep_going: bool = False,
+ substep: Optional[str] = None,
+ ) -> bool:
"""Executes a series of presubmit checks on the paths."""
checks = self.apply_filters(program)
+ if substep:
+ assert (
+ len(checks) == 1
+ ), 'substeps not supported with multiple steps'
+ checks[0].substep = substep
_LOG.debug('Running %s for %s', program.title(), self._root.name)
_print_ui(_title(f'{self._root.name}: {program.title()}'))
- _LOG.info('%d of %d checks apply to %s in %s', len(checks),
- len(program), plural(self._paths, 'file'), self._root)
+ _LOG.info(
+ '%d of %d checks apply to %s in %s',
+ len(checks),
+ len(program),
+ plural(self._paths, 'file'),
+ self._root,
+ )
_print_ui()
for line in tools.file_summary(self._relative_paths):
@@ -242,9 +762,9 @@ class Presubmit:
_print_ui()
if not self._paths:
- _print_ui(color_yellow('No files are being checked!'))
+ _print_ui(_COLOR.yellow('No files are being checked!'))
- _LOG.debug('Checks:\n%s', '\n'.join(c.name for c, _ in checks))
+ _LOG.debug('Checks:\n%s', '\n'.join(c.name for c in checks))
start_time: float = time.time()
passed, failed, skipped = self._execute_checks(checks, keep_going)
@@ -252,22 +772,25 @@ class Presubmit:
return not failed and not skipped
- def apply_filters(
- self,
- program: Sequence[Callable]) -> List[Tuple[Check, Sequence[Path]]]:
- """Returns list of (check, paths) for checks that should run."""
+ def apply_filters(self, program: Sequence[Callable]) -> List[FilteredCheck]:
+ """Returns list of FilteredCheck for checks that should run."""
checks = [c if isinstance(c, Check) else Check(c) for c in program]
- filter_to_checks: Dict[_Filter,
- List[Check]] = collections.defaultdict(list)
+ filter_to_checks: Dict[
+ FileFilter, List[Check]
+ ] = collections.defaultdict(list)
- for check in checks:
- filter_to_checks[check.filter].append(check)
+ for chk in checks:
+ filter_to_checks[chk.filter].append(chk)
check_to_paths = self._map_checks_to_paths(filter_to_checks)
- return [(c, check_to_paths[c]) for c in checks if c in check_to_paths]
+ return [
+ FilteredCheck(c, check_to_paths[c])
+ for c in checks
+ if c in check_to_paths
+ ]
def _map_checks_to_paths(
- self, filter_to_checks: Dict[_Filter, List[Check]]
+ self, filter_to_checks: Dict[FileFilter, List[Check]]
) -> Dict[Check, Sequence[Path]]:
checks_to_paths: Dict[Check, Sequence[Path]] = {}
@@ -275,19 +798,22 @@ class Presubmit:
for filt, checks in filter_to_checks.items():
filtered_paths = tuple(
- path for path, filter_path in zip(self._paths, posix_paths)
- if filt.matches(filter_path))
+ path
+ for path, filter_path in zip(self._paths, posix_paths)
+ if filt.matches(filter_path)
+ )
- for check in checks:
- if filtered_paths or check.always_run:
- checks_to_paths[check] = filtered_paths
+ for chk in checks:
+ if filtered_paths or chk.always_run:
+ checks_to_paths[chk] = filtered_paths
else:
- _LOG.debug('Skipping "%s": no relevant files', check.name)
+ _LOG.debug('Skipping "%s": no relevant files', chk.name)
return checks_to_paths
- def _log_summary(self, time_s: float, passed: int, failed: int,
- skipped: int) -> None:
+ def _log_summary(
+ self, time_s: float, passed: int, failed: int, skipped: int
+ ) -> None:
summary_items = []
if passed:
summary_items.append(f'{passed} passed')
@@ -297,58 +823,85 @@ class Presubmit:
summary_items.append(f'{skipped} not run')
summary = ', '.join(summary_items) or 'nothing was done'
- result = _Result.FAIL if failed or skipped else _Result.PASS
+ if failed or skipped:
+ result = PresubmitResult.FAIL
+ else:
+ result = PresubmitResult.PASS
total = passed + failed + skipped
- _LOG.debug('Finished running %d checks on %s in %.1f s', total,
- plural(self._paths, 'file'), time_s)
+ _LOG.debug(
+ 'Finished running %d checks on %s in %.1f s',
+ total,
+ plural(self._paths, 'file'),
+ time_s,
+ )
_LOG.debug('Presubmit checks %s: %s', result.value, summary)
_print_ui(
_box(
- _SUMMARY_BOX, result.colorized(_LEFT, invert=True),
+ _SUMMARY_BOX,
+ result.colorized(_LEFT, invert=True),
f'{total} checks on {plural(self._paths, "file")}: {summary}',
- _format_time(time_s)))
+ _format_time(time_s),
+ )
+ )
+
+ def _create_presubmit_context( # pylint: disable=no-self-use
+ self, **kwargs
+ ):
+ """Create a PresubmitContext. Override if needed in subclasses."""
+ return PresubmitContext(**kwargs)
@contextlib.contextmanager
- def _context(self, name: str, paths: Tuple[Path, ...]):
+ def _context(self, filtered_check: FilteredCheck):
# There are many characters banned from filenames on Windows. To
# simplify things, just strip everything that's not a letter, digit,
# or underscore.
- sanitized_name = re.sub(r'[\W_]+', '_', name).lower()
+ sanitized_name = re.sub(r'[\W_]+', '_', filtered_check.name).lower()
output_directory = self._output_directory.joinpath(sanitized_name)
os.makedirs(output_directory, exist_ok=True)
- handler = logging.FileHandler(output_directory.joinpath('step.log'),
- mode='w')
+ failure_summary_log = output_directory / 'failure-summary.log'
+ failure_summary_log.unlink(missing_ok=True)
+
+ handler = logging.FileHandler(
+ output_directory.joinpath('step.log'), mode='w'
+ )
handler.setLevel(logging.DEBUG)
try:
_LOG.addHandler(handler)
- yield PresubmitContext(
+ yield self._create_presubmit_context(
root=self._root,
repos=self._repos,
output_dir=output_directory,
- paths=paths,
+ failure_summary_log=failure_summary_log,
+ paths=filtered_check.paths,
+ all_paths=self._all_paths,
package_root=self._package_root,
+ override_gn_args=self._override_gn_args,
+ continue_after_build_error=self._continue_after_build_error,
+ luci=LuciContext.create_from_environment(),
+ format_options=FormatOptions.load(),
)
finally:
_LOG.removeHandler(handler)
- def _execute_checks(self, program,
- keep_going: bool) -> Tuple[int, int, int]:
+ def _execute_checks(
+ self, program: List[FilteredCheck], keep_going: bool
+ ) -> Tuple[int, int, int]:
"""Runs presubmit checks; returns (passed, failed, skipped) lists."""
passed = failed = 0
- for i, (check, paths) in enumerate(program, 1):
- with self._context(check.name, paths) as ctx:
- result = check.run(ctx, i, len(program))
+ for i, filtered_check in enumerate(program, 1):
+ with self._context(filtered_check) as ctx:
+ result = filtered_check.run(ctx, i, len(program))
- if result is _Result.PASS:
+ if result is PresubmitResult.PASS:
passed += 1
- elif result is _Result.CANCEL:
+ elif result is PresubmitResult.CANCEL:
break
else:
failed += 1
@@ -358,8 +911,9 @@ class Presubmit:
return passed, failed, len(program) - passed - failed
-def _process_pathspecs(repos: Iterable[Path],
- pathspecs: Iterable[str]) -> Dict[Path, List[str]]:
+def _process_pathspecs(
+ repos: Iterable[Path], pathspecs: Iterable[str]
+) -> Dict[Path, List[str]]:
pathspecs_by_repo: Dict[Path, List[str]] = {repo: [] for repo in repos}
repos_with_paths: Set[Path] = set()
@@ -371,7 +925,8 @@ def _process_pathspecs(repos: Iterable[Path],
repo = git_repo.within_repo(pathspec)
if repo not in pathspecs_by_repo:
raise ValueError(
- f'{pathspec} is not in a Git repository in this presubmit')
+ f'{pathspec} is not in a Git repository in this presubmit'
+ )
# Make the path relative to the repo's root.
pathspecs_by_repo[repo].append(os.path.relpath(pathspec, repo))
@@ -389,16 +944,23 @@ def _process_pathspecs(repos: Iterable[Path],
return pathspecs_by_repo
-def run(program: Sequence[Callable],
- root: Path,
- repos: Collection[Path] = (),
- base: Optional[str] = None,
- paths: Sequence[str] = (),
- exclude: Sequence[Pattern] = (),
- output_directory: Optional[Path] = None,
- package_root: Path = None,
- only_list_steps: bool = False,
- keep_going: bool = False) -> bool:
+def run( # pylint: disable=too-many-arguments,too-many-locals
+ program: Sequence[Check],
+ root: Path,
+ repos: Collection[Path] = (),
+ base: Optional[str] = None,
+ paths: Sequence[str] = (),
+ exclude: Sequence[Pattern] = (),
+ output_directory: Optional[Path] = None,
+ package_root: Optional[Path] = None,
+ only_list_steps: bool = False,
+ override_gn_args: Sequence[Tuple[str, str]] = (),
+ keep_going: bool = False,
+ continue_after_build_error: bool = False,
+ presubmit_class: type = Presubmit,
+ list_steps_file: Optional[Path] = None,
+ substep: Optional[str] = None,
+) -> bool:
"""Lists files in the current Git repo and runs a Presubmit with them.
This changes the directory to the root of the Git repository after listing
@@ -422,30 +984,72 @@ def run(program: Sequence[Callable],
output_directory: where to place output files
package_root: where to place package files
only_list_steps: print step names instead of running them
- keep_going: whether to continue running checks if an error occurs
+ override_gn_args: additional GN args to set on steps
+ keep_going: continue running presubmit steps after a step fails
+ continue_after_build_error: continue building if a build step fails
+ presubmit_class: class to use to run Presubmits, should inherit from
+ Presubmit class above
+ list_steps_file: File created by --only-list-steps, used to keep from
+ recalculating affected files.
+ substep: run only part of a single check
Returns:
True if all presubmit checks succeeded
"""
repos = [repo.resolve() for repo in repos]
+ non_empty_repos = []
for repo in repos:
- if git_repo.root(repo) != repo:
- raise ValueError(f'{repo} is not the root of a Git repo; '
- 'presubmit checks must be run from a Git repo')
+ if list(repo.iterdir()):
+ non_empty_repos.append(repo)
+ if git_repo.root(repo) != repo:
+ raise ValueError(
+ f'{repo} is not the root of a Git repo; '
+ 'presubmit checks must be run from a Git repo'
+ )
+ repos = non_empty_repos
pathspecs_by_repo = _process_pathspecs(repos, paths)
- files: List[Path] = []
-
- for repo, pathspecs in pathspecs_by_repo.items():
- files += tools.exclude_paths(
- exclude, git_repo.list_files(base, pathspecs, repo), root)
-
+ all_files: List[Path] = []
+ modified_files: List[Path] = []
+ list_steps_data: Dict[str, Any] = {}
+
+ if list_steps_file:
+ with list_steps_file.open() as ins:
+ list_steps_data = json.load(ins)
+ all_files.extend(list_steps_data['all_files'])
+ for step in list_steps_data['steps']:
+ modified_files.extend(Path(x) for x in step.get("paths", ()))
+ modified_files = sorted(set(modified_files))
_LOG.info(
- 'Checking %s',
- git_repo.describe_files(repo, repo, base, pathspecs, exclude,
- root))
+ 'Loaded %d paths from file %s',
+ len(modified_files),
+ list_steps_file,
+ )
+
+ else:
+ for repo, pathspecs in pathspecs_by_repo.items():
+ all_files_repo = tuple(
+ tools.exclude_paths(
+ exclude, git_repo.list_files(None, pathspecs, repo), root
+ )
+ )
+ all_files += all_files_repo
+
+ if base is None:
+ modified_files += all_files_repo
+ else:
+ modified_files += tools.exclude_paths(
+ exclude, git_repo.list_files(base, pathspecs, repo), root
+ )
+
+ _LOG.info(
+ 'Checking %s',
+ git_repo.describe_files(
+ repo, repo, base, pathspecs, exclude, root
+ ),
+ )
if output_directory is None:
output_directory = root / '.presubmit'
@@ -453,111 +1057,300 @@ def run(program: Sequence[Callable],
if package_root is None:
package_root = output_directory / 'packages'
- presubmit = Presubmit(
+ presubmit = presubmit_class(
root=root,
repos=repos,
output_directory=output_directory,
- paths=files,
+ paths=modified_files,
+ all_paths=all_files,
package_root=package_root,
+ override_gn_args=dict(override_gn_args or {}),
+ continue_after_build_error=continue_after_build_error,
)
if only_list_steps:
- for check, _ in presubmit.apply_filters(program):
- print(check.name)
+ steps: List[Dict] = []
+ for filtered_check in presubmit.apply_filters(program):
+ step = {
+ 'name': filtered_check.name,
+ 'paths': [str(x) for x in filtered_check.paths],
+ }
+ substeps = filtered_check.check.substeps()
+ if len(substeps) > 1:
+ step['substeps'] = [x.name for x in substeps]
+ steps.append(step)
+
+ list_steps_data = {
+ 'steps': steps,
+ 'all_files': [str(x) for x in all_files],
+ }
+ json.dump(list_steps_data, sys.stdout, indent=2)
+ sys.stdout.write('\n')
return True
if not isinstance(program, Program):
program = Program('', program)
- return presubmit.run(program, keep_going)
+ return presubmit.run(program, keep_going, substep=substep)
def _make_str_tuple(value: Union[Iterable[str], str]) -> Tuple[str, ...]:
return tuple([value] if isinstance(value, str) else value)
+def check(*args, **kwargs):
+ """Turn a function into a presubmit check.
+
+ Args:
+ *args: Passed through to function.
+ *kwargs: Passed through to function.
+
+ If only one argument is provided and it's a function, this function acts
+ as a decorator and creates a Check from the function. Example of this kind
+ of usage:
+
+ @check
+ def pragma_once(ctx: PresubmitContext):
+ pass
+
+ Otherwise, save the arguments, and return a decorator that turns a function
+ into a Check, but with the arguments added onto the Check constructor.
+ Example of this kind of usage:
+
+ @check(name='pragma_twice')
+ def pragma_once(ctx: PresubmitContext):
+ pass
+ """
+ if (
+ len(args) == 1
+ and isinstance(args[0], types.FunctionType)
+ and not kwargs
+ ):
+ # Called as a regular decorator.
+ return Check(args[0])
+
+ def decorator(check_function):
+ return Check(check_function, *args, **kwargs)
+
+ return decorator
+
+
+@dataclasses.dataclass
+class SubStep:
+ name: Optional[str]
+ _func: Callable[..., PresubmitResult]
+ args: Sequence[Any] = ()
+ kwargs: Dict[str, Any] = dataclasses.field(default_factory=lambda: {})
+
+ def __call__(self, ctx: PresubmitContext) -> PresubmitResult:
+ if self.name:
+ _LOG.info('%s', self.name)
+ return self._func(ctx, *self.args, **self.kwargs)
+
+
class Check:
"""Wraps a presubmit check function.
This class consolidates the logic for running and logging a presubmit check.
It also supports filtering the paths passed to the presubmit check.
"""
- def __init__(self,
- check_function: Callable,
- path_filter: _Filter = _Filter(),
- always_run: bool = True):
- _ensure_is_valid_presubmit_check_function(check_function)
-
- self._check: Callable = check_function
- self.filter: _Filter = path_filter
- self.always_run: bool = always_run
+ def __init__(
+ self,
+ check: Union[ # pylint: disable=redefined-outer-name
+ Callable, Iterable[SubStep]
+ ],
+ path_filter: FileFilter = FileFilter(),
+ always_run: bool = True,
+ name: Optional[str] = None,
+ doc: Optional[str] = None,
+ ) -> None:
# Since Check wraps a presubmit function, adopt that function's name.
- self.__name__ = self._check.__name__
+ self.name: str = ''
+ self.doc: str = ''
+ if isinstance(check, Check):
+ self.name = check.name
+ self.doc = check.doc
+ elif callable(check):
+ self.name = check.__name__
+ self.doc = check.__doc__ or ''
+
+ if name:
+ self.name = name
+ if doc:
+ self.doc = doc
+
+ if not self.name:
+ raise ValueError('no name for step')
+
+ self._substeps_raw: Iterable[SubStep]
+ if isinstance(check, collections.abc.Iterator):
+ self._substeps_raw = check
+ else:
+ assert callable(check)
+ _ensure_is_valid_presubmit_check_function(check)
+ self._substeps_raw = iter((SubStep(None, check),))
+ self._substeps_saved: Sequence[SubStep] = ()
+
+ self.filter = path_filter
+ self.always_run: bool = always_run
+
+ def substeps(self) -> Sequence[SubStep]:
+ """Return the SubSteps of the current step.
+
+ This is where the list of SubSteps is actually evaluated. It can't be
+ evaluated in the constructor because the Iterable passed into the
+ constructor might not be ready yet.
+ """
+ if not self._substeps_saved:
+ self._substeps_saved = tuple(self._substeps_raw)
+ return self._substeps_saved
+
+ def __repr__(self):
+ # This returns just the name so it's easy to show the entire list of
+ # steps with '--help'.
+ return self.name
+
+ def unfiltered(self) -> Check:
+ """Create a new check identical to this one, but without the filter."""
+ clone = copy.copy(self)
+ clone.filter = FileFilter()
+ return clone
def with_filter(
self,
*,
- endswith: Iterable[str] = '',
- exclude: Iterable[Union[Pattern[str], str]] = ()
+ endswith: Iterable[str] = (),
+ exclude: Iterable[Union[Pattern[str], str]] = (),
) -> Check:
- endswith = self.filter.endswith
- if endswith:
- endswith = endswith + _make_str_tuple(endswith)
- exclude = self.filter.exclude + tuple(re.compile(e) for e in exclude)
+ """Create a new check identical to this one, but with extra filters.
- return Check(check_function=self._check,
- path_filter=_Filter(endswith=endswith, exclude=exclude),
- always_run=self.always_run)
+ Add to the existing filter, perhaps to exclude an additional directory.
- @property
- def name(self):
- return self.__name__
+ Args:
+ endswith: Passed through to FileFilter.
+ exclude: Passed through to FileFilter.
+
+ Returns a new check.
+ """
+ return self.with_file_filter(
+ FileFilter(endswith=_make_str_tuple(endswith), exclude=exclude)
+ )
+
+ def with_file_filter(self, file_filter: FileFilter) -> Check:
+ """Create a new check identical to this one, but with extra filters.
+
+ Add to the existing filter, perhaps to exclude an additional directory.
+
+ Args:
+ file_filter: Additional filter rules.
- def run(self, ctx: PresubmitContext, count: int, total: int) -> _Result:
+ Returns a new check.
+ """
+ clone = copy.copy(self)
+ if clone.filter:
+ clone.filter.exclude = clone.filter.exclude + file_filter.exclude
+ clone.filter.endswith = clone.filter.endswith + file_filter.endswith
+ clone.filter.name = file_filter.name or clone.filter.name
+ clone.filter.suffix = clone.filter.suffix + file_filter.suffix
+ else:
+ clone.filter = file_filter
+ return clone
+
+ def run(
+ self,
+ ctx: PresubmitContext,
+ count: int,
+ total: int,
+ substep: Optional[str] = None,
+ ) -> PresubmitResult:
"""Runs the presubmit check on the provided paths."""
_print_ui(
- _box(_CHECK_UPPER, f'{count}/{total}', self.name,
- plural(ctx.paths, "file")))
-
- _LOG.debug('[%d/%d] Running %s on %s', count, total, self.name,
- plural(ctx.paths, "file"))
+ _box(
+ _CHECK_UPPER,
+ f'{count}/{total}',
+ self.name,
+ plural(ctx.paths, "file"),
+ )
+ )
+
+ substep_part = f'.{substep}' if substep else ''
+ _LOG.debug(
+ '[%d/%d] Running %s%s on %s',
+ count,
+ total,
+ self.name,
+ substep_part,
+ plural(ctx.paths, "file"),
+ )
start_time_s = time.time()
- result = self._call_function(ctx)
+ result: PresubmitResult
+ if substep:
+ result = self.run_substep(ctx, substep)
+ else:
+ result = self(ctx)
time_str = _format_time(time.time() - start_time_s)
_LOG.debug('%s %s', self.name, result.value)
_print_ui(
- _box(_CHECK_LOWER, result.colorized(_LEFT), self.name, time_str))
+ _box(_CHECK_LOWER, result.colorized(_LEFT), self.name, time_str)
+ )
_LOG.debug('%s duration:%s', self.name, time_str)
return result
- def _call_function(self, ctx: PresubmitContext) -> _Result:
+ def _try_call(
+ self,
+ func: Callable,
+ ctx,
+ *args,
+ **kwargs,
+ ) -> PresubmitResult:
try:
- self._check(ctx)
+ result = func(ctx, *args, **kwargs)
+ if ctx.failed:
+ return PresubmitResult.FAIL
+ if isinstance(result, PresubmitResult):
+ return result
+ return PresubmitResult.PASS
+
except PresubmitFailure as failure:
if str(failure):
_LOG.warning('%s', failure)
- return _Result.FAIL
- except Exception as failure: # pylint: disable=broad-except
+ return PresubmitResult.FAIL
+
+ except Exception as _failure: # pylint: disable=broad-except
_LOG.exception('Presubmit check %s failed!', self.name)
- return _Result.FAIL
+ return PresubmitResult.FAIL
+
except KeyboardInterrupt:
_print_ui()
- return _Result.CANCEL
+ return PresubmitResult.CANCEL
+
+ def run_substep(
+ self, ctx: PresubmitContext, name: Optional[str]
+ ) -> PresubmitResult:
+ for substep in self.substeps():
+ if substep.name == name:
+ return substep(ctx)
- return _Result.PASS
+ expected = ', '.join(repr(s.name) for s in self.substeps())
+ raise LookupError(f'bad substep name: {name!r} (expected: {expected})')
- def __call__(self, ctx: PresubmitContext, *args, **kwargs):
- """Calling a Check calls its underlying function directly.
+ def __call__(self, ctx: PresubmitContext) -> PresubmitResult:
+ """Calling a Check calls its underlying substeps directly.
This makes it possible to call functions wrapped by @filter_paths. The
prior filters are ignored, so new filters may be applied.
"""
- return self._check(ctx, *args, **kwargs)
+ result: PresubmitResult
+ for substep in self.substeps():
+ result = self._try_call(substep, ctx)
+ if result and result != PresubmitResult.PASS:
+ return result
+ return PresubmitResult.PASS
def _required_args(function: Callable) -> Iterable[Parameter]:
@@ -569,26 +1362,36 @@ def _required_args(function: Callable) -> Iterable[Parameter]:
yield param
-def _ensure_is_valid_presubmit_check_function(check: Callable) -> None:
+def _ensure_is_valid_presubmit_check_function(chk: Callable) -> None:
"""Checks if a Callable can be used as a presubmit check."""
try:
- required_args = tuple(_required_args(check))
+ required_args = tuple(_required_args(chk))
except (TypeError, ValueError):
- raise TypeError('Presubmit checks must be callable, but '
- f'{check!r} is a {type(check).__name__}')
+ raise TypeError(
+ 'Presubmit checks must be callable, but '
+ f'{chk!r} is a {type(chk).__name__}'
+ )
if len(required_args) != 1:
raise TypeError(
f'Presubmit check functions must have exactly one required '
f'positional argument (the PresubmitContext), but '
- f'{check.__name__} has {len(required_args)} required arguments' +
- (f' ({", ".join(a.name for a in required_args)})'
- if required_args else ''))
+ f'{chk.__name__} has {len(required_args)} required arguments'
+ + (
+ f' ({", ".join(a.name for a in required_args)})'
+ if required_args
+ else ''
+ )
+ )
-def filter_paths(endswith: Iterable[str] = '',
- exclude: Iterable[Union[Pattern[str], str]] = (),
- always_run: bool = False) -> Callable[[Callable], Check]:
+def filter_paths(
+ *,
+ endswith: Iterable[str] = (),
+ exclude: Iterable[Union[Pattern[str], str]] = (),
+ file_filter: Optional[FileFilter] = None,
+ always_run: bool = False,
+) -> Callable[[Callable], Check]:
"""Decorator for filtering the paths list for a presubmit check function.
Path filters only apply when the function is used as a presubmit check.
@@ -599,15 +1402,27 @@ def filter_paths(endswith: Iterable[str] = '',
Args:
endswith: str or iterable of path endings to include
exclude: regular expressions of paths to exclude
-
+ file_filter: FileFilter used to select files
+ always_run: Run check even when no files match
Returns:
a wrapped version of the presubmit function
"""
+
+ if file_filter:
+ real_file_filter = file_filter
+ if endswith or exclude:
+ raise ValueError(
+ 'Must specify either file_filter or '
+ 'endswith/exclude args, not both'
+ )
+ else:
+ # TODO(b/238426363): Remove these arguments and use FileFilter only.
+ real_file_filter = FileFilter(
+ endswith=_make_str_tuple(endswith), exclude=exclude
+ )
+
def filter_paths_for_function(function: Callable):
- return Check(function,
- _Filter(_make_str_tuple(endswith),
- tuple(re.compile(e) for e in exclude)),
- always_run=always_run)
+ return Check(function, real_file_filter, always_run=always_run)
return filter_paths_for_function
@@ -617,6 +1432,9 @@ def call(*args, **kwargs) -> None:
attributes, command = tools.format_command(args, kwargs)
_LOG.debug('[RUN] %s\n%s', attributes, command)
+ tee = kwargs.pop('tee', None)
+ propagate_sigterm = kwargs.pop('propagate_sigterm', False)
+
env = pw_cli.env.pigweed_environment()
kwargs['stdout'] = subprocess.PIPE
kwargs['stderr'] = subprocess.STDOUT
@@ -624,21 +1442,64 @@ def call(*args, **kwargs) -> None:
process = subprocess.Popen(args, **kwargs)
assert process.stdout
+ # Set up signal handler if requested.
+ signaled = False
+ if propagate_sigterm:
+
+ def signal_handler(_signal_number: int, _stack_frame: Any) -> None:
+ nonlocal signaled
+ signaled = True
+ process.terminate()
+
+ previous_signal_handler = signal.signal(signal.SIGTERM, signal_handler)
+
if env.PW_PRESUBMIT_DISABLE_SUBPROCESS_CAPTURE:
while True:
line = process.stdout.readline().decode(errors='backslashreplace')
if not line:
break
_LOG.info(line.rstrip())
+ if tee:
+ tee.write(line)
stdout, _ = process.communicate()
+ if tee:
+ tee.write(stdout.decode(errors='backslashreplace'))
logfunc = _LOG.warning if process.returncode else _LOG.debug
logfunc('[FINISHED]\n%s', command)
- logfunc('[RESULT] %s with return code %d',
- 'Failed' if process.returncode else 'Passed', process.returncode)
+ logfunc(
+ '[RESULT] %s with return code %d',
+ 'Failed' if process.returncode else 'Passed',
+ process.returncode,
+ )
if stdout:
logfunc('[OUTPUT]\n%s', stdout.decode(errors='backslashreplace'))
+ if propagate_sigterm:
+ signal.signal(signal.SIGTERM, previous_signal_handler)
+ if signaled:
+ _LOG.warning('Exiting due to SIGTERM.')
+ sys.exit(1)
+
if process.returncode:
raise PresubmitFailure
+
+
+def install_package(
+ ctx: Union[FormatContext, PresubmitContext],
+ name: str,
+ force: bool = False,
+) -> None:
+ """Install package with given name in given path."""
+ root = ctx.package_root
+ mgr = package_manager.PackageManager(root)
+
+ if not mgr.list():
+ raise PresubmitFailure(
+ 'no packages configured, please import your pw_package '
+ 'configuration module'
+ )
+
+ if not mgr.status(name) or force:
+ mgr.install(name, force=force)