search.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. import logging
  2. import shutil
  3. import sys
  4. import textwrap
  5. import xmlrpc.client
  6. from collections import OrderedDict
  7. from optparse import Values
  8. from typing import TYPE_CHECKING, Dict, List, Optional
  9. from pip._vendor.packaging.version import parse as parse_version
  10. from pip._internal.cli.base_command import Command
  11. from pip._internal.cli.req_command import SessionCommandMixin
  12. from pip._internal.cli.status_codes import NO_MATCHES_FOUND, SUCCESS
  13. from pip._internal.exceptions import CommandError
  14. from pip._internal.metadata import get_default_environment
  15. from pip._internal.models.index import PyPI
  16. from pip._internal.network.xmlrpc import PipXmlrpcTransport
  17. from pip._internal.utils.logging import indent_log
  18. from pip._internal.utils.misc import write_output
  19. if TYPE_CHECKING:
  20. from typing import TypedDict
  21. class TransformedHit(TypedDict):
  22. name: str
  23. summary: str
  24. versions: List[str]
  25. logger = logging.getLogger(__name__)
  26. class SearchCommand(Command, SessionCommandMixin):
  27. """Search for PyPI packages whose name or summary contains <query>."""
  28. usage = """
  29. %prog [options] <query>"""
  30. ignore_require_venv = True
  31. def add_options(self) -> None:
  32. self.cmd_opts.add_option(
  33. "-i",
  34. "--index",
  35. dest="index",
  36. metavar="URL",
  37. default=PyPI.pypi_url,
  38. help="Base URL of Python Package Index (default %default)",
  39. )
  40. self.parser.insert_option_group(0, self.cmd_opts)
  41. def run(self, options: Values, args: List[str]) -> int:
  42. if not args:
  43. raise CommandError("Missing required argument (search query).")
  44. query = args
  45. pypi_hits = self.search(query, options)
  46. hits = transform_hits(pypi_hits)
  47. terminal_width = None
  48. if sys.stdout.isatty():
  49. terminal_width = shutil.get_terminal_size()[0]
  50. print_results(hits, terminal_width=terminal_width)
  51. if pypi_hits:
  52. return SUCCESS
  53. return NO_MATCHES_FOUND
  54. def search(self, query: List[str], options: Values) -> List[Dict[str, str]]:
  55. index_url = options.index
  56. session = self.get_default_session(options)
  57. transport = PipXmlrpcTransport(index_url, session)
  58. pypi = xmlrpc.client.ServerProxy(index_url, transport)
  59. try:
  60. hits = pypi.search({"name": query, "summary": query}, "or")
  61. except xmlrpc.client.Fault as fault:
  62. message = "XMLRPC request failed [code: {code}]\n{string}".format(
  63. code=fault.faultCode,
  64. string=fault.faultString,
  65. )
  66. raise CommandError(message)
  67. assert isinstance(hits, list)
  68. return hits
  69. def transform_hits(hits: List[Dict[str, str]]) -> List["TransformedHit"]:
  70. """
  71. The list from pypi is really a list of versions. We want a list of
  72. packages with the list of versions stored inline. This converts the
  73. list from pypi into one we can use.
  74. """
  75. packages: Dict[str, "TransformedHit"] = OrderedDict()
  76. for hit in hits:
  77. name = hit["name"]
  78. summary = hit["summary"]
  79. version = hit["version"]
  80. if name not in packages.keys():
  81. packages[name] = {
  82. "name": name,
  83. "summary": summary,
  84. "versions": [version],
  85. }
  86. else:
  87. packages[name]["versions"].append(version)
  88. # if this is the highest version, replace summary and score
  89. if version == highest_version(packages[name]["versions"]):
  90. packages[name]["summary"] = summary
  91. return list(packages.values())
  92. def print_dist_installation_info(name: str, latest: str) -> None:
  93. env = get_default_environment()
  94. dist = env.get_distribution(name)
  95. if dist is not None:
  96. with indent_log():
  97. if dist.version == latest:
  98. write_output("INSTALLED: %s (latest)", dist.version)
  99. else:
  100. write_output("INSTALLED: %s", dist.version)
  101. if parse_version(latest).pre:
  102. write_output(
  103. "LATEST: %s (pre-release; install"
  104. " with `pip install --pre`)",
  105. latest,
  106. )
  107. else:
  108. write_output("LATEST: %s", latest)
  109. def print_results(
  110. hits: List["TransformedHit"],
  111. name_column_width: Optional[int] = None,
  112. terminal_width: Optional[int] = None,
  113. ) -> None:
  114. if not hits:
  115. return
  116. if name_column_width is None:
  117. name_column_width = (
  118. max(
  119. [
  120. len(hit["name"]) + len(highest_version(hit.get("versions", ["-"])))
  121. for hit in hits
  122. ]
  123. )
  124. + 4
  125. )
  126. for hit in hits:
  127. name = hit["name"]
  128. summary = hit["summary"] or ""
  129. latest = highest_version(hit.get("versions", ["-"]))
  130. if terminal_width is not None:
  131. target_width = terminal_width - name_column_width - 5
  132. if target_width > 10:
  133. # wrap and indent summary to fit terminal
  134. summary_lines = textwrap.wrap(summary, target_width)
  135. summary = ("\n" + " " * (name_column_width + 3)).join(summary_lines)
  136. name_latest = f"{name} ({latest})"
  137. line = f"{name_latest:{name_column_width}} - {summary}"
  138. try:
  139. write_output(line)
  140. print_dist_installation_info(name, latest)
  141. except UnicodeEncodeError:
  142. pass
  143. def highest_version(versions: List[str]) -> str:
  144. return max(versions, key=parse_version)