12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029 |
- """Routines related to PyPI, indexes"""
- import enum
- import functools
- import itertools
- import logging
- import re
- from typing import TYPE_CHECKING, FrozenSet, Iterable, List, Optional, Set, Tuple, Union
- from pip._vendor.packaging import specifiers
- from pip._vendor.packaging.tags import Tag
- from pip._vendor.packaging.utils import canonicalize_name
- from pip._vendor.packaging.version import _BaseVersion
- from pip._vendor.packaging.version import parse as parse_version
- from pip._internal.exceptions import (
- BestVersionAlreadyInstalled,
- DistributionNotFound,
- InvalidWheelFilename,
- UnsupportedWheel,
- )
- from pip._internal.index.collector import LinkCollector, parse_links
- from pip._internal.models.candidate import InstallationCandidate
- from pip._internal.models.format_control import FormatControl
- from pip._internal.models.link import Link
- from pip._internal.models.search_scope import SearchScope
- from pip._internal.models.selection_prefs import SelectionPreferences
- from pip._internal.models.target_python import TargetPython
- from pip._internal.models.wheel import Wheel
- from pip._internal.req import InstallRequirement
- from pip._internal.utils._log import getLogger
- from pip._internal.utils.filetypes import WHEEL_EXTENSION
- from pip._internal.utils.hashes import Hashes
- from pip._internal.utils.logging import indent_log
- from pip._internal.utils.misc import build_netloc
- from pip._internal.utils.packaging import check_requires_python
- from pip._internal.utils.unpacking import SUPPORTED_EXTENSIONS
- if TYPE_CHECKING:
- from pip._vendor.typing_extensions import TypeGuard
# Names exported by ``from <this module> import *``.
__all__ = ["FormatControl", "BestCandidateResult", "PackageFinder"]

# Module-level logger shared by every function and class in this file.
logger = getLogger(__name__)

# A parsed wheel build tag: the empty tuple when the wheel has none, otherwise
# (leading integer, remaining text) as produced in CandidateEvaluator._sort_key.
BuildTag = Union[Tuple[()], Tuple[int, str]]
# Type of the tuple returned by CandidateEvaluator._sort_key, in order:
# (has_allowed_hash, yank_value, binary_preference, version, tag priority,
# build_tag).
CandidateSortingKey = Tuple[int, int, int, _BaseVersion, Optional[int], BuildTag]
def _check_link_requires_python(
    link: Link,
    version_info: Tuple[int, int, int],
    ignore_requires_python: bool = False,
) -> bool:
    """
    Tell whether a Python version satisfies a link's "Requires-Python" value.

    An unparsable "Requires-Python" value is logged at debug level and the
    link is treated as compatible.

    :param link: The link whose ``requires_python`` value is checked.
    :param version_info: A 3-tuple of ints representing the Python
        major-minor-micro version to check.
    :param ignore_requires_python: When true, an incompatible value is
        only logged and the link is still reported as compatible.
    :return: False only when the version is incompatible and
        *ignore_requires_python* is false; True otherwise.
    """
    requires_python = link.requires_python

    try:
        compatible = check_requires_python(
            requires_python,
            version_info=version_info,
        )
    except specifiers.InvalidSpecifier:
        logger.debug(
            "Ignoring invalid Requires-Python (%r) for link: %s",
            requires_python,
            link,
        )
        return True

    if compatible:
        return True

    dotted_version = ".".join(str(part) for part in version_info)

    if ignore_requires_python:
        logger.debug(
            "Ignoring failed Requires-Python check (%s not in: %r) for link: %s",
            dotted_version,
            requires_python,
            link,
        )
        return True

    logger.verbose(
        "Link requires a different Python (%s not in: %r): %s",
        dotted_version,
        requires_python,
        link,
    )
    return False
class LinkType(enum.Enum):
    """Outcome that LinkEvaluator.evaluate_link() reports for a link."""

    # The link qualifies; the accompanying detail is its version string.
    candidate = enum.auto()
    # A wheel whose canonicalized name is not the project being evaluated.
    different_project = enum.auto()
    # The file is yanked and yanked files are not allowed.
    yanked = enum.auto()
    # Not a file, an unsupported archive extension, a "macosx10" .zip, or a
    # binary/source format that is not permitted for the project.
    format_unsupported = enum.auto()
    # The wheel filename is invalid, or no version could be extracted.
    format_invalid = enum.auto()
    # No wheel tag is supported by the target Python, or a trailing "-pyX.Y"
    # marker in the version differs from the target Python version.
    platform_mismatch = enum.auto()
    # The link's Requires-Python value rejects the target Python version.
    requires_python_mismatch = enum.auto()
