123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174 |
- import logging
- import shutil
- import sys
- import textwrap
- import xmlrpc.client
- from collections import OrderedDict
- from optparse import Values
- from typing import TYPE_CHECKING, Dict, List, Optional
- from pip._vendor.packaging.version import parse as parse_version
- from pip._internal.cli.base_command import Command
- from pip._internal.cli.req_command import SessionCommandMixin
- from pip._internal.cli.status_codes import NO_MATCHES_FOUND, SUCCESS
- from pip._internal.exceptions import CommandError
- from pip._internal.metadata import get_default_environment
- from pip._internal.models.index import PyPI
- from pip._internal.network.xmlrpc import PipXmlrpcTransport
- from pip._internal.utils.logging import indent_log
- from pip._internal.utils.misc import write_output
if TYPE_CHECKING:
    # This branch is only evaluated by static type checkers; nothing in it is
    # defined at runtime, which is why annotations elsewhere in the file
    # spell the type as the string "TransformedHit".
    from typing import TypedDict

    class TransformedHit(TypedDict):
        """One search result per package, as built by ``transform_hits``."""

        # Package name taken from the search hit.
        name: str
        # Summary text shown next to the name in the printed results.
        summary: str
        # Every version string collected for this package.
        versions: List[str]


# Module-level logger named after this module.
logger = logging.getLogger(__name__)
class SearchCommand(Command, SessionCommandMixin):
    """Search for PyPI packages whose name or summary contains <query>."""

    usage = """
      %prog [options] <query>"""
    ignore_require_venv = True

    def add_options(self) -> None:
        """Register the ``-i/--index`` option and attach the option group."""
        self.cmd_opts.add_option(
            "-i",
            "--index",
            dest="index",
            metavar="URL",
            default=PyPI.pypi_url,
            help="Base URL of Python Package Index (default %default)",
        )

        self.parser.insert_option_group(0, self.cmd_opts)

    def run(self, options: Values, args: List[str]) -> int:
        """
        Run the search and print the results.

        :param options: Parsed command-line options (``options.index`` is
            the index URL to query).
        :param args: Positional arguments; all of them form the query.
        :raises CommandError: If no query was given.
        :return: ``SUCCESS`` if the index returned any hit, otherwise
            ``NO_MATCHES_FOUND``.
        """
        if not args:
            raise CommandError("Missing required argument (search query).")

        pypi_hits = self.search(args, options)
        hits = transform_hits(pypi_hits)

        # Only wrap summaries when writing to an interactive terminal.
        terminal_width = (
            shutil.get_terminal_size()[0] if sys.stdout.isatty() else None
        )
        print_results(hits, terminal_width=terminal_width)

        return SUCCESS if pypi_hits else NO_MATCHES_FOUND

    def search(self, query: List[str], options: Values) -> List[Dict[str, str]]:
        """
        Query the index's XML-RPC ``search`` method.

        The query is matched against both the ``name`` and ``summary``
        fields, combined with ``"or"``.

        :param query: Search terms.
        :param options: Parsed options; ``options.index`` is the endpoint.
        :raises CommandError: If the server answers with an XML-RPC fault.
        :return: The raw list of hits returned by the server.
        """
        index_url = options.index

        session = self.get_default_session(options)
        transport = PipXmlrpcTransport(index_url, session)
        pypi = xmlrpc.client.ServerProxy(index_url, transport)

        criteria = {"name": query, "summary": query}
        try:
            hits = pypi.search(criteria, "or")
        except xmlrpc.client.Fault as fault:
            message = (
                f"XMLRPC request failed [code: {fault.faultCode}]\n"
                f"{fault.faultString}"
            )
            raise CommandError(message)

        assert isinstance(hits, list)
        return hits
