123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242 |
- import datetime
- import functools
- import hashlib
- import json
- import logging
- import optparse
- import os.path
- import sys
- from dataclasses import dataclass
- from typing import Any, Callable, Dict, Optional
- from pip._vendor.packaging.version import parse as parse_version
- from pip._vendor.rich.console import Group
- from pip._vendor.rich.markup import escape
- from pip._vendor.rich.text import Text
- from pip._internal.index.collector import LinkCollector
- from pip._internal.index.package_finder import PackageFinder
- from pip._internal.metadata import get_default_environment
- from pip._internal.metadata.base import DistributionVersion
- from pip._internal.models.selection_prefs import SelectionPreferences
- from pip._internal.network.session import PipSession
- from pip._internal.utils.compat import WINDOWS
- from pip._internal.utils.entrypoints import (
- get_best_invocation_for_this_pip,
- get_best_invocation_for_this_python,
- )
- from pip._internal.utils.filesystem import adjacent_tmp_file, check_path_owner, replace
- from pip._internal.utils.misc import ensure_dir
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# strftime()/strptime() pattern used for the "last_check" timestamp that is
# written to, and read back from, the self-check state file.
_DATE_FMT = "%Y-%m-%dT%H:%M:%SZ"
def _get_statefile_name(key: str) -> str:
    """Return the state file's base name for *key*.

    The name is the hexadecimal SHA-224 digest of the encoded key, so any
    key string maps to a fixed-length name made only of hex digits.
    """
    return hashlib.sha224(key.encode()).hexdigest()
class SelfCheckState:
    """On-disk cache of the latest pip version seen by the self-check.

    The state lives in ``<cache_dir>/selfcheck/<name>``, where ``<name>`` is
    derived from :attr:`key` (``sys.prefix``), so every Python prefix gets its
    own file. A missing, unreadable or malformed file is treated as an empty
    cache rather than as an error.
    """

    def __init__(self, cache_dir: str) -> None:
        """Load any previously saved state from *cache_dir*.

        A falsy *cache_dir* disables persistence: nothing is read here and
        :meth:`set` becomes a no-op.
        """
        self._state: Dict[str, Any] = {}
        self._statefile_path: Optional[str] = None

        # Try to load the existing state
        if cache_dir:
            self._statefile_path = os.path.join(
                cache_dir, "selfcheck", _get_statefile_name(self.key)
            )
            try:
                with open(self._statefile_path, encoding="utf-8") as statefile:
                    loaded = json.load(statefile)
            except (OSError, ValueError, KeyError):
                # Explicitly suppressing exceptions, since we don't want to
                # error out if the cache file is invalid.
                pass
            else:
                # Valid JSON that is not an object (e.g. a list or a bare
                # string) is just as unusable as invalid JSON: ignore it.
                if isinstance(loaded, dict):
                    self._state = loaded

    @property
    def key(self) -> str:
        """Identifier of the environment this state belongs to."""
        return sys.prefix

    def get(self, current_time: datetime.datetime) -> Optional[str]:
        """Check if we have a not-outdated version loaded already.

        Returns the cached ``pypi_version`` when the cached ``last_check`` is
        at most seven days older than *current_time*; otherwise ``None``.
        Malformed state (wrong container type, missing keys, non-string
        values, unparseable timestamp) is reported as ``None`` so the caller
        performs a fresh lookup and overwrites the bad file, instead of
        failing on every run.
        """
        state = self._state
        if not state or not isinstance(state, dict):
            return None

        last_check_raw = state.get("last_check")
        pypi_version = state.get("pypi_version")
        if not isinstance(last_check_raw, str) or not isinstance(pypi_version, str):
            return None

        try:
            last_check = datetime.datetime.strptime(last_check_raw, _DATE_FMT)
        except ValueError:
            # Timestamp is not in the expected format: treat as a cache miss.
            return None

        # Determine if we need to refresh the state
        seven_days_in_seconds = 7 * 24 * 60 * 60
        seconds_since_last_check = (current_time - last_check).total_seconds()
        if seconds_since_last_check > seven_days_in_seconds:
            return None

        return pypi_version

    def set(self, pypi_version: str, current_time: datetime.datetime) -> None:
        """Record *pypi_version* as the latest version seen at *current_time*.

        Does nothing when there is no state file path or when
        ``check_path_owner`` rejects the target directory. The state is
        written to an adjacent temporary file and then moved into place; a
        failure of that final move is ignored (best effort).
        """
        # If we do not have a path to cache in, don't bother saving.
        if not self._statefile_path:
            return

        statefile_dir = os.path.dirname(self._statefile_path)

        # Check to make sure that we own the directory
        if not check_path_owner(statefile_dir):
            return

        # Now that we've ensured the directory is owned by this user, we'll go
        # ahead and make sure that all our directories are created.
        ensure_dir(statefile_dir)

        state = {
            # Include the key so it's easy to tell which pip wrote the
            # file.
            "key": self.key,
            "last_check": current_time.strftime(_DATE_FMT),
            "pypi_version": pypi_version,
        }

        text = json.dumps(state, sort_keys=True, separators=(",", ":"))

        with adjacent_tmp_file(self._statefile_path) as f:
            f.write(text.encode())

        # The temporary file is closed at this point, so it can be moved.
        try:
            # Since we have a prefix-specific state file, we can just
            # overwrite whatever is there, no need to check.
            replace(f.name, self._statefile_path)
        except OSError:
            # Best effort.
            pass
