# _compat.py (18 KB)

  1. import codecs
  2. import io
  3. import os
  4. import re
  5. import sys
  6. import typing as t
  7. from weakref import WeakKeyDictionary
# Platform flags derived from ``sys.platform``.
CYGWIN = sys.platform.startswith("cygwin")
WIN = sys.platform.startswith("win")
# Placeholder; a function of the same name is defined later in this
# module when running on Windows, otherwise this stays ``None``.
auto_wrap_for_ansi: t.Optional[t.Callable[[t.TextIO], t.TextIO]] = None
# Matches an ANSI escape sequence: ESC, "[", any run of digits, ";" or
# "?", and a single terminating ASCII letter.
_ansi_re = re.compile(r"\033\[[;?0-9]*[a-zA-Z]")
  12. def _make_text_stream(
  13. stream: t.BinaryIO,
  14. encoding: t.Optional[str],
  15. errors: t.Optional[str],
  16. force_readable: bool = False,
  17. force_writable: bool = False,
  18. ) -> t.TextIO:
  19. if encoding is None:
  20. encoding = get_best_encoding(stream)
  21. if errors is None:
  22. errors = "replace"
  23. return _NonClosingTextIOWrapper(
  24. stream,
  25. encoding,
  26. errors,
  27. line_buffering=True,
  28. force_readable=force_readable,
  29. force_writable=force_writable,
  30. )
  31. def is_ascii_encoding(encoding: str) -> bool:
  32. """Checks if a given encoding is ascii."""
  33. try:
  34. return codecs.lookup(encoding).name == "ascii"
  35. except LookupError:
  36. return False
  37. def get_best_encoding(stream: t.IO[t.Any]) -> str:
  38. """Returns the default stream encoding if not found."""
  39. rv = getattr(stream, "encoding", None) or sys.getdefaultencoding()
  40. if is_ascii_encoding(rv):
  41. return "utf-8"
  42. return rv
  43. class _NonClosingTextIOWrapper(io.TextIOWrapper):
  44. def __init__(
  45. self,
  46. stream: t.BinaryIO,
  47. encoding: t.Optional[str],
  48. errors: t.Optional[str],
  49. force_readable: bool = False,
  50. force_writable: bool = False,
  51. **extra: t.Any,
  52. ) -> None:
  53. self._stream = stream = t.cast(
  54. t.BinaryIO, _FixupStream(stream, force_readable, force_writable)
  55. )
  56. super().__init__(stream, encoding, errors, **extra)
  57. def __del__(self) -> None:
  58. try:
  59. self.detach()
  60. except Exception:
  61. pass
  62. def isatty(self) -> bool:
  63. # https://bitbucket.org/pypy/pypy/issue/1803
  64. return self._stream.isatty()
  65. class _FixupStream:
  66. """The new io interface needs more from streams than streams
  67. traditionally implement. As such, this fix-up code is necessary in
  68. some circumstances.
  69. The forcing of readable and writable flags are there because some tools
  70. put badly patched objects on sys (one such offender are certain version
  71. of jupyter notebook).
  72. """
  73. def __init__(
  74. self,
  75. stream: t.BinaryIO,
  76. force_readable: bool = False,
  77. force_writable: bool = False,
  78. ):
  79. self._stream = stream
  80. self._force_readable = force_readable
  81. self._force_writable = force_writable
  82. def __getattr__(self, name: str) -> t.Any:
  83. return getattr(self._stream, name)
  84. def read1(self, size: int) -> bytes:
  85. f = getattr(self._stream, "read1", None)
  86. if f is not None:
  87. return t.cast(bytes, f(size))
  88. return self._stream.read(size)
  89. def readable(self) -> bool:
  90. if self._force_readable:
  91. return True
  92. x = getattr(self._stream, "readable", None)
  93. if x is not None:
  94. return t.cast(bool, x())
  95. try:
  96. self._stream.read(0)
  97. except Exception:
  98. return False
  99. return True
  100. def writable(self) -> bool:
  101. if self._force_writable:
  102. return True
  103. x = getattr(self._stream, "writable", None)
  104. if x is not None:
  105. return t.cast(bool, x())
  106. try:
  107. self._stream.write("") # type: ignore
  108. except Exception:
  109. try:
  110. self._stream.write(b"")
  111. except Exception:
  112. return False
  113. return True
  114. def seekable(self) -> bool:
  115. x = getattr(self._stream, "seekable", None)
  116. if x is not None:
  117. return t.cast(bool, x())
  118. try:
  119. self._stream.seek(self._stream.tell())
  120. except Exception:
  121. return False
  122. return True
  123. def _is_binary_reader(stream: t.IO[t.Any], default: bool = False) -> bool:
  124. try:
  125. return isinstance(stream.read(0), bytes)
  126. except Exception:
  127. return default
  128. # This happens in some cases where the stream was already
  129. # closed. In this case, we assume the default.
  130. def _is_binary_writer(stream: t.IO[t.Any], default: bool = False) -> bool:
  131. try:
  132. stream.write(b"")
  133. except Exception:
  134. try:
  135. stream.write("")
  136. return False
  137. except Exception:
  138. pass
  139. return default
  140. return True
  141. def _find_binary_reader(stream: t.IO[t.Any]) -> t.Optional[t.BinaryIO]:
  142. # We need to figure out if the given stream is already binary.
  143. # This can happen because the official docs recommend detaching
  144. # the streams to get binary streams. Some code might do this, so
  145. # we need to deal with this case explicitly.
  146. if _is_binary_reader(stream, False):
  147. return t.cast(t.BinaryIO, stream)
  148. buf = getattr(stream, "buffer", None)
  149. # Same situation here; this time we assume that the buffer is
  150. # actually binary in case it's closed.
  151. if buf is not None and _is_binary_reader(buf, True):
  152. return t.cast(t.BinaryIO, buf)
  153. return None
  154. def _find_binary_writer(stream: t.IO[t.Any]) -> t.Optional[t.BinaryIO]:
  155. # We need to figure out if the given stream is already binary.
  156. # This can happen because the official docs recommend detaching
  157. # the streams to get binary streams. Some code might do this, so
  158. # we need to deal with this case explicitly.
  159. if _is_binary_writer(stream, False):
  160. return t.cast(t.BinaryIO, stream)
  161. buf = getattr(stream, "buffer", None)
  162. # Same situation here; this time we assume that the buffer is
  163. # actually binary in case it's closed.
  164. if buf is not None and _is_binary_writer(buf, True):
  165. return t.cast(t.BinaryIO, buf)
  166. return None
  167. def _stream_is_misconfigured(stream: t.TextIO) -> bool:
  168. """A stream is misconfigured if its encoding is ASCII."""
  169. # If the stream does not have an encoding set, we assume it's set
  170. # to ASCII. This appears to happen in certain unittest
  171. # environments. It's not quite clear what the correct behavior is
  172. # but this at least will force Click to recover somehow.
  173. return is_ascii_encoding(getattr(stream, "encoding", None) or "ascii")
  174. def _is_compat_stream_attr(stream: t.TextIO, attr: str, value: t.Optional[str]) -> bool:
  175. """A stream attribute is compatible if it is equal to the
  176. desired value or the desired value is unset and the attribute
  177. has a value.
  178. """
  179. stream_value = getattr(stream, attr, None)
  180. return stream_value == value or (value is None and stream_value is not None)
  181. def _is_compatible_text_stream(
  182. stream: t.TextIO, encoding: t.Optional[str], errors: t.Optional[str]
  183. ) -> bool:
  184. """Check if a stream's encoding and errors attributes are
  185. compatible with the desired values.
  186. """
  187. return _is_compat_stream_attr(
  188. stream, "encoding", encoding
  189. ) and _is_compat_stream_attr(stream, "errors", errors)
  190. def _force_correct_text_stream(
  191. text_stream: t.IO[t.Any],
  192. encoding: t.Optional[str],
  193. errors: t.Optional[str],
  194. is_binary: t.Callable[[t.IO[t.Any], bool], bool],
  195. find_binary: t.Callable[[t.IO[t.Any]], t.Optional[t.BinaryIO]],
  196. force_readable: bool = False,
  197. force_writable: bool = False,
  198. ) -> t.TextIO:
  199. if is_binary(text_stream, False):
  200. binary_reader = t.cast(t.BinaryIO, text_stream)
  201. else:
  202. text_stream = t.cast(t.TextIO, text_stream)
  203. # If the stream looks compatible, and won't default to a
  204. # misconfigured ascii encoding, return it as-is.
  205. if _is_compatible_text_stream(text_stream, encoding, errors) and not (
  206. encoding is None and _stream_is_misconfigured(text_stream)
  207. ):
  208. return text_stream
  209. # Otherwise, get the underlying binary reader.
  210. possible_binary_reader = find_binary(text_stream)
  211. # If that's not possible, silently use the original reader
  212. # and get mojibake instead of exceptions.
  213. if possible_binary_reader is None:
  214. return text_stream
  215. binary_reader = possible_binary_reader
  216. # Default errors to replace instead of strict in order to get
  217. # something that works.
  218. if errors is None:
  219. errors = "replace"
  220. # Wrap the binary stream in a text stream with the correct
  221. # encoding parameters.
  222. return _make_text_stream(
  223. binary_reader,
  224. encoding,
  225. errors,
  226. force_readable=force_readable,
  227. force_writable=force_writable,
  228. )
  229. def _force_correct_text_reader(
  230. text_reader: t.IO[t.Any],
  231. encoding: t.Optional[str],
  232. errors: t.Optional[str],
  233. force_readable: bool = False,
  234. ) -> t.TextIO:
  235. return _force_correct_text_stream(
  236. text_reader,
  237. encoding,
  238. errors,
  239. _is_binary_reader,
  240. _find_binary_reader,
  241. force_readable=force_readable,
  242. )
  243. def _force_correct_text_writer(
  244. text_writer: t.IO[t.Any],
  245. encoding: t.Optional[str],
  246. errors: t.Optional[str],
  247. force_writable: bool = False,
  248. ) -> t.TextIO:
  249. return _force_correct_text_stream(
  250. text_writer,
  251. encoding,
  252. errors,
  253. _is_binary_writer,
  254. _find_binary_writer,
  255. force_writable=force_writable,
  256. )
  257. def get_binary_stdin() -> t.BinaryIO:
  258. reader = _find_binary_reader(sys.stdin)
  259. if reader is None:
  260. raise RuntimeError("Was not able to determine binary stream for sys.stdin.")
  261. return reader
  262. def get_binary_stdout() -> t.BinaryIO:
  263. writer = _find_binary_writer(sys.stdout)
  264. if writer is None:
  265. raise RuntimeError("Was not able to determine binary stream for sys.stdout.")
  266. return writer
  267. def get_binary_stderr() -> t.BinaryIO:
  268. writer = _find_binary_writer(sys.stderr)
  269. if writer is None:
  270. raise RuntimeError("Was not able to determine binary stream for sys.stderr.")
  271. return writer
  272. def get_text_stdin(
  273. encoding: t.Optional[str] = None, errors: t.Optional[str] = None
  274. ) -> t.TextIO:
  275. rv = _get_windows_console_stream(sys.stdin, encoding, errors)
  276. if rv is not None:
  277. return rv
  278. return _force_correct_text_reader(sys.stdin, encoding, errors, force_readable=True)
  279. def get_text_stdout(
  280. encoding: t.Optional[str] = None, errors: t.Optional[str] = None
  281. ) -> t.TextIO:
  282. rv = _get_windows_console_stream(sys.stdout, encoding, errors)
  283. if rv is not None:
  284. return rv
  285. return _force_correct_text_writer(sys.stdout, encoding, errors, force_writable=True)
  286. def get_text_stderr(
  287. encoding: t.Optional[str] = None, errors: t.Optional[str] = None
  288. ) -> t.TextIO:
  289. rv = _get_windows_console_stream(sys.stderr, encoding, errors)
  290. if rv is not None:
  291. return rv
  292. return _force_correct_text_writer(sys.stderr, encoding, errors, force_writable=True)
  293. def _wrap_io_open(
  294. file: t.Union[str, "os.PathLike[str]", int],
  295. mode: str,
  296. encoding: t.Optional[str],
  297. errors: t.Optional[str],
  298. ) -> t.IO[t.Any]:
  299. """Handles not passing ``encoding`` and ``errors`` in binary mode."""
  300. if "b" in mode:
  301. return open(file, mode)
  302. return open(file, mode, encoding=encoding, errors=errors)
