Source code for flake8.checker

"""Checker Manager and Checker classes."""
import collections
import errno
import logging
import signal
import sys
import tokenize
from typing import List, Optional, Tuple  # noqa: F401 (until flake8 3.7)

try:
    import multiprocessing
except ImportError:
    multiprocessing = None

from flake8 import defaults
from flake8 import exceptions
from flake8 import processor
from flake8 import utils

LOG = logging.getLogger(__name__)

SERIAL_RETRY_ERRNOS = {
    # ENOSPC: Added by sigmavirus24
    # > On some operating systems (OSX), multiprocessing may cause an
    # > ENOSPC error while trying to create a Semaphore.
    # > In those cases, we should replace the customized Queue Report
    # > class with pep8's StandardReport class to ensure users don't run
    # > into this problem.
    # > (See also: https://gitlab.com/pycqa/flake8/issues/74)
    errno.ENOSPC,
    # NOTE(sigmavirus24): When adding to this list, include the reasoning
    # on the lines before the error code and always append your error
    # code. Further, please always add a trailing `,` to reduce the visual
    # noise in diffs.
}


class Manager(object):
    """Manage the parallelism and checker instances for each plugin and file.

    This class will be responsible for the following:

    - Determining the parallelism of Flake8, e.g.:

      * Do we use :mod:`multiprocessing` or is it unavailable?

      * Do we automatically decide on the number of jobs to use or did the
        user provide that?

    - Falling back to a serial way of processing files if we run into an
      OSError related to :mod:`multiprocessing`

    - Organizing the results of each checker so we can group the output
      together and make our output deterministic.
    """

    def __init__(self, style_guide, arguments, checker_plugins):
        """Initialize our Manager instance.

        :param style_guide:
            The instantiated style guide for this instance of Flake8.
        :type style_guide:
            flake8.style_guide.StyleGuide
        :param list arguments:
            The extra arguments parsed from the CLI (if any)
        :param checker_plugins:
            The plugins representing checks parsed from entry-points.
        :type checker_plugins:
            flake8.plugins.manager.Checkers
        """
        self.arguments = arguments
        self.style_guide = style_guide
        self.options = style_guide.options
        self.checks = checker_plugins
        self.jobs = self._job_count()
        self.using_multiprocessing = self.jobs > 1
        self.processes = []
        self.checkers = []
        self.statistics = {
            "files": 0,
            "logical lines": 0,
            "physical lines": 0,
            "tokens": 0,
        }

    def _process_statistics(self):
        for checker in self.checkers:
            for statistic in defaults.STATISTIC_NAMES:
                self.statistics[statistic] += checker.statistics[statistic]
        self.statistics["files"] += len(self.checkers)

    def _job_count(self):
        # type: () -> int
        # First we walk through all of our error cases:
        # - multiprocessing library is not present
        # - we're running on windows in which case we know we have
        #   significant implementation issues
        # - the user provided stdin and that's not something we can handle
        #   well
        # - we're processing a diff, which again does not work well with
        #   multiprocessing and which really shouldn't require
        #   multiprocessing
        # - the user provided some awful input
        if not multiprocessing:
            LOG.warning(
                "The multiprocessing module is not available. "
                "Ignoring --jobs arguments."
            )
            return 0

        if (
            utils.is_windows()
            and not utils.can_run_multiprocessing_on_windows()
        ):
            LOG.warning(
                "The --jobs option is not available on Windows due to"
                " a bug (https://bugs.python.org/issue27649) in "
                "Python 2.7.11+ and 3.3+. We have detected that you "
                "are running an unsupported version of Python on "
                "Windows. Ignoring --jobs arguments."
            )
            return 0

        if utils.is_using_stdin(self.arguments):
            LOG.warning(
                "The --jobs option is not compatible with supplying "
                "input using - . Ignoring --jobs arguments."
            )
            return 0

        if self.options.diff:
            LOG.warning(
                "The --diff option was specified with --jobs but "
                "they are not compatible. Ignoring --jobs arguments."
            )
            return 0

        jobs = self.options.jobs
        if jobs != "auto" and not jobs.isdigit():
            LOG.warning(
                '"%s" is not a valid parameter to --jobs. Must be one '
                'of "auto" or a numerical value, e.g., 4.',
                jobs,
            )
            return 0

        # If the value is "auto", we want to let the multiprocessing library
        # decide the number based on the number of CPUs. However, if that
        # function is not implemented for this particular version of Python
        # we fall back to 0, i.e., serial processing.
        if jobs == "auto":
            try:
                return multiprocessing.cpu_count()
            except NotImplementedError:
                return 0

        # Otherwise, we know jobs should be an integer and we can just
        # convert it to an integer
        return int(jobs)

    def _handle_results(self, filename, results):
        style_guide = self.style_guide
        reported_results_count = 0
        for (error_code, line_number, column, text, physical_line) in results:
            reported_results_count += style_guide.handle_error(
                code=error_code,
                filename=filename,
                line_number=line_number,
                column_number=column,
                text=text,
                physical_line=physical_line,
            )
        return reported_results_count
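    # A worked sketch of how ``_job_count`` resolves (the values here are
    # illustrative, not from the source): ``--jobs=4`` yields 4 worker
    # processes; ``--jobs=auto`` yields ``multiprocessing.cpu_count()``,
    # falling back to 0 if that is not implemented; reading from stdin
    # (``-``), ``--diff``, an unsupported Python on Windows, or a missing
    # multiprocessing module all force 0, i.e., serial processing.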
    def is_path_excluded(self, path):
        # type: (str) -> bool
        """Check if a path is excluded.

        :param str path:
            Path to check against the exclude patterns.
        :returns:
            True if there are exclude patterns and the path matches,
            otherwise False.
        :rtype:
            bool
        """
        if path == "-":
            if self.options.stdin_display_name == "stdin":
                return False
            path = self.options.stdin_display_name

        return utils.matches_filename(
            path,
            patterns=self.options.exclude,
            log_message='"%(path)s" has %(whether)sbeen excluded',
            logger=LOG,
        )
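    # For example (hypothetical invocation): ``flake8 - --stdin-display-name
    # mypackage/module.py`` checks the exclude patterns against
    # ``mypackage/module.py`` rather than the literal ``-``, while the
    # default display name ``stdin`` is never treated as excluded.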
    def make_checkers(self, paths=None):
        # type: (List[str]) -> None
        """Create checkers for each file."""
        if paths is None:
            paths = self.arguments

        if not paths:
            paths = ["."]

        filename_patterns = self.options.filename
        running_from_vcs = self.options._running_from_vcs
        running_from_diff = self.options.diff

        # NOTE(sigmavirus24): Yes this is a little unsightly, but it's our
        # best solution right now.
        def should_create_file_checker(filename, argument):
            """Determine if we should create a file checker."""
            matches_filename_patterns = utils.fnmatch(
                filename, filename_patterns
            )
            is_stdin = filename == "-"
            # NOTE(sigmavirus24): If a user explicitly specifies something,
            # e.g., ``flake8 bin/script`` then we should run Flake8 against
            # that. Since should_create_file_checker looks to see if the
            # filename patterns match the filename, we want to skip that in
            # the event that the argument and the filename are identical.
            # If it was specified explicitly, the user intended for it to be
            # checked.
            explicitly_provided = (
                not running_from_vcs
                and not running_from_diff
                and (argument == filename)
            )
            return (
                explicitly_provided or matches_filename_patterns
            ) or is_stdin

        checks = self.checks.to_dictionary()
        checkers = (
            FileChecker(filename, checks, self.options)
            for argument in paths
            for filename in utils.filenames_from(
                argument, self.is_path_excluded
            )
            if should_create_file_checker(filename, argument)
        )
        self.checkers = [
            checker for checker in checkers if checker.should_process
        ]
        LOG.info("Checking %d files", len(self.checkers))
    def report(self):
        # type: () -> Tuple[int, int]
        """Report all of the errors found in the managed file checkers.

        This iterates over each of the checkers and reports the errors sorted
        by line number.

        :returns:
            A tuple of the total results found and the results reported.
        :rtype:
            tuple(int, int)
        """
        results_reported = results_found = 0
        for checker in self.checkers:
            results = sorted(
                checker.results, key=lambda tup: (tup[1], tup[2])
            )
            filename = checker.display_name
            with self.style_guide.processing_file(filename):
                results_reported += self._handle_results(filename, results)
            results_found += len(results)
        return (results_found, results_reported)
    def run_parallel(self):
        """Run the checkers in parallel."""
        final_results = collections.defaultdict(list)
        final_statistics = collections.defaultdict(dict)
        try:
            pool = multiprocessing.Pool(self.jobs, _pool_init)
        except OSError as oserr:
            if oserr.errno not in SERIAL_RETRY_ERRNOS:
                raise
            self.using_multiprocessing = False
            self.run_serial()
            return

        try:
            pool_map = pool.imap_unordered(
                _run_checks,
                self.checkers,
                chunksize=calculate_pool_chunksize(
                    len(self.checkers), self.jobs
                ),
            )
            for ret in pool_map:
                filename, results, statistics = ret
                final_results[filename] = results
                final_statistics[filename] = statistics
            pool.close()
            pool.join()
            pool = None
        finally:
            if pool is not None:
                pool.terminate()
                pool.join()

        for checker in self.checkers:
            filename = checker.display_name
            # Sort by (line number, column), matching the order used by
            # ``report`` above.
            checker.results = sorted(
                final_results[filename],
                key=lambda tup: (tup[1], tup[2]),
            )
            checker.statistics = final_statistics[filename]
    def run_serial(self):
        """Run the checkers in serial."""
        for checker in self.checkers:
            checker.run_checks()
    def run(self):
        """Run all the checkers.

        This will intelligently decide whether to run the checks in parallel
        or in serial.

        If running the checks in parallel causes a problem (e.g.,
        https://gitlab.com/pycqa/flake8/issues/74) this also implements
        fallback to serial processing.
        """
        try:
            if self.using_multiprocessing:
                self.run_parallel()
            else:
                self.run_serial()
        except OSError as oserr:
            if oserr.errno not in SERIAL_RETRY_ERRNOS:
                LOG.exception(oserr)
                raise
            LOG.warning("Running in serial after OS exception, %r", oserr)
            self.run_serial()
        except KeyboardInterrupt:
            LOG.warning("Flake8 was interrupted by the user")
            raise exceptions.EarlyQuit("Early quit while running checks")
    def start(self, paths=None):
        """Start checking files.

        :param list paths:
            Path names to check. This is passed directly to
            :meth:`~Manager.make_checkers`.
        """
        LOG.info("Making checkers")
        self.make_checkers(paths)
    def stop(self):
        """Stop checking files."""
        self._process_statistics()
        for proc in self.processes:
            LOG.info("Joining %s to the main process", proc.name)
            proc.join()
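# A minimal usage sketch for ``Manager`` (the ``style_guide``, ``args``, and
# ``checks`` objects are assumed to be built elsewhere, as
# flake8's application object does before constructing a Manager):
#
#     manager = Manager(style_guide, args, checks)
#     manager.start()          # builds the FileChecker instances
#     manager.run()            # parallel or serial, with serial fallback
#     found, reported = manager.report()
#     manager.stop()           # aggregates statistics, joins processes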
class FileChecker(object):
    """Manage running checks for a file and aggregate the results."""

    def __init__(self, filename, checks, options):
        """Initialize our file checker.

        :param str filename:
            Name of the file to check.
        :param checks:
            The plugins registered to check the file.
        :type checks:
            dict
        :param options:
            Parsed option values from config and command-line.
        :type options:
            optparse.Values
        """
        self.options = options
        self.filename = filename
        self.checks = checks
        self.results = []
        self.statistics = {
            "tokens": 0,
            "logical lines": 0,
            "physical lines": 0,
        }
        self.processor = self._make_processor()
        self.display_name = filename
        self.should_process = False
        if self.processor is not None:
            self.display_name = self.processor.filename
            self.should_process = not self.processor.should_ignore_file()
            self.statistics["physical lines"] = len(self.processor.lines)

    def __repr__(self):
        """Provide helpful debugging representation."""
        return "FileChecker for {}".format(self.filename)

    def _make_processor(self):
        try:
            return processor.FileProcessor(self.filename, self.options)
        except IOError:
            # If we can not read the file due to an IOError (e.g., the file
            # does not exist or we do not have the permissions to open it)
            # then we need to format that exception for the user.
            # NOTE(sigmavirus24): Historically, pep8 has always reported this
            # as an E902. We probably *want* a better error code for this
            # going forward.
            (exc_type, exception) = sys.exc_info()[:2]
            message = "{0}: {1}".format(exc_type.__name__, exception)
            self.report("E902", 0, 0, message)
            return None
    def report(self, error_code, line_number, column, text, line=None):
        # type: (str, int, int, str, Optional[str]) -> str
        """Report an error by storing it in the results list."""
        if error_code is None:
            error_code, text = text.split(" ", 1)

        physical_line = line
        # If we're recovering from a problem in _make_processor, we will not
        # have this attribute.
        if not physical_line and getattr(self, "processor", None):
            physical_line = self.processor.line_for(line_number)

        error = (error_code, line_number, column, text, physical_line)
        self.results.append(error)
        return error_code
    def run_check(self, plugin, **arguments):
        """Run the check in a single plugin."""
        LOG.debug("Running %r with %r", plugin, arguments)
        try:
            self.processor.keyword_arguments_for(
                plugin["parameters"], arguments
            )
        except AttributeError as ae:
            LOG.error("Plugin requested unknown parameters.")
            raise exceptions.PluginRequestedUnknownParameters(
                plugin=plugin, exception=ae
            )
        try:
            return plugin["plugin"](**arguments)
        except Exception as all_exc:
            LOG.critical(
                "Plugin %s raised an unexpected exception", plugin["name"]
            )
            raise exceptions.PluginExecutionFailed(
                plugin=plugin, exception=all_exc
            )
    @staticmethod
    def _extract_syntax_information(exception):
        # Default the position so ``row`` and ``column`` are always bound,
        # even when the exception carries an unusually short argument tuple.
        row, column = (1, 0)
        token = ()
        if len(exception.args) > 1:
            token = exception.args[1]
            if len(token) > 2:
                row, column = token[1:3]

        if column > 0 and token and isinstance(exception, SyntaxError):
            # NOTE(sigmavirus24): SyntaxErrors report 1-indexed column
            # numbers. We need to decrement the column number by 1 at
            # least.
            column_offset = 1
            row_offset = 0
            # See also: https://gitlab.com/pycqa/flake8/issues/237
            physical_line = token[-1]

            # NOTE(sigmavirus24): Not all "tokens" have a string as the last
            # argument. In this event, let's skip trying to find the correct
            # column and row values.
            if physical_line is not None:
                # NOTE(sigmavirus24): SyntaxErrors also don't exactly have a
                # "physical" line so much as what was accumulated by the
                # point tokenizing failed.
                # See also: https://gitlab.com/pycqa/flake8/issues/237
                lines = physical_line.rstrip("\n").split("\n")
                row_offset = len(lines) - 1
                logical_line = lines[0]
                logical_line_length = len(logical_line)
                if column > logical_line_length:
                    column = logical_line_length
            row -= row_offset
            column -= column_offset

        return (row, column)
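    # A worked example (constructed for illustration, not from the source):
    # for ``SyntaxError("invalid syntax", ("t.py", 2, 3, "x = (\n1 +\n"))``
    # the token is ``("t.py", 2, 3, "x = (\n1 +\n")``, so ``row, column``
    # start as ``(2, 3)``; the accumulated text spans two lines, giving
    # ``row_offset = 1`` and ``column_offset = 1``, and the method returns
    # ``(1, 2)``.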
    def run_ast_checks(self):
        """Run all checks expecting an abstract syntax tree."""
        try:
            ast = self.processor.build_ast()
        except (ValueError, SyntaxError, TypeError):
            (exc_type, exception) = sys.exc_info()[:2]
            row, column = self._extract_syntax_information(exception)
            self.report(
                "E999",
                row,
                column,
                "%s: %s" % (exc_type.__name__, exception.args[0]),
            )
            return

        for plugin in self.checks["ast_plugins"]:
            checker = self.run_check(plugin, tree=ast)
            # If the plugin uses a class, call the run method of it,
            # otherwise the call should return something iterable itself
            try:
                runner = checker.run()
            except AttributeError:
                runner = checker
            for (line_number, offset, text, check) in runner:
                self.report(
                    error_code=None,
                    line_number=line_number,
                    column=offset,
                    text=text,
                )
    def run_logical_checks(self):
        """Run all checks expecting a logical line."""
        comments, logical_line, mapping = self.processor.build_logical_line()
        if not mapping:
            return

        self.processor.update_state(mapping)

        LOG.debug('Logical line: "%s"', logical_line.rstrip())

        for plugin in self.checks["logical_line_plugins"]:
            self.processor.update_checker_state_for(plugin)
            results = self.run_check(plugin, logical_line=logical_line) or ()
            for offset, text in results:
                offset = find_offset(offset, mapping)
                line_number, column_offset = offset
                self.report(
                    error_code=None,
                    line_number=line_number,
                    column=column_offset,
                    text=text,
                )

        self.processor.next_logical_line()
    def run_physical_checks(self, physical_line, override_error_line=None):
        """Run all checks for a given physical line.

        A single physical check may return multiple errors.
        """
        for plugin in self.checks["physical_line_plugins"]:
            self.processor.update_checker_state_for(plugin)
            result = self.run_check(plugin, physical_line=physical_line)
            if result is not None:
                # This is a single result if the first element is an int
                column_offset = None
                try:
                    column_offset = result[0]
                except (IndexError, TypeError):
                    pass

                if isinstance(column_offset, int):
                    # If we only have a single result, convert to a
                    # collection
                    result = (result,)

                for result_single in result:
                    column_offset, text = result_single
                    error_code = self.report(
                        error_code=None,
                        line_number=self.processor.line_number,
                        column=column_offset,
                        text=text,
                        line=(override_error_line or physical_line),
                    )
                    self.processor.check_physical_error(
                        error_code, physical_line
                    )
    def process_tokens(self):
        """Process tokens and trigger checks.

        This can raise a :class:`flake8.exceptions.InvalidSyntax` exception.
        Instead of using this directly, you should use
        :meth:`flake8.checker.FileChecker.run_checks`.
        """
        parens = 0
        statistics = self.statistics
        file_processor = self.processor
        for token in file_processor.generate_tokens():
            statistics["tokens"] += 1
            self.check_physical_eol(token)
            token_type, text = token[0:2]
            processor.log_token(LOG, token)
            if token_type == tokenize.OP:
                parens = processor.count_parentheses(parens, text)
            elif parens == 0:
                if processor.token_is_newline(token):
                    self.handle_newline(token_type)

        if file_processor.tokens:
            # If any tokens are left over, process them
            self.run_physical_checks(file_processor.lines[-1])
            self.run_logical_checks()
    def run_checks(self):
        """Run checks against the file."""
        try:
            self.process_tokens()
        except exceptions.InvalidSyntax as exc:
            self.report(
                exc.error_code,
                exc.line_number,
                exc.column_number,
                exc.error_message,
            )

        self.run_ast_checks()

        logical_lines = self.processor.statistics["logical lines"]
        self.statistics["logical lines"] = logical_lines
        return self.filename, self.results, self.statistics
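    # ``run_checks`` is the single entry point used both serially and by the
    # multiprocessing pool (via the module-level ``_run_checks`` below); the
    # triple it returns is what ``Manager.run_parallel`` unpacks into
    # ``final_results`` and ``final_statistics``.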
    def handle_newline(self, token_type):
        """Handle the logic when encountering a newline token."""
        if token_type == tokenize.NEWLINE:
            self.run_logical_checks()
            self.processor.reset_blank_before()
        elif len(self.processor.tokens) == 1:
            # The physical line contains only this token.
            self.processor.visited_new_blank_line()
            self.processor.delete_first_token()
        else:
            self.run_logical_checks()
    def check_physical_eol(self, token):
        """Run physical checks if and only if it is at the end of the line."""
        if processor.is_eol_token(token):
            # Obviously, a newline token ends a single physical line.
            self.run_physical_checks(token[4])
        elif processor.is_multiline_string(token):
            # Less obviously, a string that contains newlines is a
            # multiline string, either triple-quoted or with internal
            # newlines backslash-escaped. Check every physical line in the
            # string *except* for the last one: its newline is outside of
            # the multiline string, so we consider it a regular physical
            # line, and will check it like any other physical line.
            #
            # Subtleties:
            # - have to wind self.line_number back because initially it
            #   points to the last line of the string, and we want
            #   check_physical() to give accurate feedback
            line_no = token[2][0]
            with self.processor.inside_multiline(line_number=line_no):
                for line in self.processor.split_line(token):
                    self.run_physical_checks(
                        line + "\n", override_error_line=token[4]
                    )
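# A minimal sketch of driving a single checker directly (assuming ``options``
# already parsed by flake8 and ``checks = checker_plugins.to_dictionary()``
# as in ``Manager.make_checkers`` above; ``example.py`` is a placeholder):
#
#     file_checker = FileChecker("example.py", checks, options)
#     if file_checker.should_process:
#         filename, results, statistics = file_checker.run_checks()
#     # each result is (error_code, line_number, column, text, physical_line)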
def _pool_init():
    """Ensure correct signaling of ^C using multiprocessing.Pool."""
    signal.signal(signal.SIGINT, signal.SIG_IGN)


def calculate_pool_chunksize(num_checkers, num_jobs):
    """Determine the chunksize for the multiprocessing Pool.

    - For chunksize, see:
      https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool.imap  # noqa
    - This formula, while not perfect, aims to give each worker two batches
      of work.
    - See: https://gitlab.com/pycqa/flake8/merge_requests/156#note_18878876
    - See: https://gitlab.com/pycqa/flake8/issues/265
    """
    return max(num_checkers // (num_jobs * 2), 1)


def _run_checks(checker):
    return checker.run_checks()


def find_offset(offset, mapping):
    """Find the offset tuple for a single offset."""
    if isinstance(offset, tuple):
        return offset

    for token_offset, position in mapping:
        if offset <= token_offset:
            break
    return (position[0], position[1] + offset - token_offset)
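# Worked examples (values are illustrative): ``calculate_pool_chunksize(49,
# 4)`` evaluates to ``max(49 // 8, 1) == 6``, so each of the 4 workers gets
# roughly two batches of six checkers. ``find_offset(12, [(0, (1, 0)),
# (10, (2, 4))])`` falls through to the last mapping entry and returns
# ``(2, 6)``, i.e. row 2, column ``4 + 12 - 10``.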