@dataclass
class UpgradePrompt:
    """Rich-renderable notice announcing that a newer pip release exists."""

    old: str  # version shown as the current one (rendered in red)
    new: str  # version shown as the upgrade target (rendered in green)

    def __rich__(self) -> Group:
        """Build the notice: an empty line, the version line, the command line."""
        # On Windows the suggestion goes through "<python> -m pip" rather than
        # a direct pip invocation.
        pip_cmd = (
            f"{get_best_invocation_for_this_python()} -m pip"
            if WINDOWS
            else get_best_invocation_for_this_pip()
        )

        notice = "[bold][[reset][blue]notice[reset][bold]][reset]"
        renderables = [
            Text(),
            Text.from_markup(
                f"{notice} A new release of pip is available: "
                f"[red]{self.old}[reset] -> [green]{self.new}[reset]"
            ),
            # The command is escaped so that any markup-like characters in it
            # are shown literally.
            Text.from_markup(
                f"{notice} To update, run: "
                f"[green]{escape(pip_cmd)} install --upgrade pip"
            ),
        ]
        return Group(*renderables)
def was_installed_by_pip(pkg: str) -> bool:
    """Tell whether *pkg* was installed by pip.

    This lets the caller skip the upgrade message when pip was in fact
    installed by a system package manager, such as dnf on Fedora.

    Returns ``False`` when no distribution named *pkg* is found in the
    default environment.
    """
    dist = get_default_environment().get_distribution(pkg)
    if dist is None:
        return False
    return dist.installer == "pip"
def _get_current_remote_pip_version(
    session: PipSession, options: optparse.Values
) -> Optional[str]:
    """Return the version of the best available "pip" candidate, as a string.

    The lookup goes through ``PackageFinder`` using *session* and *options*.
    Returns ``None`` when the finder reports no best candidate.
    """
    collector = LinkCollector.create(
        session,
        options=options,
        suppress_no_index=True,
    )

    # Yanked releases are excluded so that one is never suggested as an
    # upgrade target; pre-releases are explicitly not opted into either.
    prefs = SelectionPreferences(
        allow_yanked=False,
        allow_all_prereleases=False,
    )

    finder = PackageFinder.create(
        link_collector=collector,
        selection_prefs=prefs,
    )
    candidate = finder.find_best_candidate("pip").best_candidate
    return None if candidate is None else str(candidate.version)
def _self_version_check_logic(
    *,
    state: SelfCheckState,
    current_time: datetime.datetime,
    local_version: DistributionVersion,
    get_remote_version: Callable[[], Optional[str]],
) -> Optional[UpgradePrompt]:
    """Decide whether an upgrade prompt should be shown.

    :param state: cache consulted before, and updated after, a remote lookup.
    :param current_time: timestamp used for cache freshness and for saving.
    :param local_version: version of the running pip.
    :param get_remote_version: callable performing the remote lookup; it is
        only invoked when *state* has no usable cached version.
    :return: an ``UpgradePrompt`` when pip was installed by pip and the
        remote version is newer with a different base version, else ``None``.
    """
    cached_version_str = state.get(current_time)
    if cached_version_str is not None:
        remote_version_str = cached_version_str
    else:
        fetched_version_str = get_remote_version()
        if fetched_version_str is None:
            logger.debug("No remote pip version found")
            return None
        state.set(fetched_version_str, current_time)
        remote_version_str = fetched_version_str

    remote_version = parse_version(remote_version_str)
    logger.debug("Remote version of pip: %s", remote_version)
    logger.debug("Local version of pip:  %s", local_version)

    pip_installed_by_pip = was_installed_by_pip("pip")
    logger.debug("Was pip installed by pip? %s", pip_installed_by_pip)
    if not pip_installed_by_pip:
        return None  # Only suggest upgrade if pip is installed by pip.

    # Not older at all: nothing to suggest.
    if not local_version < remote_version:
        return None
    # Older, but sharing the same base version: no prompt either.
    if local_version.base_version == remote_version.base_version:
        return None

    return UpgradePrompt(old=str(local_version), new=remote_version_str)
def pip_self_version_check(session: PipSession, options: optparse.Values) -> None:
    """Check for an update for pip.

    The remote lookup is limited to once per week: a version cached less
    than seven days ago is reused. State is stored under
    ``options.cache_dir`` in a file keyed off ``sys.prefix`` (see
    ``SelfCheckState``).

    Nothing is done when no installed "pip" distribution is found. Any
    exception raised during the check is logged (a warning, with the
    traceback at debug level) and never propagated to the caller.
    """
    installed_dist = get_default_environment().get_distribution("pip")
    if not installed_dist:
        return

    try:
        # Naive UTC timestamp: SelfCheckState compares it with naive
        # datetimes parsed from the state file. It is derived from an aware
        # "now" because datetime.utcnow() is deprecated since Python 3.12.
        current_time = datetime.datetime.now(datetime.timezone.utc).replace(
            tzinfo=None
        )
        upgrade_prompt = _self_version_check_logic(
            state=SelfCheckState(cache_dir=options.cache_dir),
            current_time=current_time,
            local_version=installed_dist.version,
            get_remote_version=functools.partial(
                _get_current_remote_pip_version, session, options
            ),
        )
        if upgrade_prompt is not None:
            logger.warning("[present-rich] %s", upgrade_prompt)
    except Exception:
        # Top-level boundary: the self-check must never break the command
        # that triggered it.
        logger.warning("There was an error checking the latest version of pip.")
        logger.debug("See below for error", exc_info=True)
|