spinners.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. import contextlib
  2. import itertools
  3. import logging
  4. import sys
  5. import time
  6. from typing import IO, Generator, Optional
  7. from pip._internal.utils.compat import WINDOWS
  8. from pip._internal.utils.logging import get_indentation
# Module-level logger. NonInteractiveSpinner emits its status updates through
# it, and open_spinner() / hidden_cursor() consult its effective level.
logger = logging.getLogger(__name__)
  10. class SpinnerInterface:
  11. def spin(self) -> None:
  12. raise NotImplementedError()
  13. def finish(self, final_status: str) -> None:
  14. raise NotImplementedError()
  15. class InteractiveSpinner(SpinnerInterface):
  16. def __init__(
  17. self,
  18. message: str,
  19. file: Optional[IO[str]] = None,
  20. spin_chars: str = "-\\|/",
  21. # Empirically, 8 updates/second looks nice
  22. min_update_interval_seconds: float = 0.125,
  23. ):
  24. self._message = message
  25. if file is None:
  26. file = sys.stdout
  27. self._file = file
  28. self._rate_limiter = RateLimiter(min_update_interval_seconds)
  29. self._finished = False
  30. self._spin_cycle = itertools.cycle(spin_chars)
  31. self._file.write(" " * get_indentation() + self._message + " ... ")
  32. self._width = 0
  33. def _write(self, status: str) -> None:
  34. assert not self._finished
  35. # Erase what we wrote before by backspacing to the beginning, writing
  36. # spaces to overwrite the old text, and then backspacing again
  37. backup = "\b" * self._width
  38. self._file.write(backup + " " * self._width + backup)
  39. # Now we have a blank slate to add our status
  40. self._file.write(status)
  41. self._width = len(status)
  42. self._file.flush()
  43. self._rate_limiter.reset()
  44. def spin(self) -> None:
  45. if self._finished:
  46. return
  47. if not self._rate_limiter.ready():
  48. return
  49. self._write(next(self._spin_cycle))
  50. def finish(self, final_status: str) -> None:
  51. if self._finished:
  52. return
  53. self._write(final_status)
  54. self._file.write("\n")
  55. self._file.flush()
  56. self._finished = True
  57. # Used for dumb terminals, non-interactive installs (no tty), etc.
  58. # We still print updates occasionally (once every 60 seconds by default) to
  59. # act as a keep-alive for systems like Travis-CI that take lack-of-output as
  60. # an indication that a task has frozen.
  61. class NonInteractiveSpinner(SpinnerInterface):
  62. def __init__(self, message: str, min_update_interval_seconds: float = 60.0) -> None:
  63. self._message = message
  64. self._finished = False
  65. self._rate_limiter = RateLimiter(min_update_interval_seconds)
  66. self._update("started")
  67. def _update(self, status: str) -> None:
  68. assert not self._finished
  69. self._rate_limiter.reset()
  70. logger.info("%s: %s", self._message, status)
  71. def spin(self) -> None:
  72. if self._finished:
  73. return
  74. if not self._rate_limiter.ready():
  75. return
  76. self._update("still running...")
  77. def finish(self, final_status: str) -> None:
  78. if self._finished:
  79. return
  80. self._update(f"finished with status '{final_status}'")
  81. self._finished = True
  82. class RateLimiter:
  83. def __init__(self, min_update_interval_seconds: float) -> None:
  84. self._min_update_interval_seconds = min_update_interval_seconds
  85. self._last_update: float = 0
  86. def ready(self) -> bool:
  87. now = time.time()
  88. delta = now - self._last_update
  89. return delta >= self._min_update_interval_seconds
  90. def reset(self) -> None:
  91. self._last_update = time.time()
  92. @contextlib.contextmanager
  93. def open_spinner(message: str) -> Generator[SpinnerInterface, None, None]:
  94. # Interactive spinner goes directly to sys.stdout rather than being routed
  95. # through the logging system, but it acts like it has level INFO,
  96. # i.e. it's only displayed if we're at level INFO or better.
  97. # Non-interactive spinner goes through the logging system, so it is always
  98. # in sync with logging configuration.
  99. if sys.stdout.isatty() and logger.getEffectiveLevel() <= logging.INFO:
  100. spinner: SpinnerInterface = InteractiveSpinner(message)
  101. else:
  102. spinner = NonInteractiveSpinner(message)
  103. try:
  104. with hidden_cursor(sys.stdout):
  105. yield spinner
  106. except KeyboardInterrupt:
  107. spinner.finish("canceled")
  108. raise
  109. except Exception:
  110. spinner.finish("error")
  111. raise
  112. else:
  113. spinner.finish("done")
  114. HIDE_CURSOR = "\x1b[?25l"
  115. SHOW_CURSOR = "\x1b[?25h"
  116. @contextlib.contextmanager
  117. def hidden_cursor(file: IO[str]) -> Generator[None, None, None]:
  118. # The Windows terminal does not support the hide/show cursor ANSI codes,
  119. # even via colorama. So don't even try.
  120. if WINDOWS:
  121. yield
  122. # We don't want to clutter the output with control characters if we're
  123. # writing to a file, or if the user is running with --quiet.
  124. # See https://github.com/pypa/pip/issues/3418
  125. elif not file.isatty() or logger.getEffectiveLevel() > logging.INFO:
  126. yield
  127. else:
  128. file.write(HIDE_CURSOR)
  129. try:
  130. yield
  131. finally:
  132. file.write(SHOW_CURSOR)