def open_stream(
    filename: "t.Union[str, os.PathLike[str]]",
    mode: str = "r",
    encoding: t.Optional[str] = None,
    errors: t.Optional[str] = "strict",
    atomic: bool = False,
) -> t.Tuple[t.IO[t.Any], bool]:
    """Open *filename*, treating ``"-"`` as a standard stream.

    :param filename: path to open; ``"-"`` selects stdin or stdout
        depending on *mode*.
    :param mode: an ``open``-style mode string.
    :param encoding: text encoding, ignored for binary modes.
    :param errors: codec error handler, ignored for binary modes.
    :param atomic: write to a temporary file next to the target and
        move it into place on close. Only ``w`` modes are supported.
    :return: a ``(stream, flag)`` tuple. The flag is ``False`` for the
        standard streams and ``True`` for files opened here; presumably
        it tells the caller whether it should close the stream.
    :raises ValueError: if *atomic* is combined with an ``a`` or ``x``
        mode, or with a mode that lacks ``w``.
    """
    binary = "b" in mode
    filename = os.fspath(filename)

    # Standard streams first. These are simple because they ignore the
    # atomic flag. Use fsdecode to handle Path("-").
    if os.fsdecode(filename) == "-":
        if any(m in mode for m in ["w", "a", "x"]):
            if binary:
                return get_binary_stdout(), False
            return get_text_stdout(encoding=encoding, errors=errors), False
        if binary:
            return get_binary_stdin(), False
        return get_text_stdin(encoding=encoding, errors=errors), False

    # Non-atomic writes directly go out through the regular open functions.
    if not atomic:
        return _wrap_io_open(filename, mode, encoding, errors), True

    # Some usability stuff for atomic writes
    if "a" in mode:
        raise ValueError(
            "Appending to an existing file is not supported, because that"
            " would involve an expensive `copy`-operation to a temporary"
            " file. Open the file in normal `w`-mode and copy explicitly"
            " if that's what you're after."
        )
    if "x" in mode:
        raise ValueError("Use the `overwrite`-parameter instead.")
    if "w" not in mode:
        raise ValueError("Atomic writes only make sense with `w`-mode.")

    # Atomic writes are more complicated. They work by opening a file
    # as a proxy in the same folder and then using the fdopen
    # functionality to wrap it in a Python file. Then we wrap it in an
    # atomic file that moves the file over on close.
    import errno
    import random

    # Remember the mode bits of an existing target so the temporary
    # file can be given the same ones.
    try:
        perm: t.Optional[int] = os.stat(filename).st_mode
    except OSError:
        perm = None

    # O_EXCL makes creation fail if the temporary name already exists.
    flags = os.O_RDWR | os.O_CREAT | os.O_EXCL

    if binary:
        # O_BINARY is not defined on every platform; use 0 where missing.
        flags |= getattr(os, "O_BINARY", 0)

    # Keep trying random names until one can be created exclusively.
    while True:
        tmp_filename = os.path.join(
            os.path.dirname(filename),
            f".__atomic-write{random.randrange(1 << 32):08x}",
        )
        try:
            fd = os.open(tmp_filename, flags, 0o666 if perm is None else perm)
            break
        except OSError as e:
            # Retry on a name collision. On Windows ("nt") also retry
            # when EACCES is reported for a path that is a writable
            # directory.
            if e.errno == errno.EEXIST or (
                os.name == "nt"
                and e.errno == errno.EACCES
                and os.path.isdir(e.filename)
                and os.access(e.filename, os.W_OK)
            ):
                continue
            raise

    if perm is not None:
        os.chmod(tmp_filename, perm)  # in case perm includes bits in umask

    f = _wrap_io_open(fd, mode, encoding, errors)
    af = _AtomicFile(f, tmp_filename, os.path.realpath(filename))
    return t.cast(t.IO[t.Any], af), True
  372. class _AtomicFile:
  373. def __init__(self, f: t.IO[t.Any], tmp_filename: str, real_filename: str) -> None:
  374. self._f = f
  375. self._tmp_filename = tmp_filename
  376. self._real_filename = real_filename
  377. self.closed = False
  378. @property
  379. def name(self) -> str:
  380. return self._real_filename
  381. def close(self, delete: bool = False) -> None:
  382. if self.closed:
  383. return
  384. self._f.close()
  385. os.replace(self._tmp_filename, self._real_filename)
  386. self.closed = True
  387. def __getattr__(self, name: str) -> t.Any:
  388. return getattr(self._f, name)
  389. def __enter__(self) -> "_AtomicFile":
  390. return self
  391. def __exit__(self, exc_type: t.Optional[t.Type[BaseException]], *_: t.Any) -> None:
  392. self.close(delete=exc_type is not None)
  393. def __repr__(self) -> str:
  394. return repr(self._f)
  395. def strip_ansi(value: str) -> str:
  396. return _ansi_re.sub("", value)
  397. def _is_jupyter_kernel_output(stream: t.IO[t.Any]) -> bool:
  398. while isinstance(stream, (_FixupStream, _NonClosingTextIOWrapper)):
  399. stream = stream._stream
  400. return stream.__class__.__module__.startswith("ipykernel.")
  401. def should_strip_ansi(
  402. stream: t.Optional[t.IO[t.Any]] = None, color: t.Optional[bool] = None
  403. ) -> bool:
  404. if color is None:
  405. if stream is None:
  406. stream = sys.stdin
  407. return not isatty(stream) and not _is_jupyter_kernel_output(stream)
  408. return not color