class LinkEvaluator:
    """
    Decides, for one particular project, whether links are installable.
    """

    # Trailing "-pyX" / "-pyX.Y" marker at the end of a version string.
    _py_version_re = re.compile(r"-py([123]\.?[0-9]?)$")

    # ``allow_yanked`` intentionally has no default: every call site has to
    # decide, explicitly and visibly, whether yanked releases are acceptable.
    def __init__(
        self,
        project_name: str,
        canonical_name: str,
        formats: FrozenSet[str],
        target_python: TargetPython,
        allow_yanked: bool,
        ignore_requires_python: Optional[bool] = None,
    ) -> None:
        """
        :param project_name: The user supplied package name.
        :param canonical_name: The canonical package name.
        :param formats: The formats allowed for this package. Should be a set
            with 'binary' or 'source' or both in it.
        :param target_python: The target Python interpreter used to judge
            link compatibility: wheel tags, a Python version embedded in
            the link filename or egg fragment, and the link's
            "Requires-Python" value.
        :param allow_yanked: Whether files marked as yanked (in the sense
            of PEP 592) are permitted to be candidates for install.
        :param ignore_requires_python: Whether to ignore incompatible
            PEP 503 "data-requires-python" values in HTML links. None
            (the default) is treated as False.
        """
        self.project_name = project_name
        self._canonical_name = canonical_name
        self._formats = formats
        self._target_python = target_python
        self._allow_yanked = allow_yanked
        self._ignore_requires_python = (
            False if ignore_requires_python is None else ignore_requires_python
        )

    def evaluate_link(self, link: Link) -> Tuple[LinkType, str]:
        """
        Classify a link as an installation candidate or explain why not.

        :return: A tuple (result, detail). *result* is the LinkType member
            describing the outcome. For ``LinkType.candidate`` the *detail*
            is the candidate's version string; for every other member it is
            a human-readable reason the link was rejected.
        """
        if link.is_yanked and not self._allow_yanked:
            yanked_reason = link.yanked_reason or "<none given>"
            return LinkType.yanked, f"yanked for reason: {yanked_reason}"

        found_version = None
        if link.egg_fragment:
            # Links with an egg fragment skip the extension checks below.
            egg_info = link.egg_fragment
            ext = link.ext
        else:
            egg_info, ext = link.splitext()
            if not ext:
                return LinkType.format_unsupported, "not a file"
            if ext not in SUPPORTED_EXTENSIONS:
                return (
                    LinkType.format_unsupported,
                    f"unsupported archive format: {ext}",
                )

            is_wheel = ext == WHEEL_EXTENSION
            if is_wheel and "binary" not in self._formats:
                return (
                    LinkType.format_unsupported,
                    f"No binaries permitted for {self.project_name}",
                )
            if ext == ".zip" and "macosx10" in link.path:
                return LinkType.format_unsupported, "macosx10 one"

            if is_wheel:
                try:
                    wheel = Wheel(link.filename)
                except InvalidWheelFilename:
                    return LinkType.format_invalid, "invalid wheel filename"

                if canonicalize_name(wheel.name) != self._canonical_name:
                    return (
                        LinkType.different_project,
                        f"wrong project name (not {self.project_name})",
                    )

                if not wheel.supported(self._target_python.get_tags()):
                    # Listing the wheel's own tags in the message makes
                    # compatibility problems easier to troubleshoot.
                    formatted_tags = ", ".join(wheel.get_formatted_file_tags())
                    return (
                        LinkType.platform_mismatch,
                        f"none of the wheel's tags ({formatted_tags}) are compatible "
                        f"(run pip debug --verbose to show compatible tags)",
                    )

                found_version = wheel.version

        # This should be up by the self.ok_binary check, but see issue 2700.
        if ext != WHEEL_EXTENSION and "source" not in self._formats:
            return (
                LinkType.format_unsupported,
                f"No sources permitted for {self.project_name}",
            )

        # Wheels supply their own version; anything else is parsed out of the
        # egg fragment or filename stem.
        if not found_version:
            found_version = _extract_version_from_fragment(
                egg_info,
                self._canonical_name,
            )
        if not found_version:
            return (
                LinkType.format_invalid,
                f"Missing project version for {self.project_name}",
            )

        py_marker = self._py_version_re.search(found_version)
        if py_marker is not None:
            # Drop the "-pyX.Y" suffix and require it to match the target.
            found_version = found_version[: py_marker.start()]
            if py_marker.group(1) != self._target_python.py_version:
                return LinkType.platform_mismatch, "Python version is incorrect"

        python_ok = _check_link_requires_python(
            link,
            version_info=self._target_python.py_version_info,
            ignore_requires_python=self._ignore_requires_python,
        )
        if not python_ok:
            return (
                LinkType.requires_python_mismatch,
                f"{found_version} Requires-Python {link.requires_python}",
            )

        logger.debug("Found link %s, version: %s", link, found_version)
        return LinkType.candidate, found_version
