Source code for flake8.utils

"""Utility methods for flake8."""
import collections
import fnmatch as _fnmatch
import inspect
import io
import os
import platform
import re
import sys
import tokenize
from typing import Callable, Dict, Generator  # noqa: F401 (until flake8 3.7)
from typing import List, Pattern, Sequence  # noqa: F401 (until flake8 3.7)
from typing import Tuple, TYPE_CHECKING  # noqa: F401 (until flake8 3.7)
from typing import Union  # noqa: F401 (until flake8 3.7)

if TYPE_CHECKING:
    from flake8.plugins.manager import Plugin  # noqa: F401 (until flake8 3.7)

# Matches a unified-diff hunk header such as ``@@ -1,2 +3,4 @@ context``.
# Group 1 is the first line of the "+" range and group 2 is its optional line
# count; the "-" range is matched but not captured.
DIFF_HUNK_REGEXP = re.compile(r"^@@ -\d+(?:,\d+)? \+(\d+)(?:,(\d+))? @@.*$")
# Item separator: a comma or any single whitespace character.
COMMA_SEPARATED_LIST_RE = re.compile(r"[,\s]")
# Item separator that does NOT include the plain space: a comma, tab, newline,
# carriage return, form feed or vertical tab.
LOCAL_PLUGIN_LIST_RE = re.compile(r"[,\t\n\r\f\v]")


def parse_comma_separated_list(value, regexp=COMMA_SEPARATED_LIST_RE):
    # type: (Union[Sequence[str], str], Pattern[str]) -> List[str]
    """Split a value into a list of stripped, non-empty items.

    :param value:
        Either one string to be split, or a list/tuple of strings that is
        taken as already split.
    :param regexp:
        Compiled regular expression whose matches separate the items. It is
        only consulted when ``value`` is not a list or tuple.
    :type regexp:
        _sre.SRE_Pattern
    :returns:
        The items with surrounding whitespace removed; items that are empty
        after stripping are dropped. A falsy ``value`` gives an empty list.
    :rtype:
        list
    """
    if not value:
        return []

    if isinstance(value, (list, tuple)):
        pieces = value
    else:
        pieces = regexp.split(value)

    cleaned = []
    for piece in pieces:
        piece = piece.strip()
        if piece:
            cleaned.append(piece)
    return cleaned