# On Windows, wrap the output streams with colorama to support ANSI
# color codes.
# NOTE: double check is needed so mypy does not analyze this on Linux
if sys.platform.startswith("win") and WIN:
    from ._winconsole import _get_windows_console_stream

    def _get_argv_encoding() -> str:
        # Windows variant: use the locale's preferred encoding.
        import locale

        return locale.getpreferredencoding()

    # Wrapped streams already created, keyed weakly by the original stream.
    _ansi_stream_wrappers: t.MutableMapping[t.TextIO, t.TextIO] = WeakKeyDictionary()

    def auto_wrap_for_ansi(  # noqa: F811
        stream: t.TextIO, color: t.Optional[bool] = None
    ) -> t.TextIO:
        """Support ANSI color and style codes on Windows by wrapping a
        stream with colorama.

        A previously created wrapper for the same stream is reused.
        """
        # The cache lookup is best effort; any failure is treated as a
        # cache miss.
        try:
            cached = _ansi_stream_wrappers.get(stream)
        except Exception:
            cached = None

        if cached is not None:
            return cached

        import colorama

        strip = should_strip_ansi(stream, color)
        ansi_wrapper = colorama.AnsiToWin32(stream, strip=strip)
        rv = t.cast(t.TextIO, ansi_wrapper.stream)
        _write = rv.write

        def _safe_write(s):
            # If the write is interrupted by any exception (including
            # KeyboardInterrupt), call the wrapper's reset_all() before
            # re-raising.
            try:
                return _write(s)
            except BaseException:
                ansi_wrapper.reset_all()
                raise

        rv.write = _safe_write

        # Storing in the cache is best effort as well.
        try:
            _ansi_stream_wrappers[stream] = rv
        except Exception:
            pass

        return rv