def filter_unallowed_hashes(
    candidates: List[InstallationCandidate],
    hashes: Optional[Hashes],
    project_name: str,
) -> List[InstallationCandidate]:
    """
    Return a new list of candidates with disallowed hashes filtered out.

    When at least one candidate carries an allowed hash, the result holds
    every candidate that either has an allowed hash or has no hash at all.
    When nothing matches (or no hashes are given), all candidates are
    returned, always as a fresh list rather than the argument itself.

    Keeping hash-less candidates alongside matches lets a warning be logged
    if a more preferred candidate lacks a hash. Returning everything when
    there is no match lets pip report the hash of the candidate that would
    otherwise have been installed, so the user can update a requirements
    file with the desired hash more easily.

    :param candidates: The candidates to filter.
    :param hashes: The allowed hashes, or a falsy value to skip filtering.
    :param project_name: Project name, used only in log messages.
    """
    if not hashes:
        logger.debug(
            "Given no hashes to check %s links for project %r: "
            "discarding no candidates",
            len(candidates),
            project_name,
        )
        # Copy so the caller never gets its own list object back.
        return list(candidates)

    kept = []
    # Mismatching candidates are collected only for the log message.
    rejected = []
    matched = 0
    for cand in candidates:
        cand_link = cand.link
        if cand_link.has_hash:
            if not cand_link.is_hash_allowed(hashes=hashes):
                rejected.append(cand)
                continue
            matched += 1
        kept.append(cand)

    # Copy so the caller never gets its own list object back.
    result = kept if matched else list(candidates)

    if len(result) == len(candidates):
        discard_message = "discarding no candidates"
    else:
        rejected_links = "\n ".join(str(cand.link) for cand in rejected)
        discard_message = "discarding {} non-matches:\n {}".format(
            len(rejected),
            rejected_links,
        )

    logger.debug(
        "Checked %s links for project %r against %s hashes "
        "(%s matches, %s no digest): %s",
        len(candidates),
        project_name,
        hashes.digest_count,
        matched,
        len(kept) - matched,
        discard_message,
    )

    return result
class CandidatePreferences:
    """
    Holds a subset of the options used when filtering and sorting
    InstallationCandidate objects.
    """

    def __init__(
        self,
        prefer_binary: bool = False,
        allow_all_prereleases: bool = False,
    ) -> None:
        """
        :param prefer_binary: Stored as ``self.prefer_binary``.
        :param allow_all_prereleases: Whether to allow all pre-releases.
        """
        self.prefer_binary = prefer_binary
        self.allow_all_prereleases = allow_all_prereleases
class BestCandidateResult:
    """The candidates returned by `PackageFinder.find_best_candidate`.

    Only CandidateEvaluator's `compute_best_candidate()` method is meant to
    create instances of this class.
    """

    def __init__(
        self,
        candidates: List[InstallationCandidate],
        applicable_candidates: List[InstallationCandidate],
        best_candidate: Optional[InstallationCandidate],
    ) -> None:
        """
        :param candidates: A sequence of all available candidates found.
        :param applicable_candidates: The applicable candidates; must be a
            subset of *candidates*.
        :param best_candidate: The most preferred candidate found, or None
            if no applicable candidates were found.
        """
        # Consistency checks on the three arguments.
        assert set(applicable_candidates) <= set(candidates)
        if best_candidate is not None:
            assert best_candidate in applicable_candidates
        else:
            assert not applicable_candidates

        self.best_candidate = best_candidate
        self._candidates = candidates
        self._applicable_candidates = applicable_candidates

    def iter_all(self) -> Iterable[InstallationCandidate]:
        """Return an iterator over every candidate found."""
        return iter(self._candidates)

    def iter_applicable(self) -> Iterable[InstallationCandidate]:
        """Return an iterator over the applicable candidates only."""
        return iter(self._applicable_candidates)
