123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296 |
- import functools
- import logging
- import os
- from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple, cast
- from pip._vendor.packaging.utils import canonicalize_name
- from pip._vendor.resolvelib import BaseReporter, ResolutionImpossible
- from pip._vendor.resolvelib import Resolver as RLResolver
- from pip._vendor.resolvelib.structs import DirectedGraph
- from pip._internal.cache import WheelCache
- from pip._internal.index.package_finder import PackageFinder
- from pip._internal.operations.prepare import RequirementPreparer
- from pip._internal.req.req_install import InstallRequirement
- from pip._internal.req.req_set import RequirementSet
- from pip._internal.resolution.base import BaseResolver, InstallRequirementProvider
- from pip._internal.resolution.resolvelib.provider import PipProvider
- from pip._internal.resolution.resolvelib.reporter import (
- PipDebuggingReporter,
- PipReporter,
- )
- from .base import Candidate, Requirement
- from .factory import Factory
if TYPE_CHECKING:
    # Imported for static analysis only; this branch is skipped at runtime.
    from pip._vendor.resolvelib.resolvers import Result as RLResult

    # resolvelib's result type, parameterised with the Requirement and
    # Candidate classes used in this package and ``str`` identifiers.
    Result = RLResult[Requirement, Candidate, str]

# Logger shared by everything in this module.
logger = logging.getLogger(__name__)
class Resolver(BaseResolver):
    """Resolver that delegates dependency resolution to resolvelib.

    ``resolve()`` runs the resolution and converts the selected candidates
    into a ``RequirementSet``. ``get_installation_order()`` then orders that
    set using the dependency graph kept from the most recent resolution.
    """

    _allowed_strategies = {"eager", "only-if-needed", "to-satisfy-only"}

    def __init__(
        self,
        preparer: RequirementPreparer,
        finder: PackageFinder,
        wheel_cache: Optional[WheelCache],
        make_install_req: InstallRequirementProvider,
        use_user_site: bool,
        ignore_dependencies: bool,
        ignore_installed: bool,
        ignore_requires_python: bool,
        force_reinstall: bool,
        upgrade_strategy: str,
        py_version_info: Optional[Tuple[int, ...]] = None,
    ):
        """Store resolution options and build the candidate ``Factory``.

        ``upgrade_strategy`` must be one of ``_allowed_strategies``.
        """
        super().__init__()
        assert upgrade_strategy in self._allowed_strategies

        self.factory = Factory(
            preparer=preparer,
            finder=finder,
            wheel_cache=wheel_cache,
            make_install_req=make_install_req,
            use_user_site=use_user_site,
            ignore_installed=ignore_installed,
            ignore_requires_python=ignore_requires_python,
            force_reinstall=force_reinstall,
            py_version_info=py_version_info,
        )
        self.upgrade_strategy = upgrade_strategy
        self.ignore_dependencies = ignore_dependencies
        # Filled in by resolve(); get_installation_order() depends on it.
        self._result: Optional[Result] = None

    def resolve(
        self, root_reqs: List[InstallRequirement], check_supported_wheels: bool
    ) -> RequirementSet:
        """Resolve ``root_reqs`` and return the requirements to install.

        ``check_supported_wheels`` is passed through to the returned
        ``RequirementSet``. When resolvelib reports ``ResolutionImpossible``,
        the error built by ``Factory.get_installation_error`` is raised,
        chained to the original exception.
        """
        collected = self.factory.collect_root_requirements(root_reqs)
        provider = PipProvider(
            factory=self.factory,
            constraints=collected.constraints,
            ignore_dependencies=self.ignore_dependencies,
            upgrade_strategy=self.upgrade_strategy,
            user_requested=collected.user_requested,
        )
        # Only the presence of the variable matters, not its value.
        reporter: BaseReporter = (
            PipDebuggingReporter()
            if "PIP_RESOLVER_DEBUG" in os.environ
            else PipReporter()
        )
        resolver: RLResolver[Requirement, Candidate, str] = RLResolver(
            provider,
            reporter,
        )

        # Upper bound on the number of rounds resolvelib may run.
        limit_how_complex_resolution_can_be = 200000
        try:
            result = resolver.resolve(
                collected.requirements,
                max_rounds=limit_how_complex_resolution_can_be,
            )
            self._result = result
        except ResolutionImpossible as exc:
            install_error = self.factory.get_installation_error(
                cast("ResolutionImpossible[Requirement, Candidate]", exc),
                collected.constraints,
            )
            raise install_error from exc

        req_set = RequirementSet(check_supported_wheels=check_supported_wheels)
        for candidate in result.mapping.values():
            ireq = candidate.get_install_requirement()
            if ireq is None:
                continue

            # Look for an existing installation under the same name and flag
            # the requirement so later stages know whether to uninstall it.
            installed_dist = self.factory.get_dist_to_uninstall(candidate)
            if installed_dist is None:
                # Nothing installed yet, hence nothing to uninstall.
                ireq.should_reinstall = False
            elif (
                # --force-reinstall was requested.
                self.factory.force_reinstall
                # The installed version differs from the selected one.
                or installed_dist.version != candidate.version
                # Either side is editable.
                or candidate.is_editable
                or installed_dist.editable
            ):
                ireq.should_reinstall = True
            else:
                source = candidate.source_link
                if not (source and source.is_file):
                    # Same version and not a local file: leave it alone.
                    continue
                if source.is_wheel:
                    # A local wheel of the installed version: do nothing.
                    logger.info(
                        "%s is already installed with the same version as the "
                        "provided wheel. Use --force-reinstall to force an "
                        "installation of the wheel.",
                        ireq.name,
                    )
                    continue
                # A local sdist or directory: reinstall.
                ireq.should_reinstall = True

            link = candidate.source_link
            if link and link.is_yanked:
                yanked_template = (
                    "The candidate selected for download or install is a "
                    "yanked version: {name!r} candidate (version {version} "
                    "at {link})\nReason for being yanked: {reason}"
                )
                logger.warning(
                    yanked_template.format(
                        name=candidate.name,
                        version=candidate.version,
                        link=link,
                        reason=link.yanked_reason or "<none given>",
                    )
                )

            req_set.add_named_requirement(ireq)

        pending = req_set.all_requirements
        self.factory.preparer.prepare_linked_requirements_more(pending)
        return req_set

    def get_installation_order(
        self, req_set: RequirementSet
    ) -> List[InstallRequirement]:
        """Return the requirements of ``req_set`` in installation order.

        A requirement is listed before any other requirement that depends on
        it, so the environment stays consistent while packages are installed
        one at a time.

        The order is a topological ordering of the dependency graph that
        favours packages with few or no dependencies. Cycles are broken at
        arbitrary points; the only guarantee is that they *are* broken.

        ``resolve()`` must have been called beforehand.
        """
        assert self._result is not None, "must call resolve() first"

        requirements = req_set.requirements
        if not requirements:
            # Nothing left to install, so there is nothing to order.
            return []

        weights = get_topological_weights(self._result.graph, set(requirements))
        sort_key = functools.partial(_req_set_item_sorter, weights=weights)
        ordered = sorted(requirements.items(), key=sort_key, reverse=True)
        return [ireq for _name, ireq in ordered]