def transform_hits(hits: List[Dict[str, str]]) -> List["TransformedHit"]:
    """
    Collapse per-version search hits into one entry per package.

    Each incoming hit carries a ``name``, ``summary`` and ``version``. Hits
    sharing a name are merged into a single entry whose ``versions`` list
    holds every version seen, in the order the packages first appeared.
    """
    grouped: Dict[str, "TransformedHit"] = OrderedDict()

    for entry in hits:
        pkg_name = entry["name"]
        pkg_summary = entry["summary"]
        pkg_version = entry["version"]

        existing = grouped.get(pkg_name)
        if existing is None:
            # First time this package shows up: start a new entry.
            grouped[pkg_name] = {
                "name": pkg_name,
                "summary": pkg_summary,
                "versions": [pkg_version],
            }
            continue

        existing["versions"].append(pkg_version)

        # Keep the summary that belongs to the highest version seen so far.
        if pkg_version == highest_version(existing["versions"]):
            existing["summary"] = pkg_summary

    return list(grouped.values())
def print_dist_installation_info(name: str, latest: str) -> None:
    """
    Report whether *name* is installed and how it compares to *latest*.

    The distribution is looked up in the default environment. Nothing is
    written when it is not installed. Otherwise an indented ``INSTALLED:``
    line is written; if the installed version is not the latest one, a
    ``LATEST:`` line follows, with a ``--pre`` hint when the latest version
    is a pre-release.

    :param name: Project name to look up in the default environment.
    :param latest: Version string of the newest release found by the search.
    """
    env = get_default_environment()
    dist = env.get_distribution(name)
    if dist is None:
        return

    # NOTE(review): ``dist.version`` appears to be a parsed version object
    # rather than a ``str`` -- confirm. Such an object never compares equal to
    # a plain string, which would make the "(latest)" branch unreachable.
    # Comparing the string form is correct in either case.
    is_latest = str(dist.version) == latest

    with indent_log():
        if is_latest:
            write_output("INSTALLED: %s (latest)", dist.version)
            return

        write_output("INSTALLED: %s", dist.version)
        if parse_version(latest).pre:
            write_output(
                "LATEST:    %s (pre-release; install"
                " with `pip install --pre`)",
                latest,
            )
        else:
            write_output("LATEST:    %s", latest)
def print_results(
    hits: List["TransformedHit"],
    name_column_width: Optional[int] = None,
    terminal_width: Optional[int] = None,
) -> None:
    """
    Write one ``name (version) - summary`` line per hit.

    :param hits: Package entries to print; nothing happens when empty.
    :param name_column_width: Width of the ``name (version)`` column. When
        omitted it is derived from the longest name plus highest version,
        with 4 added.
    :param terminal_width: When given, summaries are wrapped to the space
        left after the name column and continuation lines are indented to
        line up under the summary. Wrapping is skipped if that space is 10
        columns or fewer.
    """
    if not hits:
        return

    if name_column_width is None:
        widest = max(
            len(entry["name"]) + len(highest_version(entry.get("versions", ["-"])))
            for entry in hits
        )
        name_column_width = widest + 4

    for entry in hits:
        pkg_name = entry["name"]
        description = entry["summary"] or ""
        newest = highest_version(entry.get("versions", ["-"]))

        if terminal_width is not None:
            available = terminal_width - name_column_width - 5
            if available > 10:
                # Wrap the summary and indent continuation lines so they
                # start under the first summary character.
                wrapped = textwrap.wrap(description, available)
                hanging_indent = "\n" + " " * (name_column_width + 3)
                description = hanging_indent.join(wrapped)

        label = f"{pkg_name} ({newest})"
        row = f"{label:{name_column_width}} - {description}"
        try:
            write_output(row)
            print_dist_installation_info(pkg_name, newest)
        except UnicodeEncodeError:
            # Best effort: skip entries the output stream cannot encode.
            pass
def highest_version(versions: List[str]) -> str:
    """
    Return the entry of ``versions`` that ranks highest as a parsed version.

    The original string is returned, not its parsed form.
    """
    return max(versions, key=parse_version)
|