class CandidateEvaluator:
    """
    Filters and orders installation candidates according to which tags
    are valid for the target Python.
    """

    @classmethod
    def create(
        cls,
        project_name: str,
        target_python: Optional[TargetPython] = None,
        prefer_binary: bool = False,
        allow_all_prereleases: bool = False,
        specifier: Optional[specifiers.BaseSpecifier] = None,
        hashes: Optional[Hashes] = None,
    ) -> "CandidateEvaluator":
        """Build a CandidateEvaluator, filling in defaults.

        :param project_name: Passed through to the constructor.
        :param target_python: The target Python interpreter to use when
            checking compatibility. If None (the default), a TargetPython
            object will be constructed from the running Python.
        :param prefer_binary: Passed through to the constructor.
        :param allow_all_prereleases: Passed through to the constructor.
        :param specifier: An optional object implementing `filter`
            (e.g. `packaging.specifiers.SpecifierSet`) to filter applicable
            versions. If None, an empty SpecifierSet is used.
        :param hashes: An optional collection of allowed hashes.
        """
        python = TargetPython() if target_python is None else target_python
        version_spec = specifiers.SpecifierSet() if specifier is None else specifier

        return cls(
            project_name=project_name,
            supported_tags=python.get_tags(),
            specifier=version_spec,
            prefer_binary=prefer_binary,
            allow_all_prereleases=allow_all_prereleases,
            hashes=hashes,
        )

    def __init__(
        self,
        project_name: str,
        supported_tags: List[Tag],
        specifier: specifiers.BaseSpecifier,
        prefer_binary: bool = False,
        allow_all_prereleases: bool = False,
        hashes: Optional[Hashes] = None,
    ) -> None:
        """
        :param project_name: Project name, forwarded to hash filtering.
        :param supported_tags: The PEP 425 tags supported by the target
            Python in order of preference (most preferred first).
        :param specifier: Object whose `filter` method selects the
            applicable versions.
        :param prefer_binary: Whether wheels get a higher sort preference.
        :param allow_all_prereleases: Whether to allow all pre-releases.
        :param hashes: An optional collection of allowed hashes.
        """
        self._project_name = project_name
        self._supported_tags = supported_tags
        self._specifier = specifier
        self._prefer_binary = prefer_binary
        self._allow_all_prereleases = allow_all_prereleases
        self._hashes = hashes
        # A tag's position in supported_tags is its priority, so build the
        # tag -> position lookup once for wheel.find_most_preferred_tag.
        self._wheel_tag_preferences = {
            tag: priority for priority, tag in enumerate(supported_tags)
        }

    def get_applicable_candidates(
        self,
        candidates: List[InstallationCandidate],
    ) -> List[InstallationCandidate]:
        """
        Return the applicable candidates from a list of candidates.

        Candidates are kept when the specifier accepts their version, then
        passed through `filter_unallowed_hashes`, then sorted by `_sort_key`.
        """
        # Passing None lets the specifier decide about pre-releases itself.
        allow_prereleases = self._allow_all_prereleases or None

        # Versions travel as str: when pip is debundled but setuptools is
        # not, packaging.version.Version and
        # pkg_resources._vendor.packaging.version.Version are distinct types,
        # so str serves as the common interchange format. The cast can go
        # once the pkg_resources provided specifier is no longer used.
        version_strings = (str(cand.version) for cand in candidates)
        accepted = self._specifier.filter(
            version_strings,
            prereleases=allow_prereleases,
        )
        accepted_versions = {str(version) for version in accepted}

        # Compare as str again, for the same debundling reason.
        matching = [
            cand for cand in candidates if str(cand.version) in accepted_versions
        ]

        hash_filtered = filter_unallowed_hashes(
            candidates=matching,
            hashes=self._hashes,
            project_name=self._project_name,
        )

        return sorted(hash_filtered, key=self._sort_key)

    def _sort_key(self, candidate: InstallationCandidate) -> CandidateSortingKey:
        """
        Key function for sorted()/max() that ranks InstallationCandidates.

        A tuple that compares greater under Python's default ordering marks
        a more preferred candidate. The tuple's fields, most significant
        first, are:

          1. whether the link's hash is allowed - a candidate with a
             matching hash always wins, so that e.g. a yanked file that is
             the only hash match is still used;
          2. yanked status (in the sense of PEP 592) - yanked candidates
             rank below candidates that have not been yanked;
          3. binary preference - 1 for wheels when prefer_binary is set,
             otherwise 0, which puts all wheels above sources;
          4. the candidate's version;
          5. tag priority - for a wheel, the negated value returned by
             find_most_preferred_tag; for an sdist, the negated number of
             supported tags;
          6. the wheel's parsed build tag, or the empty tuple.

        Note: embedding this logic in the Link comparison operators was
        considered, but then different sdist links with the same version
        would have to compare equal.

        :raises UnsupportedWheel: if find_most_preferred_tag raises
            ValueError for a wheel link.
        """
        supported = self._supported_tags
        link = candidate.link

        build_tag: BuildTag = ()
        binary_preference = 0

        if not link.is_wheel:
            # sdist
            tag_priority = -len(supported)
        else:
            # can raise InvalidWheelFilename
            wheel = Wheel(link.filename)
            try:
                tag_priority = -wheel.find_most_preferred_tag(
                    supported, self._wheel_tag_preferences
                )
            except ValueError:
                raise UnsupportedWheel(
                    "{} is not a supported wheel for this platform. It "
                    "can't be sorted.".format(wheel.filename)
                )

            if self._prefer_binary:
                binary_preference = 1

            raw_build_tag = wheel.build_tag
            if raw_build_tag is not None:
                # Split into the leading digits and whatever follows them.
                parsed = re.match(r"^(\d+)(.*)$", raw_build_tag)
                assert parsed is not None, "guaranteed by filename validation"
                number, suffix = parsed.groups()
                build_tag = (int(number), suffix)

        has_allowed_hash = int(link.is_hash_allowed(self._hashes))
        yank_value = -int(link.is_yanked)  # -1 for yanked.

        return (
            has_allowed_hash,
            yank_value,
            binary_preference,
            candidate.version,
            tag_priority,
            build_tag,
        )

    def sort_best_candidate(
        self,
        candidates: List[InstallationCandidate],
    ) -> Optional[InstallationCandidate]:
        """
        Return the candidate ranked highest by `_sort_key`, or None when
        *candidates* is empty.
        """
        return max(candidates, key=self._sort_key) if candidates else None

    def compute_best_candidate(
        self,
        candidates: List[InstallationCandidate],
    ) -> BestCandidateResult:
        """
        Build a `BestCandidateResult` from all candidates, the applicable
        subset, and the best of that subset.
        """
        applicable = self.get_applicable_candidates(candidates)

        return BestCandidateResult(
            candidates,
            applicable_candidates=applicable,
            best_candidate=self.sort_best_candidate(applicable),
        )