def get_topological_weights(
    graph: "DirectedGraph[Optional[str]]", requirement_keys: Set[str]
) -> Dict[Optional[str], int]:
    """Assign weights to each node based on how "deep" they are.

    This implementation may change at any point in the future without prior
    notice.

    We first simplify the dependency graph by pruning any leaves and giving
    them the highest weight: a package without any dependencies should be
    installed first. This is done again and again in the same way, giving ever
    less weight to the newly found leaves. The loop stops when no leaves are
    left: all remaining packages have at least one dependency left in the
    graph.

    Then we continue with the remaining graph, by taking the length for the
    longest path to any node from root, ignoring any paths that contain a
    single node twice (i.e. cycles). This is done through a depth-first search
    through the graph, while keeping track of the path to the node.

    Cycles in the graph would result in a node being revisited while also
    being on its own path. In this case, take no action. This helps ensure we
    don't get stuck in a cycle.

    When assigning weight, the longer path (i.e. larger length) is preferred.

    We are only interested in the weights of packages that are in the
    requirement_keys.

    The ``graph`` argument is not modified: pruning is done on a copy, so the
    function can safely be called more than once on the same graph.
    """
    # Pruning removes vertices. Work on a private copy so the caller's graph
    # (the resolver's stored result) stays intact.
    graph = graph.copy()

    path: Set[Optional[str]] = set()
    weights: Dict[Optional[str], int] = {}

    def visit(node: Optional[str]) -> None:
        if node in path:
            # We hit a cycle, so we'll break it here.
            return

        # Time to visit the children!
        path.add(node)
        for child in graph.iter_children(node):
            visit(child)
        path.remove(node)

        if node not in requirement_keys:
            return

        # After removing ``node``, ``path`` holds exactly its ancestors on
        # this walk; keep the longest such chain seen so far.
        last_known_parent_count = weights.get(node, 0)
        weights[node] = max(last_known_parent_count, len(path))

    # Simplify the graph, pruning leaves that have no dependencies.
    # This is needed for large graphs (say over 200 packages) because the
    # `visit` function is exponentially slower then, taking minutes.
    # See https://github.com/pypa/pip/issues/10557
    # We will loop until we explicitly break the loop.
    while True:
        # A leaf is any non-root node without children.
        leaves = {
            key
            for key in graph
            if key is not None
            and not any(True for _child in graph.iter_children(key))
        }
        if not leaves:
            # We are done simplifying.
            break

        # Leaves found earlier get a larger weight, because the graph
        # shrinks on every pass. The root is excluded from the count.
        weight = len(graph) - 1
        for leaf in leaves:
            if leaf in requirement_keys:
                weights[leaf] = weight

        # Remove the leaves from the (copied) graph, making it simpler.
        for leaf in leaves:
            graph.remove(leaf)

    # Visit the remaining graph.
    # `None` is guaranteed to be the root node by resolvelib.
    visit(None)

    # Sanity check: no keys other than requirement keys may have a weight.
    difference = set(weights.keys()).difference(requirement_keys)
    assert not difference, difference

    return weights
def _req_set_item_sorter(
    item: Tuple[str, InstallRequirement],
    weights: Dict[Optional[str], int],
) -> Tuple[int, str]:
    """Build the sort key for one ``(name, requirement)`` pair.

    The first member of the key is the weight looked up in ``weights`` under
    the canonicalized package name. The second member is that canonical name
    itself, which acts as a tie-breaker so the resulting order is
    deterministic (handy in tests).
    """
    canonical = canonicalize_name(item[0])
    weight = weights[canonical]
    return (weight, canonical)
|