import json
import logging
from optparse import Values
from typing import TYPE_CHECKING, Generator, List, Optional, Sequence, Tuple, cast

from pip._vendor.packaging.utils import canonicalize_name

from pip._internal.cli import cmdoptions
from pip._internal.cli.req_command import IndexGroupCommand
from pip._internal.cli.status_codes import SUCCESS
from pip._internal.exceptions import CommandError
from pip._internal.index.collector import LinkCollector
from pip._internal.index.package_finder import PackageFinder
from pip._internal.metadata import BaseDistribution, get_environment
from pip._internal.models.selection_prefs import SelectionPreferences
from pip._internal.network.session import PipSession
from pip._internal.utils.compat import stdlib_pkgs
from pip._internal.utils.misc import tabulate, write_output

if TYPE_CHECKING:
    # These names exist for the type checker only; every runtime reference to
    # them in this module is a quoted annotation or a string passed to cast().
    from pip._internal.metadata.base import DistributionVersion

    class _DistWithLatestInfo(BaseDistribution):
        """Give the distribution object a couple of extra fields.

        These will be populated during ``get_outdated()``. This is dirty but
        makes the rest of the code much cleaner.
        """

        latest_version: DistributionVersion
        latest_filetype: str

    _ProcessedDists = Sequence[_DistWithLatestInfo]


logger = logging.getLogger(__name__)