class PackageFinder:
    """This finds packages.

    It is meant to match easy_install's technique for looking for packages:
    read pages and look for appropriate links.
    """

    def __init__(
        self,
        link_collector: LinkCollector,
        target_python: TargetPython,
        allow_yanked: bool,
        format_control: Optional[FormatControl] = None,
        candidate_prefs: Optional[CandidatePreferences] = None,
        ignore_requires_python: Optional[bool] = None,
    ) -> None:
        """
        This constructor is primarily meant to be used by the create() class
        method and from tests.

        :param link_collector: Source of pages and links for a project.
        :param target_python: The Python that links are evaluated against.
        :param allow_yanked: Whether yanked files may become candidates.
        :param format_control: A FormatControl object, used to control
            the selection of source packages / binary packages when consulting
            the index and links. A falsy value is replaced by an empty one.
        :param candidate_prefs: Options to use when creating a
            CandidateEvaluator object. None selects the defaults.
        :param ignore_requires_python: Forwarded to each LinkEvaluator.
        """
        self._link_collector = link_collector
        self._target_python = target_python
        self._allow_yanked = allow_yanked
        self._ignore_requires_python = ignore_requires_python
        self._candidate_prefs = (
            CandidatePreferences() if candidate_prefs is None else candidate_prefs
        )
        self.format_control = format_control or FormatControl(set(), set())

        # These are boring links that have already been logged somehow.
        self._logged_links: Set[Tuple[Link, LinkType, str]] = set()

    # Don't include an allow_yanked default value to make sure each call
    # site considers whether yanked releases are allowed. This also causes
    # that decision to be made explicit in the calling code, which helps
    # people when reading the code.
    @classmethod
    def create(
        cls,
        link_collector: LinkCollector,
        selection_prefs: SelectionPreferences,
        target_python: Optional[TargetPython] = None,
    ) -> "PackageFinder":
        """Create a PackageFinder.

        :param link_collector: Passed through to the constructor.
        :param selection_prefs: The candidate selection preferences, as a
            SelectionPreferences object.
        :param target_python: The target Python interpreter to use when
            checking compatibility. If None (the default), a TargetPython
            object will be constructed from the running Python.
        """
        python = TargetPython() if target_python is None else target_python

        prefs = CandidatePreferences(
            prefer_binary=selection_prefs.prefer_binary,
            allow_all_prereleases=selection_prefs.allow_all_prereleases,
        )

        return cls(
            candidate_prefs=prefs,
            link_collector=link_collector,
            target_python=python,
            allow_yanked=selection_prefs.allow_yanked,
            format_control=selection_prefs.format_control,
            ignore_requires_python=selection_prefs.ignore_requires_python,
        )

    @property
    def target_python(self) -> TargetPython:
        """The TargetPython this finder was constructed with."""
        return self._target_python

    @property
    def search_scope(self) -> SearchScope:
        """The link collector's search scope."""
        return self._link_collector.search_scope

    @search_scope.setter
    def search_scope(self, search_scope: SearchScope) -> None:
        self._link_collector.search_scope = search_scope

    @property
    def find_links(self) -> List[str]:
        """The link collector's ``find_links`` value."""
        return self._link_collector.find_links

    @property
    def index_urls(self) -> List[str]:
        """The ``index_urls`` of the current search scope."""
        return self.search_scope.index_urls

    @property
    def trusted_hosts(self) -> Iterable[str]:
        """Lazily yield a netloc for each of the session's trusted origins."""
        origins = self._link_collector.session.pip_trusted_origins
        yield from itertools.starmap(build_netloc, origins)

    @property
    def allow_all_prereleases(self) -> bool:
        """Whether the candidate preferences allow all pre-releases."""
        return self._candidate_prefs.allow_all_prereleases

    def set_allow_all_prereleases(self) -> None:
        """Turn on ``allow_all_prereleases`` in the candidate preferences."""
        self._candidate_prefs.allow_all_prereleases = True

    @property
    def prefer_binary(self) -> bool:
        """Whether the candidate preferences prefer binaries."""
        return self._candidate_prefs.prefer_binary

    def set_prefer_binary(self) -> None:
        """Turn on ``prefer_binary`` in the candidate preferences."""
        self._candidate_prefs.prefer_binary = True

    def requires_python_skipped_reasons(self) -> List[str]:
        """
        Return the sorted, de-duplicated details of links that were skipped
        because of a Requires-Python mismatch.
        """
        unique_details = set()
        for _, link_type, detail in self._logged_links:
            if link_type == LinkType.requires_python_mismatch:
                unique_details.add(detail)
        return sorted(unique_details)

    def make_link_evaluator(self, project_name: str) -> LinkEvaluator:
        """Create a LinkEvaluator for *project_name* from this finder's settings."""
        canonical_name = canonicalize_name(project_name)

        return LinkEvaluator(
            project_name=project_name,
            canonical_name=canonical_name,
            formats=self.format_control.get_allowed_formats(canonical_name),
            target_python=self._target_python,
            allow_yanked=self._allow_yanked,
            ignore_requires_python=self._ignore_requires_python,
        )

    def _sort_links(self, links: Iterable[Link]) -> List[Link]:
        """
        Returns elements of links in order, non-egg links first, egg links
        second, while eliminating duplicates
        """
        with_egg: List[Link] = []
        without_egg: List[Link] = []
        visited: Set[Link] = set()

        for link in links:
            if link in visited:
                continue
            visited.add(link)
            bucket = with_egg if link.egg_fragment else without_egg
            bucket.append(link)

        return without_egg + with_egg

    def _log_skipped_link(self, link: Link, result: LinkType, detail: str) -> None:
        """Log a skipped link once per distinct (link, result, detail)."""
        record = (link, result, detail)
        if record in self._logged_links:
            return

        # Put the link at the end so the reason is more visible and because
        # the link string is usually very long.
        logger.debug("Skipping link: %s: %s", detail, link)
        self._logged_links.add(record)

    def get_install_candidate(
        self, link_evaluator: LinkEvaluator, link: Link
    ) -> Optional[InstallationCandidate]:
        """
        Return an InstallationCandidate built from the link when the
        evaluator accepts it; otherwise log the skip and return None.
        """
        result, detail = link_evaluator.evaluate_link(link)

        if result == LinkType.candidate:
            # For an accepted link the detail is the version string.
            return InstallationCandidate(
                name=link_evaluator.project_name,
                link=link,
                version=detail,
            )

        self._log_skipped_link(link, result, detail)
        return None

    def evaluate_links(
        self, link_evaluator: LinkEvaluator, links: Iterable[Link]
    ) -> List[InstallationCandidate]:
        """
        Convert links that are candidates to InstallationCandidate objects.
        """
        evaluated = (
            self.get_install_candidate(link_evaluator, link)
            for link in self._sort_links(links)
        )
        return [candidate for candidate in evaluated if candidate is not None]

    def process_project_url(
        self, project_url: Link, link_evaluator: LinkEvaluator
    ) -> List[InstallationCandidate]:
        """
        Fetch a project page and return the candidates among its links.

        Returns an empty list when the collector yields no response.
        """
        logger.debug(
            "Fetching project page and analyzing links: %s",
            project_url,
        )
        response = self._link_collector.fetch_response(project_url)
        if response is None:
            return []

        page_links = list(parse_links(response))

        with indent_log():
            return self.evaluate_links(link_evaluator, links=page_links)

    # NOTE: caching on a bound method makes ``self`` part of the cache key.
    @functools.lru_cache(maxsize=None)
    def find_all_candidates(self, project_name: str) -> List[InstallationCandidate]:
        """Find all available InstallationCandidate for project_name

        The collected sources supply both page candidates and file links;
        every version found is returned in one InstallationCandidate list,
        file candidates first.

        See LinkEvaluator.evaluate_link() for details on which files
        are accepted.
        """
        link_evaluator = self.make_link_evaluator(project_name)
        collected_sources = self._link_collector.collect_sources(
            project_name=project_name,
            candidates_from_page=functools.partial(
                self.process_project_url,
                link_evaluator=link_evaluator,
            ),
        )

        # Gather every page candidate before touching any file link.
        page_candidates: List[InstallationCandidate] = []
        for sources in collected_sources:
            for source in sources:
                if source is not None:
                    page_candidates.extend(source.page_candidates())

        file_links: List[Link] = []
        for sources in collected_sources:
            for source in sources:
                if source is not None:
                    file_links.extend(source.file_links())
        file_links.sort(reverse=True)

        file_candidates = self.evaluate_links(link_evaluator, file_links)

        if logger.isEnabledFor(logging.DEBUG) and file_candidates:
            locations = []
            for candidate in file_candidates:
                assert candidate.link.url  # we need to have a URL
                try:
                    locations.append(candidate.link.file_path)
                except Exception:
                    # it's not a local file
                    locations.append(candidate.link.url)

            logger.debug("Local files found: %s", ", ".join(locations))

        # This is an intentional priority ordering
        return file_candidates + page_candidates

    def make_candidate_evaluator(
        self,
        project_name: str,
        specifier: Optional[specifiers.BaseSpecifier] = None,
        hashes: Optional[Hashes] = None,
    ) -> CandidateEvaluator:
        """Create a CandidateEvaluator object to use."""
        prefs = self._candidate_prefs
        return CandidateEvaluator.create(
            project_name=project_name,
            target_python=self._target_python,
            prefer_binary=prefs.prefer_binary,
            allow_all_prereleases=prefs.allow_all_prereleases,
            specifier=specifier,
            hashes=hashes,
        )

    # NOTE: caching on a bound method makes ``self`` part of the cache key.
    @functools.lru_cache(maxsize=None)
    def find_best_candidate(
        self,
        project_name: str,
        specifier: Optional[specifiers.BaseSpecifier] = None,
        hashes: Optional[Hashes] = None,
    ) -> BestCandidateResult:
        """Find matches for the given project and specifier.

        :param project_name: The project to look up.
        :param specifier: An optional object implementing `filter`
            (e.g. `packaging.specifiers.SpecifierSet`) to filter applicable
            versions.
        :param hashes: An optional collection of allowed hashes.

        :return: A `BestCandidateResult` instance.
        """
        evaluator = self.make_candidate_evaluator(
            project_name=project_name,
            specifier=specifier,
            hashes=hashes,
        )
        return evaluator.compute_best_candidate(
            self.find_all_candidates(project_name)
        )

    def find_requirement(
        self, req: InstallRequirement, upgrade: bool
    ) -> Optional[InstallationCandidate]:
        """Try to find a Link matching req

        :param req: The InstallRequirement to satisfy.
        :param upgrade: Whether an already-installed version may be replaced.
        :return: The InstallationCandidate to install, or None when
            *upgrade* is false and a version is already installed.
        :raises DistributionNotFound: when nothing is installed and no
            candidate was found.
        :raises BestVersionAlreadyInstalled: when the installed version is
            at least as new as the best candidate.
        """
        result = self.find_best_candidate(
            req.name,
            specifier=req.specifier,
            hashes=req.hashes(trust_internet=False),
        )
        best_candidate = result.best_candidate

        installed_version: Optional[_BaseVersion] = (
            req.satisfied_by.version if req.satisfied_by is not None else None
        )

        def _format_versions(cand_iter: Iterable[InstallationCandidate]) -> str:
            # This repeated parse_version and str() conversion is needed to
            # handle different vendoring sources from pip and pkg_resources.
            # If we stop using the pkg_resources provided specifier and start
            # using our own, we can drop the cast to str().
            unique_versions = {str(cand.version) for cand in cand_iter}
            ordered = sorted(unique_versions, key=parse_version)
            return ", ".join(ordered) or "none"

        if installed_version is None and best_candidate is None:
            logger.critical(
                "Could not find a version that satisfies the requirement %s "
                "(from versions: %s)",
                req,
                _format_versions(result.iter_all()),
            )

            raise DistributionNotFound(
                "No matching distribution found for {}".format(req)
            )

        def _should_install_candidate(
            candidate: Optional[InstallationCandidate],
        ) -> "TypeGuard[InstallationCandidate]":
            # NOTE: the decision reads the enclosing ``best_candidate``, not
            # the argument; every call below passes ``best_candidate``.
            if installed_version is None:
                return True
            if best_candidate is None:
                return False
            return best_candidate.version > installed_version

        already_installed = installed_version is not None
        if already_installed and not upgrade:
            # Without an upgrade the installed version is kept; just log
            # whether something newer exists.
            if _should_install_candidate(best_candidate):
                logger.debug(
                    "Existing installed version (%s) satisfies requirement "
                    "(most up-to-date version is %s)",
                    installed_version,
                    best_candidate.version,
                )
            else:
                logger.debug(
                    "Existing installed version (%s) is most up-to-date and "
                    "satisfies requirement",
                    installed_version,
                )
            return None

        if not _should_install_candidate(best_candidate):
            # We have an existing version, and its the best version
            logger.debug(
                "Installed version (%s) is most up-to-date (past versions: %s)",
                installed_version,
                _format_versions(result.iter_applicable()),
            )
            raise BestVersionAlreadyInstalled

        logger.debug(
            "Using version %s (newest of versions: %s)",
            best_candidate.version,
            _format_versions(result.iter_applicable()),
        )
        return best_candidate
