temp_dir.py 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246
  1. import errno
  2. import itertools
  3. import logging
  4. import os.path
  5. import tempfile
  6. from contextlib import ExitStack, contextmanager
  7. from typing import Any, Dict, Generator, Optional, TypeVar, Union
  8. from pip._internal.utils.misc import enum, rmtree
  9. logger = logging.getLogger(__name__)
  10. _T = TypeVar("_T", bound="TempDirectory")
  11. # Kinds of temporary directories. Only needed for ones that are
  12. # globally-managed.
  13. tempdir_kinds = enum(
  14. BUILD_ENV="build-env",
  15. EPHEM_WHEEL_CACHE="ephem-wheel-cache",
  16. REQ_BUILD="req-build",
  17. )
  18. _tempdir_manager: Optional[ExitStack] = None
  19. @contextmanager
  20. def global_tempdir_manager() -> Generator[None, None, None]:
  21. global _tempdir_manager
  22. with ExitStack() as stack:
  23. old_tempdir_manager, _tempdir_manager = _tempdir_manager, stack
  24. try:
  25. yield
  26. finally:
  27. _tempdir_manager = old_tempdir_manager
  28. class TempDirectoryTypeRegistry:
  29. """Manages temp directory behavior"""
  30. def __init__(self) -> None:
  31. self._should_delete: Dict[str, bool] = {}
  32. def set_delete(self, kind: str, value: bool) -> None:
  33. """Indicate whether a TempDirectory of the given kind should be
  34. auto-deleted.
  35. """
  36. self._should_delete[kind] = value
  37. def get_delete(self, kind: str) -> bool:
  38. """Get configured auto-delete flag for a given TempDirectory type,
  39. default True.
  40. """
  41. return self._should_delete.get(kind, True)
  42. _tempdir_registry: Optional[TempDirectoryTypeRegistry] = None
  43. @contextmanager
  44. def tempdir_registry() -> Generator[TempDirectoryTypeRegistry, None, None]:
  45. """Provides a scoped global tempdir registry that can be used to dictate
  46. whether directories should be deleted.
  47. """
  48. global _tempdir_registry
  49. old_tempdir_registry = _tempdir_registry
  50. _tempdir_registry = TempDirectoryTypeRegistry()
  51. try:
  52. yield _tempdir_registry
  53. finally:
  54. _tempdir_registry = old_tempdir_registry
  55. class _Default:
  56. pass
  57. _default = _Default()
  58. class TempDirectory:
  59. """Helper class that owns and cleans up a temporary directory.
  60. This class can be used as a context manager or as an OO representation of a
  61. temporary directory.
  62. Attributes:
  63. path
  64. Location to the created temporary directory
  65. delete
  66. Whether the directory should be deleted when exiting
  67. (when used as a contextmanager)
  68. Methods:
  69. cleanup()
  70. Deletes the temporary directory
  71. When used as a context manager, if the delete attribute is True, on
  72. exiting the context the temporary directory is deleted.
  73. """
  74. def __init__(
  75. self,
  76. path: Optional[str] = None,
  77. delete: Union[bool, None, _Default] = _default,
  78. kind: str = "temp",
  79. globally_managed: bool = False,
  80. ):
  81. super().__init__()
  82. if delete is _default:
  83. if path is not None:
  84. # If we were given an explicit directory, resolve delete option
  85. # now.
  86. delete = False
  87. else:
  88. # Otherwise, we wait until cleanup and see what
  89. # tempdir_registry says.
  90. delete = None
  91. # The only time we specify path is in for editables where it
  92. # is the value of the --src option.
  93. if path is None:
  94. path = self._create(kind)
  95. self._path = path
  96. self._deleted = False
  97. self.delete = delete
  98. self.kind = kind
  99. if globally_managed:
  100. assert _tempdir_manager is not None
  101. _tempdir_manager.enter_context(self)
  102. @property
  103. def path(self) -> str:
  104. assert not self._deleted, f"Attempted to access deleted path: {self._path}"
  105. return self._path
  106. def __repr__(self) -> str:
  107. return f"<{self.__class__.__name__} {self.path!r}>"
  108. def __enter__(self: _T) -> _T:
  109. return self
  110. def __exit__(self, exc: Any, value: Any, tb: Any) -> None:
  111. if self.delete is not None:
  112. delete = self.delete
  113. elif _tempdir_registry:
  114. delete = _tempdir_registry.get_delete(self.kind)
  115. else:
  116. delete = True
  117. if delete:
  118. self.cleanup()
  119. def _create(self, kind: str) -> str:
  120. """Create a temporary directory and store its path in self.path"""
  121. # We realpath here because some systems have their default tmpdir
  122. # symlinked to another directory. This tends to confuse build
  123. # scripts, so we canonicalize the path by traversing potential
  124. # symlinks here.
  125. path = os.path.realpath(tempfile.mkdtemp(prefix=f"pip-{kind}-"))
  126. logger.debug("Created temporary directory: %s", path)
  127. return path
  128. def cleanup(self) -> None:
  129. """Remove the temporary directory created and reset state"""
  130. self._deleted = True
  131. if not os.path.exists(self._path):
  132. return
  133. rmtree(self._path)
  134. class AdjacentTempDirectory(TempDirectory):
  135. """Helper class that creates a temporary directory adjacent to a real one.
  136. Attributes:
  137. original
  138. The original directory to create a temp directory for.
  139. path
  140. After calling create() or entering, contains the full
  141. path to the temporary directory.
  142. delete
  143. Whether the directory should be deleted when exiting
  144. (when used as a contextmanager)
  145. """
  146. # The characters that may be used to name the temp directory
  147. # We always prepend a ~ and then rotate through these until
  148. # a usable name is found.
  149. # pkg_resources raises a different error for .dist-info folder
  150. # with leading '-' and invalid metadata
  151. LEADING_CHARS = "-~.=%0123456789"
  152. def __init__(self, original: str, delete: Optional[bool] = None) -> None:
  153. self.original = original.rstrip("/\\")
  154. super().__init__(delete=delete)
  155. @classmethod
  156. def _generate_names(cls, name: str) -> Generator[str, None, None]:
  157. """Generates a series of temporary names.
  158. The algorithm replaces the leading characters in the name
  159. with ones that are valid filesystem characters, but are not
  160. valid package names (for both Python and pip definitions of
  161. package).
  162. """
  163. for i in range(1, len(name)):
  164. for candidate in itertools.combinations_with_replacement(
  165. cls.LEADING_CHARS, i - 1
  166. ):
  167. new_name = "~" + "".join(candidate) + name[i:]
  168. if new_name != name:
  169. yield new_name
  170. # If we make it this far, we will have to make a longer name
  171. for i in range(len(cls.LEADING_CHARS)):
  172. for candidate in itertools.combinations_with_replacement(
  173. cls.LEADING_CHARS, i
  174. ):
  175. new_name = "~" + "".join(candidate) + name
  176. if new_name != name:
  177. yield new_name
  178. def _create(self, kind: str) -> str:
  179. root, name = os.path.split(self.original)
  180. for candidate in self._generate_names(name):
  181. path = os.path.join(root, candidate)
  182. try:
  183. os.mkdir(path)
  184. except OSError as ex:
  185. # Continue if the name exists already
  186. if ex.errno != errno.EEXIST:
  187. raise
  188. else:
  189. path = os.path.realpath(path)
  190. break
  191. else:
  192. # Final fallback on the default behavior.
  193. path = os.path.realpath(tempfile.mkdtemp(prefix=f"pip-{kind}-"))
  194. logger.debug("Created temporary directory: %s", path)
  195. return path