class ListCommand(IndexGroupCommand):
    """
    List installed packages, including editables.

    Packages are listed in a case-insensitive sorted order.
    """

    ignore_require_venv = True
    usage = """
      %prog [options]"""

    def add_options(self) -> None:
        """Register the list-specific options and the index option group."""
        self.cmd_opts.add_option(
            "-o",
            "--outdated",
            action="store_true",
            default=False,
            help="List outdated packages",
        )
        self.cmd_opts.add_option(
            "-u",
            "--uptodate",
            action="store_true",
            default=False,
            help="List uptodate packages",
        )
        self.cmd_opts.add_option(
            "-e",
            "--editable",
            action="store_true",
            default=False,
            help="List editable projects.",
        )
        self.cmd_opts.add_option(
            "-l",
            "--local",
            action="store_true",
            default=False,
            help=(
                "If in a virtualenv that has global access, do not list "
                "globally-installed packages."
            ),
        )
        self.cmd_opts.add_option(
            "--user",
            dest="user",
            action="store_true",
            default=False,
            help="Only output packages installed in user-site.",
        )
        self.cmd_opts.add_option(cmdoptions.list_path())
        self.cmd_opts.add_option(
            "--pre",
            action="store_true",
            default=False,
            help=(
                "Include pre-release and development versions. By default, "
                "pip only finds stable versions."
            ),
        )

        self.cmd_opts.add_option(
            "--format",
            action="store",
            dest="list_format",
            default="columns",
            choices=("columns", "freeze", "json"),
            help="Select the output format among: columns (default), freeze, or json",
        )

        self.cmd_opts.add_option(
            "--not-required",
            action="store_true",
            dest="not_required",
            help="List packages that are not dependencies of installed packages.",
        )

        # --exclude-editable and --include-editable share one destination;
        # the default (True) is declared on the latter.
        self.cmd_opts.add_option(
            "--exclude-editable",
            action="store_false",
            dest="include_editable",
            help="Exclude editable package from output.",
        )
        self.cmd_opts.add_option(
            "--include-editable",
            action="store_true",
            dest="include_editable",
            help="Include editable package from output.",
            default=True,
        )
        self.cmd_opts.add_option(cmdoptions.list_exclude())
        index_opts = cmdoptions.make_option_group(cmdoptions.index_group, self.parser)

        self.parser.insert_option_group(0, index_opts)
        self.parser.insert_option_group(0, self.cmd_opts)

    def _build_package_finder(
        self, options: Values, session: PipSession
    ) -> PackageFinder:
        """
        Create a package finder appropriate to this list command.
        """
        link_collector = LinkCollector.create(session, options=options)

        # Pass allow_yanked=False to ignore yanked versions.
        selection_prefs = SelectionPreferences(
            allow_yanked=False,
            allow_all_prereleases=options.pre,
        )

        return PackageFinder.create(
            link_collector=link_collector,
            selection_prefs=selection_prefs,
        )

    def run(self, options: Values, args: List[str]) -> int:
        """Collect the installed distributions, filter them and print them.

        Raises ``CommandError`` when ``--outdated`` is combined with
        ``--uptodate`` or with the ``freeze`` format. Returns ``SUCCESS``.
        """
        if options.outdated and options.uptodate:
            raise CommandError("Options --outdated and --uptodate cannot be combined.")

        if options.outdated and options.list_format == "freeze":
            raise CommandError(
                "List format 'freeze' can not be used with the --outdated option."
            )

        cmdoptions.check_list_path_option(options)

        skip = set(stdlib_pkgs)
        if options.excludes:
            skip.update(canonicalize_name(n) for n in options.excludes)

        packages: "_ProcessedDists" = [
            cast("_DistWithLatestInfo", d)
            for d in get_environment(options.path).iter_installed_distributions(
                local_only=options.local,
                user_only=options.user,
                editables_only=options.editable,
                include_editables=options.include_editable,
                skip=skip,
            )
        ]

        # get_not_required must be called first in order to find and
        # filter out all dependencies correctly. Otherwise a package
        # can't be identified as a requirement because some parent packages
        # could have been filtered out before.
        if options.not_required:
            packages = self.get_not_required(packages, options)

        if options.outdated:
            packages = self.get_outdated(packages, options)
        elif options.uptodate:
            packages = self.get_uptodate(packages, options)

        self.output_package_listing(packages, options)
        return SUCCESS

    def get_outdated(
        self, packages: "_ProcessedDists", options: Values
    ) -> "_ProcessedDists":
        """Return the packages whose latest found version is newer than the
        installed one."""
        return [
            dist
            for dist in self.iter_packages_latest_infos(packages, options)
            if dist.latest_version > dist.version
        ]

    def get_uptodate(
        self, packages: "_ProcessedDists", options: Values
    ) -> "_ProcessedDists":
        """Return the packages whose latest found version equals the
        installed one."""
        return [
            dist
            for dist in self.iter_packages_latest_infos(packages, options)
            if dist.latest_version == dist.version
        ]

    def get_not_required(
        self, packages: "_ProcessedDists", options: Values
    ) -> "_ProcessedDists":
        """Return the packages that no package in ``packages`` depends on."""
        dep_keys = {
            canonicalize_name(dep.name)
            for dist in packages
            for dep in (dist.iter_dependencies() or ())
        }

        # Create a set to remove duplicate packages, and cast it to a list
        # to keep the return type consistent with get_outdated and
        # get_uptodate
        return list({pkg for pkg in packages if pkg.canonical_name not in dep_keys})

    def iter_packages_latest_infos(
        self, packages: "_ProcessedDists", options: Values
    ) -> Generator["_DistWithLatestInfo", None, None]:
        """Yield each package annotated with ``latest_version`` and
        ``latest_filetype``.

        Packages for which the finder yields no usable candidate are skipped.
        The session stays open for as long as the generator is being consumed.
        """
        with self._build_session(options) as session:
            finder = self._build_package_finder(options, session)

            def latest_info(
                dist: "_DistWithLatestInfo",
            ) -> Optional["_DistWithLatestInfo"]:
                all_candidates = finder.find_all_candidates(dist.canonical_name)
                if not options.pre:
                    # Remove prereleases
                    all_candidates = [
                        candidate
                        for candidate in all_candidates
                        if not candidate.version.is_prerelease
                    ]

                evaluator = finder.make_candidate_evaluator(
                    project_name=dist.canonical_name,
                )
                best_candidate = evaluator.sort_best_candidate(all_candidates)
                if best_candidate is None:
                    return None

                dist.latest_version = best_candidate.version
                dist.latest_filetype = (
                    "wheel" if best_candidate.link.is_wheel else "sdist"
                )
                return dist

            for package in packages:
                annotated = latest_info(package)
                if annotated is not None:
                    yield annotated

    def output_package_listing(
        self, packages: "_ProcessedDists", options: Values
    ) -> None:
        """Write ``packages``, sorted by canonical name, in the format chosen
        by ``options.list_format``.

        In ``columns`` format nothing is written when there are no packages.
        """
        packages = sorted(
            packages,
            key=lambda dist: dist.canonical_name,
        )
        if options.list_format == "columns" and packages:
            data, header = format_for_columns(packages, options)
            self.output_package_listing_columns(data, header)
        elif options.list_format == "freeze":
            for dist in packages:
                if options.verbose >= 1:
                    write_output(
                        "%s==%s (%s)", dist.raw_name, dist.version, dist.location
                    )
                else:
                    write_output("%s==%s", dist.raw_name, dist.version)
        elif options.list_format == "json":
            write_output(format_for_json(packages, options))

    def output_package_listing_columns(
        self, data: List[List[str]], header: List[str]
    ) -> None:
        """Write ``data`` as an aligned table preceded by ``header`` and a
        dashed separator line.

        Nothing is written when ``data`` is empty. Neither ``data`` nor
        ``header`` is modified.
        """
        if not data:
            return

        # The header goes through tabulate() together with the rows so that
        # the column widths account for the column names as well. A new list
        # is built so the caller's ``data`` is left untouched.
        pkg_strings, sizes = tabulate([header, *data])

        # Create and add a separator right below the header line.
        pkg_strings.insert(1, " ".join("-" * size for size in sizes))

        for line in pkg_strings:
            write_output(line)