def _find_name_version_sep(fragment: str, canonical_name: str) -> int:
    """Locate the dash that separates project name from version.

    :param fragment: A <package>+<version> filename "fragment" (stem) or
        egg fragment.
    :param canonical_name: The package's canonical name.
    :return: Index in *fragment* of the separating dash.
    :raises ValueError: if no dash is preceded by text that canonicalizes
        to *canonical_name*.

    The search is needed because the canonicalized name can differ in
    length from the name part of the fragment. An example::

    >>> fragment = 'foo__bar-1.0'
    >>> canonical_name = 'foo-bar'
    >>> _find_name_version_sep(fragment, canonical_name)
    8
    """
    # Name and version are separated by exactly one dash. Visit each dash
    # from left to right; the first one whose prefix canonicalizes to the
    # expected name is the separator.
    dash_at = fragment.find("-")
    while dash_at != -1:
        if canonicalize_name(fragment[:dash_at]) == canonical_name:
            return dash_at
        dash_at = fragment.find("-", dash_at + 1)

    raise ValueError(f"{fragment} does not match {canonical_name}")
def _extract_version_from_fragment(fragment: str, canonical_name: str) -> Optional[str]:
    """Pull the version out of a <package>+<version> filename "fragment"
    (stem) or egg fragment.

    :param fragment: The string to parse. E.g. foo-2.1
    :param canonical_name: The canonicalized name of the package this
        belongs to.
    :return: The text after the name/version separator, or None when the
        separator cannot be found or nothing follows it.
    """
    try:
        separator = _find_name_version_sep(fragment, canonical_name)
    except ValueError:
        return None

    # An empty remainder counts as "no version".
    return fragment[separator + 1 :] or None
|