search.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. import logging
  2. import shutil
  3. import sys
  4. import textwrap
  5. import xmlrpc.client
  6. from collections import OrderedDict
  7. from optparse import Values
  8. from typing import TYPE_CHECKING, Dict, List, Optional
  9. from pip._vendor.packaging.version import parse as parse_version
  10. from pip._internal.cli.base_command import Command
  11. from pip._internal.cli.req_command import SessionCommandMixin
  12. from pip._internal.cli.status_codes import NO_MATCHES_FOUND, SUCCESS
  13. from pip._internal.exceptions import CommandError
  14. from pip._internal.metadata import get_default_environment
  15. from pip._internal.models.index import PyPI
  16. from pip._internal.network.xmlrpc import PipXmlrpcTransport
  17. from pip._internal.utils.logging import indent_log
  18. from pip._internal.utils.misc import write_output
  19. if TYPE_CHECKING:
  20. from typing import TypedDict
  21. class TransformedHit(TypedDict):
  22. name: str
  23. summary: str
  24. versions: List[str]
  25. logger = logging.getLogger(__name__)
  26. class SearchCommand(Command, SessionCommandMixin):
  27. """Search for PyPI packages whose name or summary contains <query>."""
  28. usage = """
  29. %prog [options] <query>"""
  30. ignore_require_venv = True
  31. def add_options(self) -> None:
  32. self.cmd_opts.add_option(
  33. '-i', '--index',
  34. dest='index',
  35. metavar='URL',
  36. default=PyPI.pypi_url,
  37. help='Base URL of Python Package Index (default %default)')
  38. self.parser.insert_option_group(0, self.cmd_opts)
  39. def run(self, options: Values, args: List[str]) -> int:
  40. if not args:
  41. raise CommandError('Missing required argument (search query).')
  42. query = args
  43. pypi_hits = self.search(query, options)
  44. hits = transform_hits(pypi_hits)
  45. terminal_width = None
  46. if sys.stdout.isatty():
  47. terminal_width = shutil.get_terminal_size()[0]
  48. print_results(hits, terminal_width=terminal_width)
  49. if pypi_hits:
  50. return SUCCESS
  51. return NO_MATCHES_FOUND
  52. def search(self, query: List[str], options: Values) -> List[Dict[str, str]]:
  53. index_url = options.index
  54. session = self.get_default_session(options)
  55. transport = PipXmlrpcTransport(index_url, session)
  56. pypi = xmlrpc.client.ServerProxy(index_url, transport)
  57. try:
  58. hits = pypi.search({'name': query, 'summary': query}, 'or')
  59. except xmlrpc.client.Fault as fault:
  60. message = "XMLRPC request failed [code: {code}]\n{string}".format(
  61. code=fault.faultCode,
  62. string=fault.faultString,
  63. )
  64. raise CommandError(message)
  65. assert isinstance(hits, list)
  66. return hits
  67. def transform_hits(hits: List[Dict[str, str]]) -> List["TransformedHit"]:
  68. """
  69. The list from pypi is really a list of versions. We want a list of
  70. packages with the list of versions stored inline. This converts the
  71. list from pypi into one we can use.
  72. """
  73. packages: Dict[str, "TransformedHit"] = OrderedDict()
  74. for hit in hits:
  75. name = hit['name']
  76. summary = hit['summary']
  77. version = hit['version']
  78. if name not in packages.keys():
  79. packages[name] = {
  80. 'name': name,
  81. 'summary': summary,
  82. 'versions': [version],
  83. }
  84. else:
  85. packages[name]['versions'].append(version)
  86. # if this is the highest version, replace summary and score
  87. if version == highest_version(packages[name]['versions']):
  88. packages[name]['summary'] = summary
  89. return list(packages.values())
  90. def print_dist_installation_info(name: str, latest: str) -> None:
  91. env = get_default_environment()
  92. dist = env.get_distribution(name)
  93. if dist is not None:
  94. with indent_log():
  95. if dist.version == latest:
  96. write_output('INSTALLED: %s (latest)', dist.version)
  97. else:
  98. write_output('INSTALLED: %s', dist.version)
  99. if parse_version(latest).pre:
  100. write_output('LATEST: %s (pre-release; install'
  101. ' with "pip install --pre")', latest)
  102. else:
  103. write_output('LATEST: %s', latest)
  104. def print_results(
  105. hits: List["TransformedHit"],
  106. name_column_width: Optional[int] = None,
  107. terminal_width: Optional[int] = None,
  108. ) -> None:
  109. if not hits:
  110. return
  111. if name_column_width is None:
  112. name_column_width = max([
  113. len(hit['name']) + len(highest_version(hit.get('versions', ['-'])))
  114. for hit in hits
  115. ]) + 4
  116. for hit in hits:
  117. name = hit['name']
  118. summary = hit['summary'] or ''
  119. latest = highest_version(hit.get('versions', ['-']))
  120. if terminal_width is not None:
  121. target_width = terminal_width - name_column_width - 5
  122. if target_width > 10:
  123. # wrap and indent summary to fit terminal
  124. summary_lines = textwrap.wrap(summary, target_width)
  125. summary = ('\n' + ' ' * (name_column_width + 3)).join(
  126. summary_lines)
  127. name_latest = f'{name} ({latest})'
  128. line = f'{name_latest:{name_column_width}} - {summary}'
  129. try:
  130. write_output(line)
  131. print_dist_installation_info(name, latest)
  132. except UnicodeEncodeError:
  133. pass
  134. def highest_version(versions: List[str]) -> str:
  135. return max(versions, key=parse_version)