def format_for_columns(
    pkgs: "_ProcessedDists", options: Values
) -> Tuple[List[List[str]], List[str]]:
    """
    Convert the package data into something usable
    by output_package_listing_columns.

    Returns ``(data, header)``: one row of strings per package and the
    matching column titles. Optional columns ("Latest"/"Type", "Editable
    project location", "Location"/"Installer") are added depending on
    ``options.outdated``, on whether any package is editable, and on
    ``options.verbose``.
    """
    running_outdated = options.outdated
    verbose = options.verbose >= 1
    has_editables = any(x.editable for x in pkgs)

    header = ["Package", "Version"]
    if running_outdated:
        header.extend(["Latest", "Type"])
    if has_editables:
        header.append("Editable project location")
    if verbose:
        header.extend(["Location", "Installer"])

    data = []
    for proj in pkgs:
        row = [proj.raw_name, str(proj.version)]

        # if we're working on the 'outdated' list, separate out the
        # latest_version and type
        if running_outdated:
            row.append(str(proj.latest_version))
            row.append(proj.latest_filetype)

        if has_editables:
            row.append(proj.editable_project_location or "")

        if verbose:
            row.append(proj.location or "")
            row.append(proj.installer)

        data.append(row)

    return data, header


def format_for_json(packages: "_ProcessedDists", options: Values) -> str:
    """Serialize ``packages`` to a JSON array with one object per package.

    Every object has "name" and "version". "location" and "installer" are
    added when ``options.verbose >= 1``, "latest_version" and
    "latest_filetype" when ``options.outdated`` is set, and
    "editable_project_location" when the package has one.
    """
    data = []
    for dist in packages:
        info = {
            "name": dist.raw_name,
            "version": str(dist.version),
        }
        if options.verbose >= 1:
            info["location"] = dist.location or ""
            info["installer"] = dist.installer
        if options.outdated:
            info["latest_version"] = str(dist.latest_version)
            info["latest_filetype"] = dist.latest_filetype
        editable_project_location = dist.editable_project_location
        if editable_project_location:
            info["editable_project_location"] = editable_project_location
        data.append(info)
    return json.dumps(data)