_Token = collections.namedtuple("Token", ("tp", "src")) _CODE, _FILE, _COLON, _COMMA, _WS = "code", "file", "colon", "comma", "ws" _EOF = "eof" _FILE_LIST_TOKEN_TYPES = [ (re.compile(r"[A-Z][0-9]*(?=$|\s|,)"), _CODE), (re.compile(r"[^\s:,]+"), _FILE), (re.compile(r"\s*:\s*"), _COLON), (re.compile(r"\s*,\s*"), _COMMA), (re.compile(r"\s+"), _WS), ] def _tokenize_files_to_codes_mapping(value): # type: (str) -> List[_Token] tokens = [] i = 0 while i < len(value): for token_re, token_name in _FILE_LIST_TOKEN_TYPES: match = token_re.match(value, i) if match: tokens.append(_Token(token_name, match.group().strip())) i = match.end() break else: raise AssertionError("unreachable", value, i) tokens.append(_Token(_EOF, "")) return tokens def parse_files_to_codes_mapping(value): # noqa: C901 # type: (Union[Sequence[str], str]) -> List[Tuple[List[str], List[str]]] """Parse a files-to-codes maping. A files-to-codes mapping a sequence of values specified as `filenames list:codes list ...`. Each of the lists may be separated by either comma or whitespace tokens. :param value: String to be parsed and normalized. 
:type value: str """ if isinstance(value, (list, tuple)): value = "\n".join(value) ret = [] if not value.strip(): return ret class State: seen_sep = True seen_colon = False filenames = [] codes = [] def _reset(): if State.codes: for filename in State.filenames: ret.append((filename, State.codes)) State.seen_sep = True State.seen_colon = False State.filenames = [] State.codes = [] for token in _tokenize_files_to_codes_mapping(value): # legal in any state: separator sets the sep bit if token.tp in {_COMMA, _WS}: State.seen_sep = True # looking for filenames elif not State.seen_colon: if token.tp == _COLON: State.seen_colon = True State.seen_sep = True elif State.seen_sep and token.tp == _FILE: State.filenames.append(token.src) State.seen_sep = False else: raise ValueError("Unexpected token: {}".format(token)) # looking for codes else: if token.tp == _EOF: _reset() elif State.seen_sep and token.tp == _CODE: State.codes.append(token.src) State.seen_sep = False elif State.seen_sep and token.tp == _FILE: _reset() State.filenames.append(token.src) State.seen_sep = False else: raise ValueError("Unexpected token: {}".format(token)) return ret
def normalize_paths(paths, parent=os.curdir):
    # type: (Union[Sequence[str], str], str) -> List[str]
    """Split a comma-separated list of paths and normalize each entry.

    :param paths:
        The paths, as accepted by :func:`parse_comma_separated_list`.
    :param str parent:
        Directory handed to :func:`normalize_path` for every entry.
    :returns:
        The normalized paths.
    :rtype:
        [str]
    """
    normalized = []
    for raw_path in parse_comma_separated_list(paths):
        normalized.append(normalize_path(raw_path, parent))
    return normalized
def normalize_path(path, parent=os.curdir):
    # type: (str, str) -> str
    """Normalize a single path.

    A path containing a separator is joined onto ``parent`` and made
    absolute; a path without one (a bare name or pattern) is left alone.
    Trailing separators are stripped from the result in both cases.

    :param str path:
        The path to normalize.
    :param str parent:
        Directory that ``path`` is joined onto when it has a separator.
    :returns:
        The normalized path.
    :rtype:
        str
    """
    # Both os.path.sep and os.path.altsep are honoured so that Windows-style
    # and Unix-style spellings are treated alike. os.path.altsep may be None.
    sep = os.path.sep
    altsep = os.path.altsep or ""

    has_separator = sep in path
    if not has_separator and altsep:
        has_separator = altsep in path

    if has_separator:
        path = os.path.abspath(os.path.join(parent, path))

    # NOTE(review): a path made only of separators (e.g. the filesystem root)
    # becomes an empty string here -- confirm that callers expect this.
    return path.rstrip(sep + altsep)
def _stdin_get_value_py3():
    """Read all of stdin as bytes and return the decoded text in a StringIO.

    The encoding is taken from :func:`tokenize.detect_encoding` applied to
    the data read. When detection or decoding fails with ``LookupError``,
    ``SyntaxError`` or ``UnicodeError`` the bytes are decoded as UTF-8.
    """
    raw = sys.stdin.buffer.read()
    try:
        encoding, _ = tokenize.detect_encoding(io.BytesIO(raw).readline)
        text = raw.decode(encoding)
    except (LookupError, SyntaxError, UnicodeError):
        text = raw.decode("utf-8")
    return io.StringIO(text)
def stdin_get_value():
    # type: () -> str
    """Return the contents of stdin, reading it at most once.

    The stream object built on the first call is stored on this function as
    the ``cached_stdin`` attribute so that plugins calling it later get the
    same value.

    :returns:
        The value of the cached stream.
    :rtype:
        str
    """
    stream = getattr(stdin_get_value, "cached_stdin", None)
    if stream is not None:
        return stream.getvalue()

    if sys.version_info >= (3, 0):
        stream = _stdin_get_value_py3()
    else:
        stream = io.BytesIO(sys.stdin.read())

    stdin_get_value.cached_stdin = stream
    return stream.getvalue()
def parse_unified_diff(diff=None):
    # type: (str) -> Dict[str, set]
    """Parse a unified diff into the line numbers it adds or keeps per file.

    :param str diff:
        Text of the unified diff. When ``None`` the diff is obtained from
        :func:`stdin_get_value`.
    :returns:
        Dictionary mapping each file name to the set of line numbers covered
        by the "+" range of its hunks, e.g.
        ``{'file.py': {10, 11, 12, 18, 19}}``.
    :rtype:
        dict
    """
    # Allow us to not have to patch out stdin_get_value
    if diff is None:
        diff = stdin_get_value()

    # Rows of the current hunk body still to be skipped (None/0: not in one).
    number_of_rows = None
    current_path = None
    parsed_paths = collections.defaultdict(set)
    for line in diff.splitlines():
        if number_of_rows:
            # We're in the part of the diff that has lines starting with +, -,
            # and ' ' to show context and the changes made. We skip these
            # because the information we care about is the filename and the
            # range within it. Only lines present in the new file count
            # towards the hunk length, so neither removed ("-") lines nor the
            # "\ No newline at end of file" marker are counted; counting the
            # marker would end the hunk one line early and let an added line
            # that begins with "++" be mistaken for a file header.
            # NOTE(sigmavirus24): a slice is used rather than an index so
            # that an empty line does not raise.
            if line[:1] not in ("-", "\\"):
                number_of_rows -= 1
            # When number_of_rows reaches 0, we will once again start
            # searching for filenames and ranges.
            continue

        # NOTE(sigmavirus24): Diffs that we support look roughly like:
        #    diff a/file.py b/file.py
        #    ...
        #    --- a/file.py
        #    +++ b/file.py
        # Below we're looking for that last line. Every diff tool that
        # gives us this output may have additional information after
        # ``b/file.py`` which it will separate with a \t, e.g.,
        #    +++ b/file.py\t100644
        # Which is an example that has the new file permissions/mode.
        # In this case we only care about the file name.
        if line[:3] == "+++":
            current_path = line[4:].split("\t", 1)[0]
            # NOTE(sigmavirus24): This check is for diff output from git.
            if current_path[:2] == "b/":
                current_path = current_path[2:]
            # The next interesting line carries the hunk information, which
            # is handled below on a later iteration.
            continue

        # DIFF_HUNK_REGEXP already enforces the leading "@@ ", so a match is
        # all we need to check for.
        hunk_match = DIFF_HUNK_REGEXP.match(line)
        if hunk_match:
            # An omitted line count means the hunk covers a single line.
            (row, number_of_rows) = [
                1 if not group else int(group)
                for group in hunk_match.groups()
            ]
            parsed_paths[current_path].update(
                range(row, row + number_of_rows)
            )

    # We have now parsed our diff into a dictionary that looks like:
    #    {'file.py': set(range(10, 16), range(18, 20)), ...}
    return parsed_paths
def is_windows():
    # type: () -> bool
    """Tell whether the interpreter reports a Windows platform.

    :returns:
        True when ``os.name`` is ``"nt"``, otherwise False.
    :rtype:
        bool
    """
    platform_name = os.name
    return platform_name == "nt"
# NOTE(sigmavirus24): If and when https://bugs.python.org/issue27649 is fixed, # re-enable multiprocessing support on Windows.
def can_run_multiprocessing_on_windows():
    # type: () -> bool
    """Determine if we can use multiprocessing on Windows.

    This presently **always** returns False because of a `bug`_ in the
    :mod:`multiprocessing` module on Windows. The version inspection below is
    kept so that support can be *conditionally* re-enabled once a fixed
    version of Python can be identified.

    .. _bug: https://bugs.python.org/issue27649

    :returns:
        True if the version of Python is modern enough, otherwise False
        (currently always False).
    :rtype:
        bool
    """
    version = sys.version_info
    is_new_enough_python27 = (2, 7, 11) <= version < (3, 0)
    is_new_enough_python3 = version > (3, 2)
    version_is_new_enough = is_new_enough_python27 or is_new_enough_python3
    # Hard-wired off until the upstream bug is fixed.
    return False and version_is_new_enough
def is_using_stdin(paths):
    # type: (List[str]) -> bool
    """Determine if we're going to read from stdin.

    :param list paths:
        The paths that we're going to check.
    :returns:
        True if the stdin marker (``-``) is among the paths, otherwise False.
    :rtype:
        bool
    """
    stdin_marker = "-"
    return stdin_marker in paths
def _default_predicate(*args):
    """Exclude nothing; used by filenames_from when no predicate is given."""
    return False


def filenames_from(arg, predicate=None):
    # type: (str, Callable[[str], bool]) -> Generator
    """Generate filenames from an argument.

    :param str arg:
        Parameter from the command-line.
    :param callable predicate:
        Predicate to use to filter out filenames. If the predicate
        returns ``True`` we will exclude the filename, otherwise we will
        yield it. By default, we include every filename generated.
    :returns:
        Generator of paths. A directory yields the files found beneath it;
        anything else yields ``arg`` itself.
    """
    if predicate is None:
        predicate = _default_predicate

    if predicate(arg):
        return

    if os.path.isdir(arg):
        for root, sub_directories, files in os.walk(arg):
            if predicate(root):
                sub_directories[:] = []
                continue

            # NOTE(sigmavirus24): os.walk() will skip a directory if you
            # remove it from the list of sub-directories. The list is rebuilt
            # and assigned back in place rather than calling ``remove()``
            # while looping over it, which would skip the entry following
            # each removed one and leave it unpruned.
            sub_directories[:] = [
                directory
                for directory in sub_directories
                if not predicate(os.path.join(root, directory))
            ]

            for filename in files:
                joined = os.path.join(root, filename)
                # A file is excluded by either its joined path or bare name.
                if predicate(joined) or predicate(filename):
                    continue
                yield joined
    else:
        yield arg
def fnmatch(filename, patterns, default=True):
    # type: (str, List[str], bool) -> bool
    """Match a filename against several patterns with :func:`fnmatch.fnmatch`.

    :param str filename:
        Name of the file we're trying to match.
    :param list patterns:
        Patterns we're using to try to match the filename.
    :param bool default:
        The value returned when ``patterns`` is empty.
    :returns:
        True if any pattern matches the filename, False if none does, and
        ``default`` if ``patterns`` is empty.
    """
    if not patterns:
        return default

    for pattern in patterns:
        if _fnmatch.fnmatch(filename, pattern):
            return True
    return False
def parameters_for(plugin):
    # type: (Plugin) -> Dict[str, bool]
    """Return the parameters for the plugin.

    The object inspected is ``plugin.plugin``. When it is a function its own
    parameters are reported; otherwise it is treated as a class and the
    parameters of its ``__init__`` are reported with ``self`` removed.

    :param plugin:
        The internal plugin object.
    :type plugin:
        flake8.plugins.manager.Plugin
    :returns:
        An ordered dictionary mapping each parameter name to whether or not
        it is required (i.e. it has no default).
    :rtype:
        dict([(str, bool)])
    """
    target = plugin.plugin
    plugin_is_class = not inspect.isfunction(target)
    if plugin_is_class:
        target = plugin.plugin.__init__

    parameters = collections.OrderedDict()
    if sys.version_info < (3, 3):
        # No inspect.signature here: defaults belong to the trailing
        # arguments, so everything before them is required.
        argspec = inspect.getargspec(target)
        names = argspec[0]
        defaults = argspec[-1] or []
        first_optional = len(names) - len(defaults)
        for index, name in enumerate(names):
            parameters[name] = index < first_optional
    else:
        for parameter in inspect.signature(target).parameters.values():
            # *args, **kwargs and keyword-only parameters are not reported.
            if parameter.kind == parameter.POSITIONAL_OR_KEYWORD:
                parameters[parameter.name] = (
                    parameter.default is parameter.empty
                )

    if plugin_is_class:
        parameters.pop("self", None)

    return parameters
def matches_filename(path, patterns, log_message, logger):
    """Use fnmatch to discern if a path exists in patterns.

    The base name of ``path`` is tried first and, if it does not match, the
    absolute path is tried. Each attempt that decides the outcome is
    reported through ``logger.debug``.

    :param str path:
        The path to the file under question
    :param patterns:
        The patterns to match the path against.
    :type patterns:
        list[str]
    :param str log_message:
        The message used for logging purposes. It is passed to the logger
        together with a mapping holding the ``path`` and ``whether`` keys.
    :param logger:
        Object whose ``debug`` method receives the message.
    :returns:
        True if path matches patterns, False otherwise (including when
        ``patterns`` is empty).
    :rtype:
        bool
    """
    if not patterns:
        return False

    name = os.path.basename(path)
    if fnmatch(name, patterns):
        logger.debug(log_message, {"path": name, "whether": ""})
        return True

    full_path = os.path.abspath(path)
    matched = fnmatch(full_path, patterns)
    if matched:
        whether = ""
    else:
        whether = "not "
    logger.debug(log_message, {"path": full_path, "whether": whether})
    return matched


def get_python_version():
    """Find and format the python implementation and version.

    :returns:
        Implementation name, version, and platform as a string.
    :rtype:
        str
    """
    implementation = platform.python_implementation()
    version = platform.python_version()
    system = platform.system()
    return "%s %s on %s" % (implementation, version, system)