else:

    def _get_argv_encoding() -> str:
        # Non-Windows variant: stdin's encoding if set, otherwise the
        # filesystem encoding.
        return getattr(sys.stdin, "encoding", None) or sys.getfilesystemencoding()

    def _get_windows_console_stream(
        f: t.TextIO, encoding: t.Optional[str], errors: t.Optional[str]
    ) -> t.Optional[t.TextIO]:
        # Stub used off Windows: there is never a console stream.
        return None
  454. def term_len(x: str) -> int:
  455. return len(strip_ansi(x))
  456. def isatty(stream: t.IO[t.Any]) -> bool:
  457. try:
  458. return stream.isatty()
  459. except Exception:
  460. return False
  461. def _make_cached_stream_func(
  462. src_func: t.Callable[[], t.Optional[t.TextIO]],
  463. wrapper_func: t.Callable[[], t.TextIO],
  464. ) -> t.Callable[[], t.Optional[t.TextIO]]:
  465. cache: t.MutableMapping[t.TextIO, t.TextIO] = WeakKeyDictionary()
  466. def func() -> t.Optional[t.TextIO]:
  467. stream = src_func()
  468. if stream is None:
  469. return None
  470. try:
  471. rv = cache.get(stream)
  472. except Exception:
  473. rv = None
  474. if rv is not None:
  475. return rv
  476. rv = wrapper_func()
  477. try:
  478. cache[stream] = rv
  479. except Exception:
  480. pass
  481. return rv
  482. return func
# Cached accessors for the default text streams. Each one looks up the
# current ``sys`` stream on every call and reuses the wrapper created
# for that stream object.
_default_text_stdin = _make_cached_stream_func(lambda: sys.stdin, get_text_stdin)
_default_text_stdout = _make_cached_stream_func(lambda: sys.stdout, get_text_stdout)
_default_text_stderr = _make_cached_stream_func(lambda: sys.stderr, get_text_stderr)

# Lookup table from standard stream name to its binary stream getter.
binary_streams: t.Mapping[str, t.Callable[[], t.BinaryIO]] = {
    "stdin": get_binary_stdin,
    "stdout": get_binary_stdout,
    "stderr": get_binary_stderr,
}

# Lookup table from standard stream name to its text stream getter;
# each getter takes ``(encoding, errors)``.
text_streams: t.Mapping[
    str, t.Callable[[t.Optional[str], t.Optional[str]], t.TextIO]
] = {
    "stdin": get_text_stdin,
    "stdout": get_text_stdout,
    "stderr": get_text_stderr,
}