Source code for flake8.plugins.manager

"""Plugin loading and management logic and classes."""
import logging
import sys

import entrypoints

from flake8 import exceptions
from flake8 import utils

if sys.version_info >= (3, 3):
    import as collections_abc
    import collections as collections_abc

LOG = logging.getLogger(__name__)

__all__ = ("Checkers", "Plugin", "PluginManager", "ReportFormatters")

NO_GROUP_FOUND = object()

[docs]class Plugin(object): """Wrap an EntryPoint from setuptools and other logic."""
[docs] def __init__(self, name, entry_point, local=False): """Initialize our Plugin. :param str name: Name of the entry-point as it was registered with setuptools. :param entry_point: EntryPoint returned by setuptools. :type entry_point: setuptools.EntryPoint :param bool local: Is this a repo-local plugin? """ = name self.entry_point = entry_point self.local = local self._plugin = None self._parameters = None self._parameter_names = None self._group = None self._plugin_name = None self._version = None
def __repr__(self): """Provide an easy to read description of the current plugin.""" return 'Plugin(name="{0}", entry_point="{1}")'.format(, self.entry_point )
[docs] def to_dictionary(self): """Convert this plugin to a dictionary.""" return { "name":, "parameters": self.parameters, "parameter_names": self.parameter_names, "plugin": self.plugin, "plugin_name": self.plugin_name, }
[docs] def is_in_a_group(self): """Determine if this plugin is in a group. :returns: True if the plugin is in a group, otherwise False. :rtype: bool """ return is not None
[docs] def group(self): """Find and parse the group the plugin is in.""" if self._group is None: name =".", 1) if len(name) > 1: self._group = name[0] else: self._group = NO_GROUP_FOUND if self._group is NO_GROUP_FOUND: return None return self._group
@property def parameters(self): """List of arguments that need to be passed to the plugin.""" if self._parameters is None: self._parameters = utils.parameters_for(self) return self._parameters @property def parameter_names(self): """List of argument names that need to be passed to the plugin.""" if self._parameter_names is None: self._parameter_names = list(self.parameters) return self._parameter_names @property def plugin(self): """Load and return the plugin associated with the entry-point. This property implicitly loads the plugin and then caches it. """ self.load_plugin() return self._plugin @property def version(self): """Return the version of the plugin.""" if self._version is None: if self.is_in_a_group(): self._version = version_for(self) else: self._version = self.plugin.version return self._version @property def plugin_name(self): """Return the name of the plugin.""" if self._plugin_name is None: if self.is_in_a_group(): self._plugin_name = else: self._plugin_name = return self._plugin_name @property def off_by_default(self): """Return whether the plugin is ignored by default.""" return getattr(self.plugin, "off_by_default", False)
[docs] def execute(self, *args, **kwargs): r"""Call the plugin with \*args and \*\*kwargs.""" return self.plugin(*args, **kwargs) # pylint: disable=not-callable
def _load(self): self._plugin = self.entry_point.load() if not callable(self._plugin): msg = ( "Plugin %r is not a callable. It might be written for an" " older version of flake8 and might not work with this" " version" % self._plugin ) LOG.critical(msg) raise TypeError(msg)
[docs] def load_plugin(self): """Retrieve the plugin for this entry-point. This loads the plugin, stores it on the instance and then returns it. It does not reload it after the first time, it merely returns the cached plugin. :returns: Nothing """ if self._plugin is None:'Loading plugin "%s" from entry-point.', try: self._load() except Exception as load_exception: LOG.exception(load_exception) failed_to_load = exceptions.FailedToLoadPlugin( plugin=self, exception=load_exception ) LOG.critical(str(failed_to_load)) raise failed_to_load
[docs] def enable(self, optmanager, options=None): """Remove plugin name from the default ignore list.""" optmanager.remove_from_default_ignore([]) optmanager.extend_default_select([]) if not options: return try: options.ignore.remove( except (ValueError, KeyError): LOG.debug( "Attempted to remove %s from the ignore list but it was " "not a member of the list.",, )
[docs] def disable(self, optmanager): """Add the plugin name to the default ignore list.""" optmanager.extend_default_ignore([])
[docs] def provide_options(self, optmanager, options, extra_args): """Pass the parsed options and extra arguments to the plugin.""" parse_options = getattr(self.plugin, "parse_options", None) if parse_options is not None: LOG.debug('Providing options to plugin "%s".', try: parse_options(optmanager, options, extra_args) except TypeError: parse_options(options) if in options.enable_extensions: self.enable(optmanager, options)
[docs] def register_options(self, optmanager): """Register the plugin's command-line options on the OptionManager. :param optmanager: Instantiated OptionManager to register options on. :type optmanager: flake8.options.manager.OptionManager :returns: Nothing """ add_options = getattr(self.plugin, "add_options", None) if add_options is not None: LOG.debug( 'Registering options from plugin "%s" on OptionManager %r',, optmanager, ) add_options(optmanager) if self.off_by_default: self.disable(optmanager)
[docs]class PluginManager(object): # pylint: disable=too-few-public-methods """Find and manage plugins consistently."""
[docs] def __init__(self, namespace, local_plugins=None): """Initialize the manager. :param str namespace: Namespace of the plugins to manage, e.g., 'flake8.extension'. :param list local_plugins: Plugins from config (as "X =" strings). """ self.namespace = namespace self.plugins = {} self.names = [] self._load_local_plugins(local_plugins or []) self._load_entrypoint_plugins()
def _load_local_plugins(self, local_plugins): """Load local plugins from config. :param list local_plugins: Plugins from config (as "X =" strings). """ for plugin_str in local_plugins: name, _, entry_str = plugin_str.partition("=") name, entry_str = name.strip(), entry_str.strip() entry_point = entrypoints.EntryPoint.from_string(entry_str, name) self._load_plugin_from_entrypoint(entry_point, local=True) def _load_entrypoint_plugins(self):'Loading entry-points for "%s".', self.namespace) for entry_point in entrypoints.get_group_all(self.namespace): if == "per-file-ignores": LOG.warning( "flake8-per-file-ignores plugin is incompatible with " "flake8>=3.7 (which implements per-file-ignores itself)." ) continue self._load_plugin_from_entrypoint(entry_point) def _load_plugin_from_entrypoint(self, entry_point, local=False): """Load a plugin from a setuptools EntryPoint. :param EntryPoint entry_point: EntryPoint to load plugin from. :param bool local: Is this a repo-local plugin? """ name = self.plugins[name] = Plugin(name, entry_point, local=local) self.names.append(name) LOG.debug('Loaded %r for plugin "%s".', self.plugins[name], name)
[docs] def map(self, func, *args, **kwargs): r"""Call ``func`` with the plugin and \*args and \**kwargs after. This yields the return value from ``func`` for each plugin. :param collections.Callable func: Function to call with each plugin. Signature should at least be: .. code-block:: python def myfunc(plugin): pass Any extra positional or keyword arguments specified with map will be passed along to this function after the plugin. The plugin passed is a :class:`~flake8.plugins.manager.Plugin`. :param args: Positional arguments to pass to ``func`` after each plugin. :param kwargs: Keyword arguments to pass to ``func`` after each plugin. """ for name in self.names: yield func(self.plugins[name], *args, **kwargs)
[docs] def versions(self): # () -> (str, str) """Generate the versions of plugins. :returns: Tuples of the plugin_name and version :rtype: tuple """ plugins_seen = set() for entry_point_name in self.names: plugin = self.plugins[entry_point_name] plugin_name = plugin.plugin_name if plugin.plugin_name in plugins_seen: continue plugins_seen.add(plugin_name) yield (plugin_name, plugin.version)
def version_for(plugin): # (Plugin) -> Optional[str] """Determine the version of a plugin by its module. :param plugin: The loaded plugin :type plugin: Plugin :returns: version string for the module :rtype: str """ module_name = plugin.plugin.__module__ try: module = __import__(module_name) except ImportError: return None return getattr(module, "__version__", None)
[docs]class PluginTypeManager(object): """Parent class for most of the specific plugin types.""" namespace = None def __init__(self, local_plugins=None): """Initialize the plugin type's manager. :param list local_plugins: Plugins from config file instead of entry-points """ self.manager = PluginManager( self.namespace, local_plugins=local_plugins ) self.plugins_loaded = False def __contains__(self, name): """Check if the entry-point name is in this plugin type manager.""" LOG.debug('Checking for "%s" in plugin type manager.', name) return name in self.plugins def __getitem__(self, name): """Retrieve a plugin by its name.""" LOG.debug('Retrieving plugin for "%s".', name) return self.plugins[name]
[docs] def get(self, name, default=None): """Retrieve the plugin referred to by ``name`` or return the default. :param str name: Name of the plugin to retrieve. :param default: Default value to return. :returns: Plugin object referred to by name, if it exists. :rtype: :class:`Plugin` """ if name in self: return self[name] return default
@property def names(self): """Proxy attribute to underlying manager.""" return self.manager.names @property def plugins(self): """Proxy attribute to underlying manager.""" return self.manager.plugins @staticmethod def _generate_call_function(method_name, optmanager, *args, **kwargs): def generated_function(plugin): # noqa: D105 method = getattr(plugin, method_name, None) if method is not None and isinstance( method, collections_abc.Callable ): return method(optmanager, *args, **kwargs) return generated_function
[docs] def load_plugins(self): """Load all plugins of this type that are managed by this manager.""" if self.plugins_loaded: return def load_plugin(plugin): """Call each plugin's load_plugin method.""" return plugin.load_plugin() plugins = list( # Do not set plugins_loaded if we run into an exception self.plugins_loaded = True return plugins
[docs] def register_plugin_versions(self, optmanager): """Register the plugins and their versions with the OptionManager.""" self.load_plugins() for (plugin_name, version) in self.manager.versions(): optmanager.register_plugin(name=plugin_name, version=version)
[docs] def register_options(self, optmanager): """Register all of the checkers' options to the OptionManager.""" self.load_plugins() call_register_options = self._generate_call_function( "register_options", optmanager ) list(
[docs] def provide_options(self, optmanager, options, extra_args): """Provide parsed options and extra arguments to the plugins.""" call_provide_options = self._generate_call_function( "provide_options", optmanager, options, extra_args ) list(
[docs]class Checkers(PluginTypeManager): """All of the checkers registered through entry-points or config.""" namespace = "flake8.extension"
[docs] def checks_expecting(self, argument_name): """Retrieve checks that expect an argument with the specified name. Find all checker plugins that are expecting a specific argument. """ for plugin in self.plugins.values(): if argument_name == plugin.parameter_names[0]: yield plugin
[docs] def to_dictionary(self): """Return a dictionary of AST and line-based plugins.""" return { "ast_plugins": [ plugin.to_dictionary() for plugin in self.ast_plugins ], "logical_line_plugins": [ plugin.to_dictionary() for plugin in self.logical_line_plugins ], "physical_line_plugins": [ plugin.to_dictionary() for plugin in self.physical_line_plugins ], }
[docs] def register_options(self, optmanager): """Register all of the checkers' options to the OptionManager. This also ensures that plugins that are not part of a group and are enabled by default are enabled on the option manager. """ # NOTE(sigmavirus24) We reproduce a little of # PluginTypeManager.register_options to reduce the number of times # that we loop over the list of plugins. Instead of looping twice, # option registration and enabling the plugin, we loop once with one # function to map over the plugins. self.load_plugins() call_register_options = self._generate_call_function( "register_options", optmanager ) def register_and_enable(plugin): call_register_options(plugin) if is None and not plugin.off_by_default: plugin.enable(optmanager) list(
@property def ast_plugins(self): """List of plugins that expect the AST tree.""" plugins = getattr(self, "_ast_plugins", []) if not plugins: plugins = list(self.checks_expecting("tree")) self._ast_plugins = plugins return plugins @property def logical_line_plugins(self): """List of plugins that expect the logical lines.""" plugins = getattr(self, "_logical_line_plugins", []) if not plugins: plugins = list(self.checks_expecting("logical_line")) self._logical_line_plugins = plugins return plugins @property def physical_line_plugins(self): """List of plugins that expect the physical lines.""" plugins = getattr(self, "_physical_line_plugins", []) if not plugins: plugins = list(self.checks_expecting("physical_line")) self._physical_line_plugins = plugins return plugins
[docs]class ReportFormatters(PluginTypeManager): """All of the report formatters registered through entry-points/config.""" namespace = ""