123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730 |
- import contextlib
- import functools
- import logging
- from typing import (
- TYPE_CHECKING,
- Dict,
- FrozenSet,
- Iterable,
- Iterator,
- List,
- Mapping,
- NamedTuple,
- Optional,
- Sequence,
- Set,
- Tuple,
- TypeVar,
- cast,
- )
- from pip._vendor.packaging.requirements import InvalidRequirement
- from pip._vendor.packaging.specifiers import SpecifierSet
- from pip._vendor.packaging.utils import NormalizedName, canonicalize_name
- from pip._vendor.resolvelib import ResolutionImpossible
- from pip._internal.cache import CacheEntry, WheelCache
- from pip._internal.exceptions import (
- DistributionNotFound,
- InstallationError,
- MetadataInconsistent,
- UnsupportedPythonVersion,
- UnsupportedWheel,
- )
- from pip._internal.index.package_finder import PackageFinder
- from pip._internal.metadata import BaseDistribution, get_default_environment
- from pip._internal.models.link import Link
- from pip._internal.models.wheel import Wheel
- from pip._internal.operations.prepare import RequirementPreparer
- from pip._internal.req.constructors import install_req_from_link_and_ireq
- from pip._internal.req.req_install import (
- InstallRequirement,
- check_invalid_constraint_type,
- )
- from pip._internal.resolution.base import InstallRequirementProvider
- from pip._internal.utils.compatibility_tags import get_supported
- from pip._internal.utils.hashes import Hashes
- from pip._internal.utils.packaging import get_requirement
- from pip._internal.utils.virtualenv import running_under_virtualenv
- from .base import Candidate, CandidateVersion, Constraint, Requirement
- from .candidates import (
- AlreadyInstalledCandidate,
- BaseCandidate,
- EditableCandidate,
- ExtrasCandidate,
- LinkCandidate,
- RequiresPythonCandidate,
- as_base_candidate,
- )
- from .found_candidates import FoundCandidates, IndexCandidateInfo
- from .requirements import (
- ExplicitRequirement,
- RequiresPythonRequirement,
- SpecifierRequirement,
- UnsatisfiableRequirement,
- )
- if TYPE_CHECKING:
- from typing import Protocol
- class ConflictCause(Protocol):
- requirement: RequiresPythonRequirement
- parent: Candidate
- logger = logging.getLogger(__name__)
- C = TypeVar("C")
- Cache = Dict[Link, C]
- class CollectedRootRequirements(NamedTuple):
- requirements: List[Requirement]
- constraints: Dict[str, Constraint]
- user_requested: Dict[str, int]
- class Factory:
- def __init__(
- self,
- finder: PackageFinder,
- preparer: RequirementPreparer,
- make_install_req: InstallRequirementProvider,
- wheel_cache: Optional[WheelCache],
- use_user_site: bool,
- force_reinstall: bool,
- ignore_installed: bool,
- ignore_requires_python: bool,
- py_version_info: Optional[Tuple[int, ...]] = None,
- ) -> None:
- self._finder = finder
- self.preparer = preparer
- self._wheel_cache = wheel_cache
- self._python_candidate = RequiresPythonCandidate(py_version_info)
- self._make_install_req_from_spec = make_install_req
- self._use_user_site = use_user_site
- self._force_reinstall = force_reinstall
- self._ignore_requires_python = ignore_requires_python
- self._build_failures: Cache[InstallationError] = {}
- self._link_candidate_cache: Cache[LinkCandidate] = {}
- self._editable_candidate_cache: Cache[EditableCandidate] = {}
- self._installed_candidate_cache: Dict[str, AlreadyInstalledCandidate] = {}
- self._extras_candidate_cache: Dict[
- Tuple[int, FrozenSet[str]], ExtrasCandidate
- ] = {}
- if not ignore_installed:
- env = get_default_environment()
- self._installed_dists = {
- dist.canonical_name: dist
- for dist in env.iter_installed_distributions(local_only=False)
- }
- else:
- self._installed_dists = {}
- @property
- def force_reinstall(self) -> bool:
- return self._force_reinstall
- def _fail_if_link_is_unsupported_wheel(self, link: Link) -> None:
- if not link.is_wheel:
- return
- wheel = Wheel(link.filename)
- if wheel.supported(self._finder.target_python.get_tags()):
- return
- msg = f"{link.filename} is not a supported wheel on this platform."
- raise UnsupportedWheel(msg)
- def _make_extras_candidate(
- self, base: BaseCandidate, extras: FrozenSet[str]
- ) -> ExtrasCandidate:
- cache_key = (id(base), extras)
- try:
- candidate = self._extras_candidate_cache[cache_key]
- except KeyError:
- candidate = ExtrasCandidate(base, extras)
- self._extras_candidate_cache[cache_key] = candidate
- return candidate
- def _make_candidate_from_dist(
- self,
- dist: BaseDistribution,
- extras: FrozenSet[str],
- template: InstallRequirement,
- ) -> Candidate:
- try:
- base = self._installed_candidate_cache[dist.canonical_name]
- except KeyError:
- base = AlreadyInstalledCandidate(dist, template, factory=self)
- self._installed_candidate_cache[dist.canonical_name] = base
- if not extras:
- return base
- return self._make_extras_candidate(base, extras)
- def _make_candidate_from_link(
- self,
- link: Link,
- extras: FrozenSet[str],
- template: InstallRequirement,
- name: Optional[NormalizedName],
- version: Optional[CandidateVersion],
- ) -> Optional[Candidate]:
- # TODO: Check already installed candidate, and use it if the link and
- # editable flag match.
- if link in self._build_failures:
- # We already tried this candidate before, and it does not build.
- # Don't bother trying again.
- return None
- if template.editable:
- if link not in self._editable_candidate_cache:
- try:
- self._editable_candidate_cache[link] = EditableCandidate(
- link,
- template,
- factory=self,
- name=name,
- version=version,
- )
- except MetadataInconsistent as e:
- logger.info(
- "Discarding [blue underline]%s[/]: [yellow]%s[reset]",
- link,
- e,
- extra={"markup": True},
- )
- self._build_failures[link] = e
- return None
- base: BaseCandidate = self._editable_candidate_cache[link]
- else:
- if link not in self._link_candidate_cache:
- try:
- self._link_candidate_cache[link] = LinkCandidate(
- link,
- template,
- factory=self,
- name=name,
- version=version,
- )
- except MetadataInconsistent as e:
- logger.info(
- "Discarding [blue underline]%s[/]: [yellow]%s[reset]",
- link,
- e,
- extra={"markup": True},
- )
- self._build_failures[link] = e
- return None
- base = self._link_candidate_cache[link]
- if not extras:
- return base
- return self._make_extras_candidate(base, extras)
- def _iter_found_candidates(
- self,
- ireqs: Sequence[InstallRequirement],
- specifier: SpecifierSet,
- hashes: Hashes,
- prefers_installed: bool,
- incompatible_ids: Set[int],
- ) -> Iterable[Candidate]:
- if not ireqs:
- return ()
- # The InstallRequirement implementation requires us to give it a
- # "template". Here we just choose the first requirement to represent
- # all of them.
- # Hopefully the Project model can correct this mismatch in the future.
- template = ireqs[0]
- assert template.req, "Candidates found on index must be PEP 508"
- name = canonicalize_name(template.req.name)
- extras: FrozenSet[str] = frozenset()
- for ireq in ireqs:
- assert ireq.req, "Candidates found on index must be PEP 508"
- specifier &= ireq.req.specifier
- hashes &= ireq.hashes(trust_internet=False)
- extras |= frozenset(ireq.extras)
- def _get_installed_candidate() -> Optional[Candidate]:
- """Get the candidate for the currently-installed version."""
- # If --force-reinstall is set, we want the version from the index
- # instead, so we "pretend" there is nothing installed.
- if self._force_reinstall:
- return None
- try:
- installed_dist = self._installed_dists[name]
- except KeyError:
- return None
- # Don't use the installed distribution if its version does not fit
- # the current dependency graph.
- if not specifier.contains(installed_dist.version, prereleases=True):
- return None
- candidate = self._make_candidate_from_dist(
- dist=installed_dist,
- extras=extras,
- template=template,
- )
- # The candidate is a known incompatibility. Don't use it.
- if id(candidate) in incompatible_ids:
- return None
- return candidate
- def iter_index_candidate_infos() -> Iterator[IndexCandidateInfo]:
- result = self._finder.find_best_candidate(
- project_name=name,
- specifier=specifier,
- hashes=hashes,
- )
- icans = list(result.iter_applicable())
- # PEP 592: Yanked releases are ignored unless the specifier
- # explicitly pins a version (via '==' or '===') that can be
- # solely satisfied by a yanked release.
- all_yanked = all(ican.link.is_yanked for ican in icans)
- def is_pinned(specifier: SpecifierSet) -> bool:
- for sp in specifier:
- if sp.operator == "===":
- return True
- if sp.operator != "==":
- continue
- if sp.version.endswith(".*"):
- continue
- return True
- return False
- pinned = is_pinned(specifier)
- # PackageFinder returns earlier versions first, so we reverse.
- for ican in reversed(icans):
- if not (all_yanked and pinned) and ican.link.is_yanked:
- continue
- func = functools.partial(
- self._make_candidate_from_link,
- link=ican.link,
- extras=extras,
- template=template,
- name=name,
- version=ican.version,
- )
- yield ican.version, func
- return FoundCandidates(
- iter_index_candidate_infos,
- _get_installed_candidate(),
- prefers_installed,
- incompatible_ids,
- )
- def _iter_explicit_candidates_from_base(
- self,
- base_requirements: Iterable[Requirement],
- extras: FrozenSet[str],
- ) -> Iterator[Candidate]:
- """Produce explicit candidates from the base given an extra-ed package.
- :param base_requirements: Requirements known to the resolver. The
- requirements are guaranteed to not have extras.
- :param extras: The extras to inject into the explicit requirements'
- candidates.
- """
- for req in base_requirements:
- lookup_cand, _ = req.get_candidate_lookup()
- if lookup_cand is None: # Not explicit.
- continue
- # We've stripped extras from the identifier, and should always
- # get a BaseCandidate here, unless there's a bug elsewhere.
- base_cand = as_base_candidate(lookup_cand)
- assert base_cand is not None, "no extras here"
- yield self._make_extras_candidate(base_cand, extras)
- def _iter_candidates_from_constraints(
- self,
- identifier: str,
- constraint: Constraint,
- template: InstallRequirement,
- ) -> Iterator[Candidate]:
- """Produce explicit candidates from constraints.
- This creates "fake" InstallRequirement objects that are basically clones
- of what "should" be the template, but with original_link set to link.
- """
- for link in constraint.links:
- self._fail_if_link_is_unsupported_wheel(link)
- candidate = self._make_candidate_from_link(
- link,
- extras=frozenset(),
- template=install_req_from_link_and_ireq(link, template),
- name=canonicalize_name(identifier),
- version=None,
- )
- if candidate:
- yield candidate
- def find_candidates(
- self,
- identifier: str,
- requirements: Mapping[str, Iterable[Requirement]],
- incompatibilities: Mapping[str, Iterator[Candidate]],
- constraint: Constraint,
- prefers_installed: bool,
- ) -> Iterable[Candidate]:
- # Collect basic lookup information from the requirements.
- explicit_candidates: Set[Candidate] = set()
- ireqs: List[InstallRequirement] = []
- for req in requirements[identifier]:
- cand, ireq = req.get_candidate_lookup()
- if cand is not None:
- explicit_candidates.add(cand)
- if ireq is not None:
- ireqs.append(ireq)
- # If the current identifier contains extras, add explicit candidates
- # from entries from extra-less identifier.
- with contextlib.suppress(InvalidRequirement):
- parsed_requirement = get_requirement(identifier)
- explicit_candidates.update(
- self._iter_explicit_candidates_from_base(
- requirements.get(parsed_requirement.name, ()),
- frozenset(parsed_requirement.extras),
- ),
- )
- # Add explicit candidates from constraints. We only do this if there are
- # known ireqs, which represent requirements not already explicit. If
- # there are no ireqs, we're constraining already-explicit requirements,
- # which is handled later when we return the explicit candidates.
- if ireqs:
- try:
- explicit_candidates.update(
- self._iter_candidates_from_constraints(
- identifier,
- constraint,
- template=ireqs[0],
- ),
- )
- except UnsupportedWheel:
- # If we're constrained to install a wheel incompatible with the
- # target architecture, no candidates will ever be valid.
- return ()
- # Since we cache all the candidates, incompatibility identification
- # can be made quicker by comparing only the id() values.
- incompat_ids = {id(c) for c in incompatibilities.get(identifier, ())}
- # If none of the requirements want an explicit candidate, we can ask
- # the finder for candidates.
- if not explicit_candidates:
- return self._iter_found_candidates(
- ireqs,
- constraint.specifier,
- constraint.hashes,
- prefers_installed,
- incompat_ids,
- )
- return (
- c
- for c in explicit_candidates
- if id(c) not in incompat_ids
- and constraint.is_satisfied_by(c)
- and all(req.is_satisfied_by(c) for req in requirements[identifier])
- )
- def _make_requirement_from_install_req(
- self, ireq: InstallRequirement, requested_extras: Iterable[str]
- ) -> Optional[Requirement]:
- if not ireq.match_markers(requested_extras):
- logger.info(
- "Ignoring %s: markers '%s' don't match your environment",
- ireq.name,
- ireq.markers,
- )
- return None
- if not ireq.link:
- return SpecifierRequirement(ireq)
- self._fail_if_link_is_unsupported_wheel(ireq.link)
- cand = self._make_candidate_from_link(
- ireq.link,
- extras=frozenset(ireq.extras),
- template=ireq,
- name=canonicalize_name(ireq.name) if ireq.name else None,
- version=None,
- )
- if cand is None:
- # There's no way we can satisfy a URL requirement if the underlying
- # candidate fails to build. An unnamed URL must be user-supplied, so
- # we fail eagerly. If the URL is named, an unsatisfiable requirement
- # can make the resolver do the right thing, either backtrack (and
- # maybe find some other requirement that's buildable) or raise a
- # ResolutionImpossible eventually.
- if not ireq.name:
- raise self._build_failures[ireq.link]
- return UnsatisfiableRequirement(canonicalize_name(ireq.name))
- return self.make_requirement_from_candidate(cand)
- def collect_root_requirements(
- self, root_ireqs: List[InstallRequirement]
- ) -> CollectedRootRequirements:
- collected = CollectedRootRequirements([], {}, {})
- for i, ireq in enumerate(root_ireqs):
- if ireq.constraint:
- # Ensure we only accept valid constraints
- problem = check_invalid_constraint_type(ireq)
- if problem:
- raise InstallationError(problem)
- if not ireq.match_markers():
- continue
- assert ireq.name, "Constraint must be named"
- name = canonicalize_name(ireq.name)
- if name in collected.constraints:
- collected.constraints[name] &= ireq
- else:
- collected.constraints[name] = Constraint.from_ireq(ireq)
- else:
- req = self._make_requirement_from_install_req(
- ireq,
- requested_extras=(),
- )
- if req is None:
- continue
- if ireq.user_supplied and req.name not in collected.user_requested:
- collected.user_requested[req.name] = i
- collected.requirements.append(req)
- return collected
- def make_requirement_from_candidate(
- self, candidate: Candidate
- ) -> ExplicitRequirement:
- return ExplicitRequirement(candidate)
- def make_requirement_from_spec(
- self,
- specifier: str,
- comes_from: Optional[InstallRequirement],
- requested_extras: Iterable[str] = (),
- ) -> Optional[Requirement]:
- ireq = self._make_install_req_from_spec(specifier, comes_from)
- return self._make_requirement_from_install_req(ireq, requested_extras)
- def make_requires_python_requirement(
- self,
- specifier: SpecifierSet,
- ) -> Optional[Requirement]:
- if self._ignore_requires_python:
- return None
- # Don't bother creating a dependency for an empty Requires-Python.
- if not str(specifier):
- return None
- return RequiresPythonRequirement(specifier, self._python_candidate)
- def get_wheel_cache_entry(
- self, link: Link, name: Optional[str]
- ) -> Optional[CacheEntry]:
- """Look up the link in the wheel cache.
- If ``preparer.require_hashes`` is True, don't use the wheel cache,
- because cached wheels, always built locally, have different hashes
- than the files downloaded from the index server and thus throw false
- hash mismatches. Furthermore, cached wheels at present have
- nondeterministic contents due to file modification times.
- """
- if self._wheel_cache is None:
- return None
- return self._wheel_cache.get_cache_entry(
- link=link,
- package_name=name,
- supported_tags=get_supported(),
- )
- def get_dist_to_uninstall(self, candidate: Candidate) -> Optional[BaseDistribution]:
- # TODO: Are there more cases this needs to return True? Editable?
- dist = self._installed_dists.get(candidate.project_name)
- if dist is None: # Not installed, no uninstallation required.
- return None
- # We're installing into global site. The current installation must
- # be uninstalled, no matter it's in global or user site, because the
- # user site installation has precedence over global.
- if not self._use_user_site:
- return dist
- # We're installing into user site. Remove the user site installation.
- if dist.in_usersite:
- return dist
- # We're installing into user site, but the installed incompatible
- # package is in global site. We can't uninstall that, and would let
- # the new user installation to "shadow" it. But shadowing won't work
- # in virtual environments, so we error out.
- if running_under_virtualenv() and dist.in_site_packages:
- message = (
- f"Will not install to the user site because it will lack "
- f"sys.path precedence to {dist.raw_name} in {dist.location}"
- )
- raise InstallationError(message)
- return None
- def _report_requires_python_error(
- self, causes: Sequence["ConflictCause"]
- ) -> UnsupportedPythonVersion:
- assert causes, "Requires-Python error reported with no cause"
- version = self._python_candidate.version
- if len(causes) == 1:
- specifier = str(causes[0].requirement.specifier)
- message = (
- f"Package {causes[0].parent.name!r} requires a different "
- f"Python: {version} not in {specifier!r}"
- )
- return UnsupportedPythonVersion(message)
- message = f"Packages require a different Python. {version} not in:"
- for cause in causes:
- package = cause.parent.format_for_error()
- specifier = str(cause.requirement.specifier)
- message += f"\n{specifier!r} (required by {package})"
- return UnsupportedPythonVersion(message)
- def _report_single_requirement_conflict(
- self, req: Requirement, parent: Optional[Candidate]
- ) -> DistributionNotFound:
- if parent is None:
- req_disp = str(req)
- else:
- req_disp = f"{req} (from {parent.name})"
- cands = self._finder.find_all_candidates(req.project_name)
- skipped_by_requires_python = self._finder.requires_python_skipped_reasons()
- versions = [str(v) for v in sorted({c.version for c in cands})]
- if skipped_by_requires_python:
- logger.critical(
- "Ignored the following versions that require a different python "
- "version: %s",
- "; ".join(skipped_by_requires_python) or "none",
- )
- logger.critical(
- "Could not find a version that satisfies the requirement %s "
- "(from versions: %s)",
- req_disp,
- ", ".join(versions) or "none",
- )
- if str(req) == "requirements.txt":
- logger.info(
- "HINT: You are attempting to install a package literally "
- 'named "requirements.txt" (which cannot exist). Consider '
- "using the '-r' flag to install the packages listed in "
- "requirements.txt"
- )
- return DistributionNotFound(f"No matching distribution found for {req}")
- def get_installation_error(
- self,
- e: "ResolutionImpossible[Requirement, Candidate]",
- constraints: Dict[str, Constraint],
- ) -> InstallationError:
- assert e.causes, "Installation error reported with no cause"
- # If one of the things we can't solve is "we need Python X.Y",
- # that is what we report.
- requires_python_causes = [
- cause
- for cause in e.causes
- if isinstance(cause.requirement, RequiresPythonRequirement)
- and not cause.requirement.is_satisfied_by(self._python_candidate)
- ]
- if requires_python_causes:
- # The comprehension above makes sure all Requirement instances are
- # RequiresPythonRequirement, so let's cast for convenience.
- return self._report_requires_python_error(
- cast("Sequence[ConflictCause]", requires_python_causes),
- )
- # Otherwise, we have a set of causes which can't all be satisfied
- # at once.
- # The simplest case is when we have *one* cause that can't be
- # satisfied. We just report that case.
- if len(e.causes) == 1:
- req, parent = e.causes[0]
- if req.name not in constraints:
- return self._report_single_requirement_conflict(req, parent)
- # OK, we now have a list of requirements that can't all be
- # satisfied at once.
- # A couple of formatting helpers
- def text_join(parts: List[str]) -> str:
- if len(parts) == 1:
- return parts[0]
- return ", ".join(parts[:-1]) + " and " + parts[-1]
- def describe_trigger(parent: Candidate) -> str:
- ireq = parent.get_install_requirement()
- if not ireq or not ireq.comes_from:
- return f"{parent.name}=={parent.version}"
- if isinstance(ireq.comes_from, InstallRequirement):
- return str(ireq.comes_from.name)
- return str(ireq.comes_from)
- triggers = set()
- for req, parent in e.causes:
- if parent is None:
- # This is a root requirement, so we can report it directly
- trigger = req.format_for_error()
- else:
- trigger = describe_trigger(parent)
- triggers.add(trigger)
- if triggers:
- info = text_join(sorted(triggers))
- else:
- info = "the requested packages"
- msg = (
- "Cannot install {} because these package versions "
- "have conflicting dependencies.".format(info)
- )
- logger.critical(msg)
- msg = "\nThe conflict is caused by:"
- relevant_constraints = set()
- for req, parent in e.causes:
- if req.name in constraints:
- relevant_constraints.add(req.name)
- msg = msg + "\n "
- if parent:
- msg = msg + f"{parent.name} {parent.version} depends on "
- else:
- msg = msg + "The user requested "
- msg = msg + req.format_for_error()
- for key in relevant_constraints:
- spec = constraints[key].specifier
- msg += f"\n The user requested (constraint) {key}{spec}"
- msg = (
- msg
- + "\n\n"
- + "To fix this you could try to:\n"
- + "1. loosen the range of package versions you've specified\n"
- + "2. remove package versions to allow pip attempt to solve "
- + "the dependency conflict\n"
- )
- logger.info(msg)
- return DistributionNotFound(
- "ResolutionImpossible: for help visit "
- "https://pip.pypa.io/en/latest/topics/dependency-resolution/"
- "#dealing-with-dependency-conflicts"
- )
|