123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270 |
- import email.message
- import email.parser
- import logging
- import os
- import zipfile
- from typing import Collection, Iterable, Iterator, List, Mapping, NamedTuple, Optional
- from pip._vendor import pkg_resources
- from pip._vendor.packaging.requirements import Requirement
- from pip._vendor.packaging.utils import NormalizedName, canonicalize_name
- from pip._vendor.packaging.version import parse as parse_version
- from pip._internal.exceptions import InvalidWheel, NoneMetadataError, UnsupportedWheel
- from pip._internal.utils.egg_link import egg_link_path_from_location
- from pip._internal.utils.misc import display_path, normalize_path
- from pip._internal.utils.wheel import parse_wheel, read_wheel_metadata_file
- from .base import (
- BaseDistribution,
- BaseEntryPoint,
- BaseEnvironment,
- DistributionVersion,
- InfoPath,
- Wheel,
- )
- logger = logging.getLogger(__name__)
- class EntryPoint(NamedTuple):
- name: str
- value: str
- group: str
- class InMemoryMetadata:
- """IMetadataProvider that reads metadata files from a dictionary.
- This also maps metadata decoding exceptions to our internal exception type.
- """
- def __init__(self, metadata: Mapping[str, bytes], wheel_name: str) -> None:
- self._metadata = metadata
- self._wheel_name = wheel_name
- def has_metadata(self, name: str) -> bool:
- return name in self._metadata
- def get_metadata(self, name: str) -> str:
- try:
- return self._metadata[name].decode()
- except UnicodeDecodeError as e:
- # Augment the default error with the origin of the file.
- raise UnsupportedWheel(
- f"Error decoding metadata for {self._wheel_name}: {e} in {name} file"
- )
- def get_metadata_lines(self, name: str) -> Iterable[str]:
- return pkg_resources.yield_lines(self.get_metadata(name))
- def metadata_isdir(self, name: str) -> bool:
- return False
- def metadata_listdir(self, name: str) -> List[str]:
- return []
- def run_script(self, script_name: str, namespace: str) -> None:
- pass
- class Distribution(BaseDistribution):
- def __init__(self, dist: pkg_resources.Distribution) -> None:
- self._dist = dist
- @classmethod
- def from_directory(cls, directory: str) -> BaseDistribution:
- dist_dir = directory.rstrip(os.sep)
- # Build a PathMetadata object, from path to metadata. :wink:
- base_dir, dist_dir_name = os.path.split(dist_dir)
- metadata = pkg_resources.PathMetadata(base_dir, dist_dir)
- # Determine the correct Distribution object type.
- if dist_dir.endswith(".egg-info"):
- dist_cls = pkg_resources.Distribution
- dist_name = os.path.splitext(dist_dir_name)[0]
- else:
- assert dist_dir.endswith(".dist-info")
- dist_cls = pkg_resources.DistInfoDistribution
- dist_name = os.path.splitext(dist_dir_name)[0].split("-")[0]
- dist = dist_cls(base_dir, project_name=dist_name, metadata=metadata)
- return cls(dist)
- @classmethod
- def from_metadata_file_contents(
- cls,
- metadata_contents: bytes,
- filename: str,
- project_name: str,
- ) -> BaseDistribution:
- metadata_dict = {
- "METADATA": metadata_contents,
- }
- dist = pkg_resources.DistInfoDistribution(
- location=filename,
- metadata=InMemoryMetadata(metadata_dict, filename),
- project_name=project_name,
- )
- return cls(dist)
- @classmethod
- def from_wheel(cls, wheel: Wheel, name: str) -> BaseDistribution:
- try:
- with wheel.as_zipfile() as zf:
- info_dir, _ = parse_wheel(zf, name)
- metadata_dict = {
- path.split("/", 1)[-1]: read_wheel_metadata_file(zf, path)
- for path in zf.namelist()
- if path.startswith(f"{info_dir}/")
- }
- except zipfile.BadZipFile as e:
- raise InvalidWheel(wheel.location, name) from e
- except UnsupportedWheel as e:
- raise UnsupportedWheel(f"{name} has an invalid wheel, {e}")
- dist = pkg_resources.DistInfoDistribution(
- location=wheel.location,
- metadata=InMemoryMetadata(metadata_dict, wheel.location),
- project_name=name,
- )
- return cls(dist)
- @property
- def location(self) -> Optional[str]:
- return self._dist.location
- @property
- def installed_location(self) -> Optional[str]:
- egg_link = egg_link_path_from_location(self.raw_name)
- if egg_link:
- location = egg_link
- elif self.location:
- location = self.location
- else:
- return None
- return normalize_path(location)
- @property
- def info_location(self) -> Optional[str]:
- return self._dist.egg_info
- @property
- def installed_by_distutils(self) -> bool:
- # A distutils-installed distribution is provided by FileMetadata. This
- # provider has a "path" attribute not present anywhere else. Not the
- # best introspection logic, but pip has been doing this for a long time.
- try:
- return bool(self._dist._provider.path)
- except AttributeError:
- return False
- @property
- def canonical_name(self) -> NormalizedName:
- return canonicalize_name(self._dist.project_name)
- @property
- def version(self) -> DistributionVersion:
- return parse_version(self._dist.version)
- def is_file(self, path: InfoPath) -> bool:
- return self._dist.has_metadata(str(path))
- def iter_distutils_script_names(self) -> Iterator[str]:
- yield from self._dist.metadata_listdir("scripts")
- def read_text(self, path: InfoPath) -> str:
- name = str(path)
- if not self._dist.has_metadata(name):
- raise FileNotFoundError(name)
- content = self._dist.get_metadata(name)
- if content is None:
- raise NoneMetadataError(self, name)
- return content
- def iter_entry_points(self) -> Iterable[BaseEntryPoint]:
- for group, entries in self._dist.get_entry_map().items():
- for name, entry_point in entries.items():
- name, _, value = str(entry_point).partition("=")
- yield EntryPoint(name=name.strip(), value=value.strip(), group=group)
- def _metadata_impl(self) -> email.message.Message:
- """
- :raises NoneMetadataError: if the distribution reports `has_metadata()`
- True but `get_metadata()` returns None.
- """
- if isinstance(self._dist, pkg_resources.DistInfoDistribution):
- metadata_name = "METADATA"
- else:
- metadata_name = "PKG-INFO"
- try:
- metadata = self.read_text(metadata_name)
- except FileNotFoundError:
- if self.location:
- displaying_path = display_path(self.location)
- else:
- displaying_path = repr(self.location)
- logger.warning("No metadata found in %s", displaying_path)
- metadata = ""
- feed_parser = email.parser.FeedParser()
- feed_parser.feed(metadata)
- return feed_parser.close()
- def iter_dependencies(self, extras: Collection[str] = ()) -> Iterable[Requirement]:
- if extras: # pkg_resources raises on invalid extras, so we sanitize.
- extras = frozenset(extras).intersection(self._dist.extras)
- return self._dist.requires(extras)
- def iter_provided_extras(self) -> Iterable[str]:
- return self._dist.extras
- class Environment(BaseEnvironment):
- def __init__(self, ws: pkg_resources.WorkingSet) -> None:
- self._ws = ws
- @classmethod
- def default(cls) -> BaseEnvironment:
- return cls(pkg_resources.working_set)
- @classmethod
- def from_paths(cls, paths: Optional[List[str]]) -> BaseEnvironment:
- return cls(pkg_resources.WorkingSet(paths))
- def _iter_distributions(self) -> Iterator[BaseDistribution]:
- for dist in self._ws:
- yield Distribution(dist)
- def _search_distribution(self, name: str) -> Optional[BaseDistribution]:
- """Find a distribution matching the ``name`` in the environment.
- This searches from *all* distributions available in the environment, to
- match the behavior of ``pkg_resources.get_distribution()``.
- """
- canonical_name = canonicalize_name(name)
- for dist in self.iter_all_distributions():
- if dist.canonical_name == canonical_name:
- return dist
- return None
- def get_distribution(self, name: str) -> Optional[BaseDistribution]:
- # Search the distribution by looking through the working set.
- dist = self._search_distribution(name)
- if dist:
- return dist
- # If distribution could not be found, call working_set.require to
- # update the working set, and try to find the distribution again.
- # This might happen for e.g. when you install a package twice, once
- # using setup.py develop and again using setup.py install. Now when
- # running pip uninstall twice, the package gets removed from the
- # working set in the first uninstall, so we have to populate the
- # working set again so that pip knows about it and the packages gets
- # picked up and is successfully uninstalled the second time too.
- try:
- # We didn't pass in any version specifiers, so this can never
- # raise pkg_resources.VersionConflict.
- self._ws.require(name)
- except pkg_resources.DistributionNotFound:
- return None
- return self._search_distribution(name)
|